トップ   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS

Python:Hadoop:MapReduceサンプル のバックアップソース(No.2)

*Python:Hadoop:MapReduceサンプル [#y3cf2253]

HadoopのMapReduceのプログラムをHadoopStreamingを使ってPythonで書いてみました。

※CDH環境で実行してますので、実行時のパス等は適当に読み替えてください。

Reducerの処理は一工夫必要だけど、簡単に書けます。

ログの集計とか、Hadoop使ってやるとほんと簡単に実行できるなーと実感した限りです。

***処理対象データの一部 [#w1fb1b7a]
こんな感じのデータが入力で、時刻(分)単位のレスポンスタイムの平均を求めたいです。

■''test.txt''
#refpre(test.txt,,1);


:第一カラム|時刻(ミリ秒まで出力されている)
:第四カラム|レスポンスタイム(ミリ秒)


***ソース [#s43aec4d]
こんな感じです。

■''map.py''
#refpre(map.py,,1);

ほんとは、入力値チェックをして、エラーレコードを除外するコードとか入れないといけないですけど、まあサンプルということで割愛!


■''reduce.py''
#refpre(reduce.py,,1);

reduceの方のコードのほうが、javaで実装するときと違ってひと手間必要です。

詳細はしたの方で解説。。。

***まずはhadoopを使わないで動作確認 [#y866be9d]

3つのファイルを同じディレクトリにおいて、こんな感じで単体で実行できます。

 $ cat test.txt | python ./map.py | sort | python ./reduce.py 
 2011/11/25-21:25        41
 2011/11/26-02:00        32
 2011/11/26-06:05        62

すると、各時刻(分)毎のレス分すタイムの平均値が出力されます。

こんな感じで、お手軽に動作確認することができます。

ちなみに、hadoop-streamingでreduceに引き渡されるレコードは、javaの場合と違ってkey+list<value>ではなく、(key+value)がn行引き渡されます。

具体的には

 $ cat test.txt | python ./map.py | sort

の出力が引き渡されることになります。大体w

javaの場合は

-key毎のvalueのリストが渡されるため
-処理の中では「特定のkey」が引き渡される想定で処理を書けばOK

ですが、hadoop-streamingでは

-reduce側でも常に「key,value」のペアが引き渡されることに注意
-reduce側で、複数種類のkeyが引き渡されることに注意

して実装する必要があります。

これを意識しての、ソースコードのポイント①~③です。

:ポイント①|まず、current_dateがkeyとして引き渡された時刻と一致しているか確認&br;
一致する場合は、同じ時刻ということで、総数のカウントアップとレスポンスタイムの足しこみを実施。
&br;
一致しない場合、最初のレコードか、もしくは引き渡されたkeyが次のキーとなったことを示す。
:ポイント②|一致しなかった場合、current_dateが存在するかを確認
&br;
存在する場合、keyが次のkeyに切り替わったこととなるため、レスポンスタイムの平均値を計算し、標準出力に出力⇒reduceのoutputとなる。
&br;
また、初回にしろ、keyが切り替わったにしろ、そのキーの初回のデータであるため、current_count、total_rtimeを初期化し、current_dateにkeyを入れる。
:ポイント③|このロジックで行くと、一番最後のkeyの集計値はポイント②直上のelseブロックに入らずに終了してしまうため、最後に引き渡されたkeyを出力するためにポイント③のifブロックがある。&br;

文章だと非常にわかりにくいが、ポイント③以降を削除し実行するわかると思います。(最後にreducerに引き渡されるkeyの集計値は出力されないことがわかります)

***Hadoopクラスタ上で実行 [#s4557883]

スクリプトを以下のディレクトリに格納し、

 /home/mapred/tools/hadoop-streaming

データをhdfs上の以下のディレクトリに格納した場合、

 /tmp/test1/input

hadoop-streamingの実行コマンドラインは以下のような形になる。※えらい長い、、、

 $ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output

まあ、実行時の出力は以下の通り。

 $ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-*.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output
 packageJobJar: [/home/mapred/tools/hadoop-streaming/map.py, /home/mapred/tools/hadoop-streaming/reduce.py, /var/lib/hadoop-0.20/cache/mapred/hadoop-unjar5748707038844758461/] [] /tmp/streamjob4463822418633633376.jar tmpDir=null
 11/11/29 18:30:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 11/11/29 18:30:12 WARN snappy.LoadSnappy: Snappy native library not loaded
 11/11/29 18:30:12 INFO mapred.FileInputFormat: Total input paths to process : 1
 [GC 31872K->1712K(122304K), 0.0093920 secs]
 11/11/29 18:30:13 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/mapred/local]
 11/11/29 18:30:13 INFO streaming.StreamJob: Running job: job_201111231541_0003
 11/11/29 18:30:13 INFO streaming.StreamJob: To kill this job, run:
 11/11/29 18:30:13 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=sadcm01:8021 -kill job_201111231541_0003
 11/11/29 18:30:13 INFO streaming.StreamJob: Tracking URL: http://sadcm01:50030/jobdetails.jsp?jobid=job_201111231541_0003
 11/11/29 18:30:14 INFO streaming.StreamJob:  map 0%  reduce 0%
 11/11/29 18:30:28 INFO streaming.StreamJob:  map 50%  reduce 0%
 11/11/29 18:30:32 INFO streaming.StreamJob:  map 100%  reduce 0%
 11/11/29 18:30:41 INFO streaming.StreamJob:  map 100%  reduce 100%
 11/11/29 18:30:42 INFO streaming.StreamJob: Job complete: job_201111231541_0003
 11/11/29 18:30:42 INFO streaming.StreamJob: Output: /tmp/test1/output

よくできましたw