Message Queueとか
だいたいみんな、Message QueueとしてGearmanとかQ4MとかResqueとかRabbitMQとかZeroMQとかまあたくさんあるけど、なにかを使っていると思う。
Perlの人だとQudoとかTheSchwartzとかをつかっている人も多いと思う。
でも、preforkなworkerを実装するとなるとSignal処理とかをちゃんとやるのが意外と難しい。
下の2つのスライドを読むと難しいんだなぁという事がなんとなくわかるとおもう。
graceful shutdownとかgraceful restartとかは欲しいし、max_reqs_per_child的なこともしたいし、時間が掛かり過ぎているjobはリトライして新しいworkerで処理させたいとかそういう要求がある。
そこで、そのへんが共通化されたプロダクトがあると良いのではないかと考えた。
Dainamoについて
なお、perlにおいては似ている既成のプロジェクトとしてDainamoというものがある。*1
これは以下のような事ができて、以下のようなメリットがある。
- Message Queueの種類をあまり考慮せずにコードが書ける
- サーバー毎にworker数を設定し、Profileという単位でweightをかけてworker数をバランスする
- サーバー毎のプロセス数をスペックに合わせて管理しやすい
- Profileを1つのプロジェクトで複数つくって即処理したいものはweightを多めにするといったこともできる
- 設定ファイルからProfile::Group(Profileをまとめたもの)を複数読み込み、全てのworkerを起動できる
- 複数プロジェクトが同居するケースとかのセットアップが楽
これは業務で運用しているが、とても便利な反面、困ったこともいくつかでてきた。
- dainamo毎にworker数を設定し、Profileという単位でweightをかけてworker数をバランスする
- 実際にProfileに割り当てられるプロセス数が分かりにくい。
- 他のProfileの影響を受けてProfileのworker数が変化するので複数Projectの同居などを慎重に検討しなければならない。
- graceful_shutdown_timeout(default:10秒)を超えるとSIGKILLをworkerプロセスに送る
- 容赦が無さすぎて失敗時に走らせたい処理とかが出来ない
- Proc::Daemonでdaemonizeする機能がある
- upstartとかdaemontoolsとか使うのでdaemonizeする機能はいらない
- 3段階のforkとなっていてデバッグし辛い
- master process -> manager process (profile毎に1つ) -> worker process (実際のjobを捌く)
- (依存モジュールが古いサーバーとかだと)manager processだけゾンビってるとかが稀にある
Workmanについて
というわけで、もっとシンプルなものが欲しいと思っていた。そこで最近、Workman というプロジェクトを開始した。 Job-Queue Worker frameworkと銘打っている。だいたい以下のような事が出来る事を目標にしている。
あらゆるMessage Queueを同一のインターフェースで扱う事が出来る
- 時代の変化で古いMessage Queueから新しいMessage Queueに移行するときとかにあまりめんどくさい事考えたくない。
- 代わりに、一部のMessage Queueに特有の機能は使いにくくなるがそれは許容する。
- Queueバインディングの仕様を満たすためのテストを提供してQueueバインディングを開発しやすくする。
単一のProfileに対して単一のServerを立てる
- (dainamoと比較して)manager process単位でサーバーを立てる。
- 複数建てたければ建てればいい
上に挙げた2つのスライドのノウハウを取り入れる
- シグナルハンドリングの不具合とかを心配したくない。
graceful shutdown / graceful shutdown をサポートする
- 1つだけ時間がかかっているjobがあったとしても他のworkerはすぐrestartして欲しい
- かといっていつまでも死なないのも困るのでgraceful_shutdown_timeoutを超えたら強制終了させる
graceful_shutdown_timeoutを過ぎて強制終了させるときにSIGABRTを送る
- worker内ではWorkman::Server::Exception::ForceKilledがthrowされるのでそれを好きにゴニョる
- on_abortというhookがあるので再enqueueなどの処理を書いてあげる
max_reqs_per_childしてくれる
- あたりまえのようにサポートしてほしい
daemonizeをサポートしない
- upstartとかdaemontoolsとか使う
テストを充実させる
- 特にシグナル処理の安全性やWorkerのライフサイクルとかは手動で動作確認とか結構厳しいし自動化するべきである
CodeRefを簡単にworkerにしたい
- ちょっとしたプロジェクトとかだとあると便利なはず。Amon2::Lite的な発想。syntax sugar用意してもいいかも。
- 当然classとしてもタスクを定義出来るようにする。
実装状況
実は、だいたいコア部分はできている。 たまにテストがコケる事はあるが、sleepとか絡むので仕方ないのかなともおもう。 以下のような感じで書くともうだいたい動く。
サーバー側
use strict; use warnings; use utf8; use Data::Dumper; use Workman::Server; use Workman::Server::Profile; use Workman::Queue::Gearman; use Workman::Task; my $queue = Workman::Queue::Gearman->new(job_servers => ['127.0.0.1:7003']); my $profile = Workman::Server::Profile->new(max_workers => 10, queue => $queue); $profile->set_task_loader(sub { my $set = shift; warn "[$$] register tasks..."; $set->add( Workman::Task->new(Echo => sub { my $args = shift; warn Dumper $args; return $args; }) ); $set->add( Workman::Task->new(Abort => sub { my $args = shift; die Dumper $args; return; }) ); $set->add( Workman::Task->new(Busy => sub { my $args = shift; sleep 1 for 1..1000; return; }) ); }); Workman::Server->new(profile => $profile)->run();
set_task_loaderはWorkerプロセスで実行されるのがミソ。
この中でモジュールのロードを行えばgraceful restartしたときでもTaskが再読み込みできるし、
メモリを節約したいなら先に読んでおいてCoWを効かせてメモリを節約することもできる。
重いモジュールは先に読ませておくとかそういう小手先の諸々ができる。
クライアント側
use strict; use warnings; use utf8; use Workman::Client; use Workman::Queue::Gearman; use Try::Tiny; use Data::Dumper; my $queue = Workman::Queue::Gearman->new(job_servers => ['127.0.0.1:7003']); my $client = Workman::Client->new(queue => $queue); $client->enqueue_background(Echo => { msg => 'hello' }) for 1..10000;
こんなかんじで、共通化のインターフェースでenqueueできる。 ドキュメンテーションとか、configファイルから読む機能とか、簡単にworker serverを立ち上げるためのコマンドとかはこれからという段階だが、なんとなく雰囲気は伝わるかなと思う。 どうだろうか?
*1:gearman限定であればMasayuki Matsuki / Gearman-Starter - search.cpan.orgがある