- 追加された行はこの色です。
- 削除された行はこの色です。
*Python:Hadoop:MapReduceサンプル [#y3cf2253]
HadoopのMapReduceのプログラムをHadoopStreamingを使ってPythonで書いてみました。
※CDH環境で実行してますので、実行時のパス等は適当に読み替えてください。
Reducerの処理は一工夫必要だけど、簡単に書けます。
ログの集計とか、Hadoop使ってやるとほんと簡単に実行できるなーと実感した限りです。
***処理対象データの一部 [#w1fb1b7a]
こんな感じのデータが入力で、時刻(分)単位のレスポンスタイムの平均を求めたいです。
■''test.txt''
#refpre(test.txt,,1);
:第一カラム|時刻(ミリ秒まで出力されている)
:第四カラム|レスポンスタイム(ミリ秒)
***ソース [#s43aec4d]
こんな感じです。
■''map.py''
#refpre(map.py,,1);
ほんとは、入力値チェックをして、エラーレコードを除外するコードとか入れないといけないですけど、まあサンプルということで割愛!
■''reduce.py''
#refpre(reduce.py,,1);
reduceの方のコードのほうが、javaで実装するときと違ってひと手間必要です。
詳細はしたの方で解説。。。
***まずはhadoopを使わないで動作確認 [#y866be9d]
3つのファイルを同じディレクトリにおいて、こんな感じで単体で実行できます。
$ cat test.txt | python ./map.py | sort | python ./reduce.py
2011/11/25-21:25 41
2011/11/26-02:00 32
2011/11/26-06:05 62
すると、各時刻(分)毎のレス分すタイムの平均値が出力されます。
こんな感じで、お手軽に動作確認することができます。
ちなみに、hadoop-streamingでreduceに引き渡されるレコードは、javaの場合と違ってkey+list<value>ではなく、(key+value)がn行引き渡されます。
具体的には
$ cat test.txt | python ./map.py | sort
の出力が引き渡されることになります。大体w
javaの場合は
-key毎のvalueのリストが渡されるため
-処理の中では「特定のkey」が引き渡される想定で処理を書けばOK
ですが、hadoop-streamingでは
-reduce側でも常に「key,value」のペアが引き渡されることに注意
-reduce側で、複数種類のkeyが引き渡されることに注意
して実装する必要があります。
これを意識しての、ソースコードのポイント①~③です。
:ポイント①|まず、current_dateがkeyとして引き渡された時刻と一致しているか確認&br;
一致する場合は、同じ時刻ということで、総数のカウントアップとレスポンスタイムの足しこみを実施。
&br;
一致しない場合、最初のレコードか、もしくは引き渡されたkeyが次のキーとなったことを示す。
:ポイント②|一致しなかった場合、current_dateが存在するかを確認
&br;
存在する場合、keyが次のkeyに切り替わったこととなるため、レスポンスタイムの平均値を計算し、標準出力に出力⇒reduceのoutputとなる。
&br;
また、初回にしろ、keyが切り替わったにしろ、そのキーの初回のデータであるため、current_count、total_rtimeを初期化し、current_dateにkeyを入れる。
:ポイント③|このロジックで行くと、一番最後のkeyの集計値はポイント②直上のelseブロックに入らずに終了してしまうため、最後に引き渡されたkeyを出力するためにポイント③のifブロックがある。&br;
文章だと非常にわかりにくいが、ポイント③以降を削除し実行するわかると思います。(最後にreducerに引き渡されるkeyの集計値は出力されないことがわかります)
***Hadoopクラスタ上で実行 [#s4557883]
スクリプトを以下のディレクトリに格納し、
/home/mapred/tools/hadoop-streaming
データをhdfs上の以下のディレクトリに格納した場合、
/tmp/test1/input
hadoop-streamingの実行コマンドラインは以下のような形になる。※えらい長い、、、
$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output
まあ、実行時の出力は以下の通り。
$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-*.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output
packageJobJar: [/home/mapred/tools/hadoop-streaming/map.py, /home/mapred/tools/hadoop-streaming/reduce.py, /var/lib/hadoop-0.20/cache/mapred/hadoop-unjar5748707038844758461/] [] /tmp/streamjob4463822418633633376.jar tmpDir=null
11/11/29 18:30:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/11/29 18:30:12 WARN snappy.LoadSnappy: Snappy native library not loaded
11/11/29 18:30:12 INFO mapred.FileInputFormat: Total input paths to process : 1
[GC 31872K->1712K(122304K), 0.0093920 secs]
11/11/29 18:30:13 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/mapred/local]
11/11/29 18:30:13 INFO streaming.StreamJob: Running job: job_201111231541_0003
11/11/29 18:30:13 INFO streaming.StreamJob: To kill this job, run:
11/11/29 18:30:13 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=sadcm01:8021 -kill job_201111231541_0003
11/11/29 18:30:13 INFO streaming.StreamJob: Tracking URL: http://sadcm01:50030/jobdetails.jsp?jobid=job_201111231541_0003
11/11/29 18:30:14 INFO streaming.StreamJob: map 0% reduce 0%
11/11/29 18:30:28 INFO streaming.StreamJob: map 50% reduce 0%
11/11/29 18:30:32 INFO streaming.StreamJob: map 100% reduce 0%
11/11/29 18:30:41 INFO streaming.StreamJob: map 100% reduce 100%
11/11/29 18:30:42 INFO streaming.StreamJob: Job complete: job_201111231541_0003
11/11/29 18:30:42 INFO streaming.StreamJob: Output: /tmp/test1/output
よくできましたw
***参考サイト [#bb2bb6df]
-http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/