0 && $edn < $lmax ) ? $edn: $lmax; $fmt = '%0'.strlen($edn).'d: '; for (; $stn<=$edn; $stn++ ) { $tstr = htmlspecialchars(mb_convert_encoding($lines[$stn-1],SOURCE_ENCODING,"auto")); $res .= ( $args[2] ? sprintf($fmt,$stn):'').refpre_replace_tab($tstr); } return '
'.$res.'
'; } ?> Python:Hadoop:MapReduceサンプル - HiiHahWIKI - making some notes for... -
トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS

Python:Hadoop:MapReduceサンプル

Last-modified: 2013-08-24 (土) 02:14:21 (3898d)
Top / Python:Hadoop:MapReduceサンプル

Python:Hadoop:MapReduce?サンプル

HadoopのMapReduce?のプログラムをHadoopStreaming?を使ってPythonで書いてみました。

※CDH環境で実行してますので、実行時のパス等は適当に読み替えてください。

Reducerの処理は一工夫必要だけど、簡単に書けます。

ログの集計とか、Hadoop使ってやるとほんと簡単に実行できるなーと実感した限りです。

処理対象データの一部

こんな感じのデータが入力で、時刻(分)単位のレスポンスタイムの平均を求めたいです。

test.txt

#refpre(test.txt,,1);

第一カラム
時刻(ミリ秒まで出力されている)
第四カラム
レスポンスタイム(ミリ秒)

ソース

こんな感じです。

map.py

#refpre(map.py,,1);

ほんとは、入力値チェックをして、エラーレコードを除外するコードとか入れないといけないですけど、まあサンプルということで割愛!

reduce.py

#refpre(reduce.py,,1);

reduceの方のコードのほうが、javaで実装するときと違ってひと手間必要です。

詳細はしたの方で解説。。。

まずはhadoopを使わないで動作確認

3つのファイルを同じディレクトリにおいて、こんな感じで単体で実行できます。

$ cat test.txt | python ./map.py | sort | python ./reduce.py 
2011/11/25-21:25        41
2011/11/26-02:00        32
2011/11/26-06:05        62

すると、各時刻(分)毎のレス分すタイムの平均値が出力されます。

こんな感じで、お手軽に動作確認することができます。

ちなみに、hadoop-streamingでreduceに引き渡されるレコードは、javaの場合と違ってkey+list<value>ではなく、(key+value)がn行引き渡されます。

具体的には

$ cat test.txt | python ./map.py | sort

の出力が引き渡されることになります。大体w

javaの場合は

  • key毎のvalueのリストが渡されるため
  • 処理の中では「特定のkey」が引き渡される想定で処理を書けばOK

ですが、hadoop-streamingでは

  • reduce側でも常に「key,value」のペアが引き渡されることに注意
  • reduce側で、複数種類のkeyが引き渡されることに注意

して実装する必要があります。

これを意識しての、ソースコードのポイント①~③です。

ポイント①
まず、current_dateがkeyとして引き渡された時刻と一致しているか確認
一致する場合は、同じ時刻ということで、総数のカウントアップとレスポンスタイムの足しこみを実施。
一致しない場合、最初のレコードか、もしくは引き渡されたkeyが次のキーとなったことを示す。
ポイント②
一致しなかった場合、current_dateが存在するかを確認
存在する場合、keyが次のkeyに切り替わったこととなるため、レスポンスタイムの平均値を計算し、標準出力に出力⇒reduceのoutputとなる。
また、初回にしろ、keyが切り替わったにしろ、そのキーの初回のデータであるため、current_count、total_rtimeを初期化し、current_dateにkeyを入れる。
ポイント③
このロジックで行くと、一番最後のkeyの集計値はポイント②直上のelseブロックに入らずに終了してしまうため、最後に引き渡されたkeyを出力するためにポイント③のifブロックがある。

文章だと非常にわかりにくいが、ポイント③以降を削除し実行するわかると思います。(最後にreducerに引き渡されるkeyの集計値は出力されないことがわかります)

Hadoopクラスタ上で実行

スクリプトを以下のディレクトリに格納し、

/home/mapred/tools/hadoop-streaming

データをhdfs上の以下のディレクトリに格納した場合、

/tmp/test1/input

hadoop-streamingの実行コマンドラインは以下のような形になる。※えらい長い、、、

$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output

まあ、実行時の出力は以下の通り。

$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-*.jar -file /home/mapred/tools/hadoop-streaming/map.py -mapper /home/mapred/tools/hadoop-streaming/map.py -file /home/mapred/tools/hadoop-streaming/reduce.py -reducer /home/mapred/tools/hadoop-streaming/reduce.py -input /tmp/test1/input/* -output /tmp/test1/output
packageJobJar: [/home/mapred/tools/hadoop-streaming/map.py, /home/mapred/tools/hadoop-streaming/reduce.py, /var/lib/hadoop-0.20/cache/mapred/hadoop-unjar5748707038844758461/] [] /tmp/streamjob4463822418633633376.jar tmpDir=null
11/11/29 18:30:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/11/29 18:30:12 WARN snappy.LoadSnappy: Snappy native library not loaded
11/11/29 18:30:12 INFO mapred.FileInputFormat: Total input paths to process : 1
[GC 31872K->1712K(122304K), 0.0093920 secs]
11/11/29 18:30:13 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/mapred/local]
11/11/29 18:30:13 INFO streaming.StreamJob: Running job: job_201111231541_0003
11/11/29 18:30:13 INFO streaming.StreamJob: To kill this job, run:
11/11/29 18:30:13 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=sadcm01:8021 -kill job_201111231541_0003
11/11/29 18:30:13 INFO streaming.StreamJob: Tracking URL: http://sadcm01:50030/jobdetails.jsp?jobid=job_201111231541_0003
11/11/29 18:30:14 INFO streaming.StreamJob:  map 0%  reduce 0%
11/11/29 18:30:28 INFO streaming.StreamJob:  map 50%  reduce 0%
11/11/29 18:30:32 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/29 18:30:41 INFO streaming.StreamJob:  map 100%  reduce 100%
11/11/29 18:30:42 INFO streaming.StreamJob: Job complete: job_201111231541_0003
11/11/29 18:30:42 INFO streaming.StreamJob: Output: /tmp/test1/output

よくできましたw

参考サイト


添付ファイル: filemap.py 705件 [詳細] filetest.txt 784件 [詳細] filereduce.py 687件 [詳細]