Python:Hadoop:MapReduceサンプル
Python:Hadoop:MapReduce?サンプル †
HadoopのMapReduce?のプログラムをHadoopStreaming?を使ってPythonで書いてみました。
※CDH環境で実行してますので、実行時のパス等は適当に読み替えてください。
Reducerの処理は一工夫必要だけど、簡単に書けます。
ログの集計とか、Hadoop使ってやるとほんと簡単に実行できるなーと実感した限りです。
処理対象データの一部 †
こんな感じのデータが入力で、時刻(分)単位のレスポンスタイムの平均を求めたいです。
■test.txt
#refpre(test.txt,,1);
- 第一カラム
- 時刻(ミリ秒まで出力されている)
- 第四カラム
- レスポンスタイム(ミリ秒)
ソース †
こんな感じです。
■map.py
#refpre(map.py,,1);
ほんとは、入力値チェックをして、エラーレコードを除外するコードとか入れないといけないですけど、まあサンプルということで割愛!
■reduce.py
#refpre(reduce.py,,1);
reduceの方のコードのほうが、javaで実装するときと違ってひと手間必要です。
詳細はしたの方で解説。。。
まずはhadoopを使わないで動作確認 †
3つのファイルを同じディレクトリにおいて、こんな感じで単体で実行できます。
$ cat test.txt | python ./map.py | sort | python ./reduce.py 2011/11/25-21:25 41 2011/11/26-02:00 32 2011/11/26-06:05 62
すると、各時刻(分)毎のレス分すタイムの平均値が出力されます。
こんな感じで、お手軽に動作確認することができます。
ちなみに、hadoop-streamingでreduceに引き渡されるレコードは、javaの場合と違ってkey+list<value>ではなく、(key+value)がn行引き渡されます。
具体的には
$ cat test.txt | python ./map.py | sort
の出力が引き渡されることになります。大体w
javaの場合は
- key毎のvalueのリストが渡されるため
- 処理の中では「特定のkey」が引き渡される想定で処理を書けばOK
ですが、hadoop-streamingでは
- reduce側でも常に「key,value」のペアが引き渡されることに注意
- reduce側で、複数種類のkeyが引き渡されることに注意
して実装する必要があります。
これを意識しての、ソースコードのポイント①~③です。
- ポイント①
- まず、current_dateがkeyとして引き渡された時刻と一致しているか確認
一致する場合は、同じ時刻ということで、総数のカウントアップとレスポンスタイムの足しこみを実施。
一致しない場合、最初のレコードか、もしくは引き渡されたkeyが次のキーとなったことを示す。 - ポイント②
- 一致しなかった場合、current_dateが存在するかを確認
存在する場合、keyが次のkeyに切り替わったこととなるため、レスポンスタイムの平均値を計算し、標準出力に出力⇒reduceのoutputとなる。
また、初回にしろ、keyが切り替わったにしろ、そのキーの初回のデータであるため、current_count、total_rtimeを初期化し、current_dateにkeyを入れる。 - ポイント③
- このロジックで行くと、一番最後のkeyの集計値はポイント②直上のelseブロックに入らずに終了してしまうため、最後に引き渡されたkeyを出力するためにポイント③のifブロックがある。
文章だと非常にわかりにくいが、ポイント③以降を削除し実行するわかると思います。(最後にreducerに引き渡されるkeyの集計値は出力されないことがわかります)
Hadoopクラスタ上で実行 †
スクリプトを以下のディレクトリに格納し、
/home/mapred/tools/hadoop-streaming
データをhdfs上の以下のディレクトリに格納した場合、
/tmp/test1/input
hadoop-streamingの実行コマンドラインは以下のような形になる。※えらい長い、、、
$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output
まあ、実行時の出力は以下の通り。
$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-*.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output packageJobJar: [/home/mapred/tools/hadoop-streaming/map.py, /home/mapred/tools/hadoop-streaming/reduce.py, /var/lib/hadoop-0.20/cache/mapred/hadoop-unjar5748707038844758461/] [] /tmp/streamjob4463822418633633376.jar tmpDir=null 11/11/29 18:30:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 11/11/29 18:30:12 WARN snappy.LoadSnappy: Snappy native library not loaded 11/11/29 18:30:12 INFO mapred.FileInputFormat: Total input paths to process : 1 [GC 31872K->1712K(122304K), 0.0093920 secs] 11/11/29 18:30:13 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/mapred/local] 11/11/29 18:30:13 INFO streaming.StreamJob: Running job: job_201111231541_0003 11/11/29 18:30:13 INFO streaming.StreamJob: To kill this job, run: 11/11/29 18:30:13 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=sadcm01:8021 -kill job_201111231541_0003 11/11/29 18:30:13 INFO streaming.StreamJob: Tracking URL: http://sadcm01:50030/jobdetails.jsp?jobid=job_201111231541_0003 11/11/29 18:30:14 INFO streaming.StreamJob: map 0% reduce 0% 11/11/29 18:30:28 INFO streaming.StreamJob: map 50% reduce 0% 11/11/29 18:30:32 INFO streaming.StreamJob: map 100% reduce 0% 11/11/29 18:30:41 INFO streaming.StreamJob: map 100% reduce 100% 11/11/29 18:30:42 INFO streaming.StreamJob: Job complete: job_201111231541_0003 11/11/29 18:30:42 INFO streaming.StreamJob: Output: /tmp/test1/output
よくできましたw