MapReduce::Framework::Simple 解説1(自ドメインブログからの転載)

ドメインのブログからの転載です。2016年10月7日に書かれた記事です。 今後は技術的な内容をはてなブログで書いて行こうかなと思います。

MapReduce::Framework::Simple 解説1

不定期で、自分が作ったモジュールの解説を3回に分けて投稿してみるテスト。

以前、MapReduce::Framework::Simpleというモジュールを作った。

MapReduce::Framework::Simple : GitHub

MapReduce::Framework::Simple : CPAN

httpでリモートのサーバと連携して分散処理を実現するためのモジュールとなっている。

作成動機

CPANにParallel::MapReduceというモジュールがある。 これは、リモートへmapperとなるサブルーチンと処理の対象となるデータをssh経由で送りグリッドコンピューティングなMapReduceを実現するという2008年頃の古いモジュールで、エクスペリメンタルなものだった。

処理をworker(処理を依頼されるリモートのコンピュータ)に投げるときに、master(処理を依頼するコンピュータ)からはsshでパスワードなし公開鍵認証を使いworkerへログインできる状態になっている必要があるため、「気軽に分散処理したい」というケースではなかなか使えない。 その上、プログラムでエラーがあると、ssh接続が強制的に切られ、sshのプロセスが大量に残るということもあった。

もう既にMapReduceよりも使い勝手やパフォーマンスの良いプログラミングモデルがある上に、Hadoopなどのエコシステムが成熟しているわけだが、とりあえず「手っ取り早く環境が作れて、持て余している計算資源でグリッドコンピューティングして計算時間を節約したい。できればperlだけで完結させたい」というコンセプトで、Parallel::MapReduceのようにsshでリモートとつなげるのではなく、普通にhttpで接続するようなお手軽MapReduce環境構築モジュールを作ることにした。

インストール方法

cpanmなどで、MapReduce::Framework::Simpleをインストールすればmaster、worker共に最低限の環境構築は完了する。

 $ cpanm MapReduce::Framework::Simple

使い方

MapReduceの"Hello, World!“としてはワードカウントが取り上げられることが多いが、とりあえず、ランダムに生成した3万個の数字の合計・平均をリモートの3台のコンピュータに計算させてみることにする。

worker側の準備

まずは、workerとして使いたいリモートのサーバ上で、MapReduce::Framework::Simpleをインストールし、以下のコマンドでworkerサーバを起動しておく。

 $ perl -MMapReduce::Framework::Simple -e 'MapReduce::Framework::Simple->new->worker("/your_secret_path");'

最後の方にある"/your_secret_path"は、workerが起動する処理受付用のhttpサーバのパスになるので、他人に推察されにくいパスにすることを推奨する。 また、workerのhttpサーバは既にStarletがインストールされていればそれで起動するようになっており、そうでなければシングルプロセスのPlackで起動する。1つのworkerへ同時に幾つもの処理対象データを投げるような使い方をする場合は、Starletをインストールしておくと良い。

さらに、Starletはインストールしているが敢えてStarletを使いたくない場合は、newの引数に「force_plackup => 1」を与えればPlackで起動させることができる。

 $ perl -MMapReduce::Framework::Simple -e 'MapReduce::Framework::Simple->new(force_plackup => 1)->worker("/your_secret_path");'

masterのプログラム

master側のプログラムでは、処理対象のデータとそれを処理を受付けるworkerのURLを格納した配列リファレンスの配列リファレンス、workerで実行させるサブルーチンのリファレンス(mapperと呼ぶ)、workerから受け取った処理結果をmaster側で集約するサブルーチンのリファレンス(reducerと呼ぶ)の、3つを用意する必要がある。

まずは例題の通り3万個の数値を3台のサーバに分散処理させるため、以下のように配列リファレンスの配列リファレンスを作る。

my $data_map_reduce;
my $remotes = [
    'http://192.168.0.1:5000/eval',
    'http://192.168.0.2:5000/eval',
    'http://192.168.0.3:5000/eval'
    ];
for(0 .. 2){
    my $tmp_data;
    for(1 .. 10000){
        push(@$tmp_data,rand(10000));
    }
    push(@$data_map_reduce,[$tmp_data,$remotes->[$_]]);
}

基本的には、[<処理対象データ>,<workerサーバのURL>]という形の配列リファレンスを配列リファレンスに詰め込んでいく。

次に、mapperとreducerの書き方を見てみる。

# mapper code
my $mapper = sub {
    my $input = shift;
    my $sum = 0;
    my $num = $#$input + 1;
    for(0 .. $#$input){
        $sum += $input->[$_];
    }
    my $avg = $sum / $num;
    return({avg => $avg, sum => $sum, num => $num});
};

# reducer code
my $reducer = sub {
    my $input = shift;
    my $sum = 0;
    my $avg = 0;
    my $total_num = 0;
    for(0 .. $#$input){
        $sum += $input->[$_]->{sum};
        $total_num += $input->[$_]->{num};
    }
    $avg = $sum / $total_num;
    return({avg => $avg, sum => $sum});
};

上記のように、mapper reducerいずれも引数としてデータを配列リファレンスの形で受け取るように作り、 mapperでは平均・合計・レコード数を計算してハッシュリファレンスでreturn、 reducerではハッシュリファレンスの配列リファレンスをloopで回して最終的な平均値と合計を計算している。

無論、このコーディング方法はあくまでも一つの例である。 mapperの引数が必ずしも配列リファレンスの配列リファレンスである必要はなく、引数を一つだけ、返り値も一つだけという形になっていれば良い。 変数をひとつだけにするという約束事のために、リファレンスを使っているということに過ぎない。

mapperのreturnとreducerのinputの関係には注意が必要である。 各workerのreturnは、処理が完了した順番にmaster側の配列にpushされ、そのリファレンスをreducerのinputとして扱うという約束事なっているため、 このサンプルコードのreducerでは入力配列リファレンスのループ処理中のように $input->[$_]->{sum} といった形でmapperからのreturn内容を参照することができる。

最後に、作成したdata,mapper,reducerでMapReduce処理をmap_reduceメソッドで実行させるコードを記述する。

my $result = $mfs->map_reduce(
    $data_map_reduce,
    $mapper,
    $reducer,
    5
    );

print Dumper $result; # 結果確認用

最後の引数はmaster側でforkする引数となる。masterのメモリが許す限り、worker数以上の値を設定しておくと良い。

実行

master側のスクリプトを cal_avg_total.pl という名前で保存したと仮定すると、後は以下のコマンドで実行するだけでよい。

$ perl cal_avg_total.pl

実行すると合計と平均値が出力されるはずである。 正常に実行できなかった場合、workerサーバが起動されているか、URLが正しいかを確認。

まとめ

基本的に、workerの環境構築はMapReduce::Framework::Simpleをインストールするだけで良く、master側はmapperのreturnとreducerのinputの関係に従って処理を書いていくだけでOKだ。 MapReduceな書き方に慣れていない場合はそれなりに時間がかかってしまうが、そのあたりは練度の問題である。

このような方式の分散処理は、単一のサーバでは計算に数分・数時間・数日間もかかる程のデータ量で「高度に並列化された」問題を手っ取り早く高速化させたい場合に有効である。 ただし、数秒で完了するような計算処理では通信オーバーヘッドが原因で逆に遅くなってしまったり、シーケンシャルに処理をしなければならないような「高度に並列化できない問題」ではそもそも分散処理ができなかったりするのでそこはご注意を。 このモジュール自体はその他にも、ウェブスクレイパーを並列に実行したり、バッチジョブを並列に実行したりなどといった使い方も可能ではあるが、そのような用途では普通に単一サーバ内でParallel::ForkManager等を使った方が良い。

次回

今回の例はランダムに生成した数値のsumとavgMapReduceで計算させるという内容だったが、次回は実践的な使い方を紹介したいと思う。