Perlで並行処理

Perl 5.6だとスレッドが使えなかったので、forkして子プロセスを作るという伝統的なスタイルで実装した。単にforkしただけだと、子プロセスに値を渡すことはできても戻り値を受け取ることができないから、その部分にプロセス間通信(Unix-Domain TCP)を使ってみた。

### pararell.pl

use strict;

use POSIX ":sys_wait_h";
use Socket;
use IO::Handle;
use Data::Dumper;

my $sock_path = '/tmp/catsock.$$';

BEGIN { $ENV{PATH} = '/usr/bin:/bin' }

sub wait_all {
    my $kid;
    do {
        $kid = waitpid(-1, &WNOHANG);
    } until $kid == -1;
}

sub send_to_parent {
    my $obj = shift @_;

    my $sock;
    socket($sock, PF_UNIX, SOCK_STREAM, 0) || die "socket: $!";
    connect($sock, sockaddr_un($sock_path)) || die "connect: $!";

    $sock->autoflush(1);

    my $d = new Data::Dumper([$obj]);
    $d->Terse(1);
    $d->Indent(0);

    my $data = $d->Dump;

    print $sock $$, "\n", length($data), "\n", $data;
    close($sock);
}

sub recv_from_child {
    my $sock = shift @_;

    $sock->autoflush(1);

    my $pid = <$sock> + 0;
    my $len = <$sock> + 0;

    my $res;
    read($sock, $res, $len);
    close $sock;

    my $obj = eval $res;

    return ($pid, $obj);
}

sub parallel {
    my ($max_process, $worker, $callback, @args) = @_;

    my $uaddr = sockaddr_un($sock_path);
    my $proto = getprotobyname('tcp');

    socket(SERVER, PF_UNIX, SOCK_STREAM, 0) || die "socket: $!";
    unlink($sock_path);

    SERVER->autoflush(1);

    bind(SERVER, $uaddr) || die "bind: $!";
    listen(SERVER, SOMAXCONN) || die "listen: $!";

    my %children;
    while (1) {
        while (@args && (keys %children < $max_process)) {
            my $arg = shift @args;

            my $pid = fork;
            die "Can't fork: $!" unless defined $pid;

            if ($pid) {
                # parent
                $children{$pid} = $arg;
            } else {
                # child
                my $ret = $worker->($arg);
                &send_to_parent($ret);

                exit;
            }
        }

        last unless keys %children;

        my $sock;
        accept($sock, SERVER);

        my ($pid, $res) = &recv_from_child($sock);

        my $waitedpid;
        do {
            $waitedpid = waitpid($pid, &WNOHANG);
        } while $waitedpid == 0;
        die "$waitedpid is NOT my child!" unless $children{$waitedpid};

        $callback->($children{$pid}, $res);

        delete $children{$pid};
    }

    &wait_all;
}

こうやって使う。

#!/usr/bin/perl -Tw

use strict;

require 'parallel.pl';

sub worker {
    my $msg = shift @_;

    # do something
    sleep(1);
    srand;
    sleep(rand 5);

    return [length($msg), uc($msg)];
}

sub main {
    sub callback {
        my ($arg, $ret) = @_;

        my ($len, $msg) = @{$ret};

        printf "%s -> [$d, %s]\n", $arg, $len, $msg;
    }

    my @msg = qw(red green blue yellow white);

    &parallel(3, \&worker, \&callback, @msg);
}
&main;

接続して来たプロセスのPIDをサーバ側で得る方法が分からなかったので、苦肉の策としてクライアントから自己申告するようなプロトコルにしている。実際には子プロセスでなくてもソケットに書き込めるプロセスからはいくらでも接続可能なので、これはセキュリティ上の問題になるかも知れない。

参考URL