テクノロジー部門技術推進本部の笹田です。最近、ある大学で Ruby インタプリタを作ってみよう、みたいな集中講義をさせていただいてるんですが、実装に使う言語がみんな Ruby じゃなくて面白かったです。
本稿では、Ruby で並列処理を手軽に実現するための機構 Ractor の API について、以前から気になっていた部分を最近になって一新し、Ractor::Port
というものを導入したので、その内容をご紹介します。従来の API にはどうもしっくりこず、ずっと喉に骨がつっかえている感じがして、5年くらいずっと考えていたんですが、ようやっと決断しました。
提案した ticket: Feature #21262: Proposal: Ractor::Port
- Ruby - Ruby Issue Tracking System
概要だけ説明しますと、次のような感じです。
- なくなった
Ractor#take
,Ractor.yield
,Ractor.receive_if
,Ractor#incoming_port
,Ractor#outgoing_port
がなくなりました。
- 代わりに次を使ってね
Ractor の終了を待ちたい ->
Ractor#join
(覚え方:Thread#join
と一緒)r = Ractor.new{fib(30)} r.join
Ractor の終了を待ち、そしてそのブロックの結果が欲しい ->
Ractor#value
(覚え方:Thread#value
と一緒)r = Ractor.new{fib(30)} p r.value #=> 1346269
ほかの Ractor からメッセージを受け取るための専用の通信路を設けたい ->
Ractor::Port
(覚え方:TCP の port 番号みたいなもん)port1 = Ractor::Port.new port2 = Ractor::Port.new Ractor.new(port1){|port| port << :hello} Ractor.new(port2){|port| port << :world} port2.receive #=> 必ず :world port1.receive #=> 必ず :hello
各 Ractor には一つずつ最初から Port (default port)があいていて、それを
Ractor#default_port
で取り出せます。また、Ractor#send
,Ractor.receive
は、その default port に対する処理になります。つまり、Ractor#send, .receive
はこれまで通り利用できます。r = Ractor.new{ p Ractor.receive } #=> 42 と表示 r << 42
ついでに、Ractor#monitor
, unmonitor
, Ractor::Port#close
というメソッドがありますが、普通に使う分には重要ではありません。
Ruby のメソッドを消すのは、最近はだいぶ慎重にやるんですが、Ractor は 3.0 からずっと「実験的機能」と言い張っていたので、強気の非互換です。まぁ、まだちゃんと使ってる人はいないよね、ということで。
$ ruby -e Ractor.new{} -e:1: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
背景: 既存の API の問題点
これまで、Ractor 同士で通信をするには、push 型(Ractor#send, .receive
のペア)、もしくは pull 型(Ractor.yield, #take
のペア)という整理をしてきました。
このプリミティブで何か専用の仕事をする「サーバー」と通信しようとすると、ちょっと面倒くさい、という問題がありました。簡単な2者間通信だと問題ないんですが、登場人物(Ractors)が増えてくると難しくなってくるという。
問題 | 具体例 | 起きる不具合 |
---|---|---|
メッセージの識別が困難 | Ractor.receive は 1 個の受信箱。複数サーバの結果が混在 |
複数のサーバ的 Ractor から結果を送られたとき、混ざっちゃうかも |
受信箱の奪い合い | ライブラリ A も B も Ractor.receive を呼ぶ |
他人のメッセージを“誤飲”する |
API が多岐にわたる | receive_if / yield / #take / select を組合せ |
学習・実装コストが肥大、CI が不安定に |
(ChatGPT がまとめてくれた表を一部手直し。受信箱って表現がいいね)
この問題を、具体的なコードをみながら解説します。
EX1: 応答なしサーバ
何らかの機能を提供するサーバ的な Ractor を1個作ってみます。今回はフィボナッチ数を計算する fib_srv
。
def fib(n) = n > 1 ? fib(n-2) + fib(n-1) : 1 # A ractor calculate fib(n) fib_srv = Ractor.new do while true param = Ractor.receive result = fib(param) end end fib_srv << 10 # ← 戻り値を受け取る手段がない
とくに応答を受けるものではないので、問題ないです。例えば、とにかくメールを送信する、といったケースで使うでしょうか(でも失敗時にどうするかは難しいよね)。
ただ、結果を受け取りたいケースが多いと思うので(今回の例のようなフィボナッチを計算するサーバだと、答えが得られないと、多分意味がないですよね)、次から応答を受けるケースを考えていきます。
EX2: 呼び出し元 Ractor を一緒に送る
では、結果を送り返してくれるサーバを書きましょう。send/receive
で書くとこんな感じでしょうか。
fib_srv = Ractor.new do while true param, sender = Ractor.receive result = fib(param) sender << result end end fib_srv << [10, Ractor.current] do_some_work() Ractor.receive #=> fib(10)
サーバは、パラメータと一緒に、クライアントの情報(sender
)を受け取り、そこに実行結果を送り返しています。
クライアントは、Ractor.receive
によって結果を受け取ります。これも問題なさそうです。
では、登場人物を増やしてみましょう。
EX3: サーバを増やすと混線
サーバを増やしてみます。fib に対して factorial を計算する fact サーバを追加してみました。
def fact(n) = n > 1 ? fact(n-1) * n : 1 fib_srv = Ractor.new do while true param, sender = Ractor.receive result = fib(param) sender << result end end fact_srv = Ractor.new do while true param, sender = Ractor.receive result = fact(param) sender << result end end fib_srv << [10, Ractor.current] fib_srv << [20, Ractor.current] fact_srv << [10, Ractor.current] fact_srv << [20, Ractor.current] do_some_work() Ractor.receive #=> fib(10) の結果、 fact(10) か、どっちだろう?
さて、これを実行してみると、何か結果が返ってくるんですが、この結果が fact(10) の結果か fib(10) の結果かわかりません。
というわけで、問題です。なんとかしないといけないです。
ちなみに、サーバがさらに子 worker Ractors を使っている場合、fib(20) や fact(20) が返ってきちゃうかもしれません(つまり、リクエストに対して、返ってくる順序が保障されないことがある)。まぁ、これは「どういうサーバにするか」って定義の話ではあります。
EX4: タグで識別
「どこから返ってきたか」を理解できるようにしましょう。というわけで、タグを加えて(この場合、Symbol)、「このメッセージはどういうものです」というのをちゃんと示すようにしましょう。
fib_srv = Ractor.new do while true param, sender = Ractor.receive result = fib(param) sender << [[:fib, param], result] end end fact_srv = Ractor.new do while true param, sender = Ractor.receive result = fact(param) sender << [[:fact, param], result] end end fib_srv << [10, Ractor.current] fib_srv << [20, Ractor.current] fact_srv << [10, Ractor.current] fact_srv << [20, Ractor.current] do_some_work() Ractor.receive_if do |id, result| case id in [:fib, n] p "fib(#{n}) = #{result}" in [:fact, n] p "fact(#{n}) = #{result}" end end # or if you want to use specific results, like: p fib20: Ractor.receive_if{|id, result| id => [:fib, 20]; result} p fact10: Ractor.receive_if{|id, result| id => [:fact, 10]; result} p fact20: Ractor.receive_if{|id, result| id => [:fact, 20]; result} p fib10: Ractor.receive_if{|id, result| id => [:fib, 10]; result}
これで、Ractor.receive
で結果を見ようとしたとき、どこから返ってきたか、ということがわかります。
問題ないように見えます。ちなみに、Erlang とかはこういうアプローチです。
ただ、実は問題は残っていて、例えば結果を受け取る前に使っている some_work()
が Ractor.receive
をして、返ってきたメッセージを捨ててしまったりすると、本当に必要なところでメッセージを取得することができません。また、サーバが返すタグが、うっかり被ってしまったときも問題になりそうです(例えば、fact
だから大丈夫だと思ったら、真実を返す fact
サーバと被っちゃったりして)。
もちろん、自分が書いているアプリケーションであれば、そんなアホなことをしないと思います。しかし、外部ライブラリなどが不用意に Ractor.receive
を呼んでいる場合、その限りではありません。言い換えれば、この仕組みは「自分の目が届く範囲でしか安全に使えない」という制限があるということです。
EX5: Channel を作ってみる
そこで、用途を限定した通信路を作ってみましょう。 Go言語にあるようなChannelを作ることで対応できます。 Ractor では、Channel 用 Ractor を使って同じようなものができます。
fib_srv = Ractor.new do while true param, sender = Ractor.receive result = fib(param) sender << result end end fact_srv = Ractor.new do while true param, sender = Ractor.receive result = fact(param) sender << result end end # Create a new channel using a Ractor def new_channel Ractor.new do while true Ractor.yield Ractor.receive end end end fib_srv << [10, fib10_ch = new_channel] fib_srv << [20, fib20_ch = new_channel] fact_srv << [10, fact10_ch = new_channel] fact_srv << [20, fact20_ch = new_channel] do_some_work() p fib20: fib20_ch.take # wait for fib(20) p fact10: fact10_ch.take # wait for fact(10) p fib10: fib10_ch.take # wait for fib(10) p fact20: fact10_ch.take # wait for fact(20) # or chs = [fib10_ch, fib20_ch, fact10_ch, fact20_ch] while !chs.empty? ch, result = Ractor.select(*chs) # wait for multiple channels p ch, result chs.delete ch end
Channel を使うアプローチによって、特定の結果を返す通信路ができ、その通信路を経由して特定の結果をクライアントは得ることができるようになりました。
まず、Channel を使うには Ractor を1つ作成する必要があり、そのぶんコストが高くなります。また、送信時に余分なコピー処理が発生するため、パフォーマンス上の負荷もあります。さらに、Ractor が「アクターモデル」を掲げているにもかかわらず、この Channel のアプローチは、アクターモデルらしさに欠けるという思想的な違和感もありました。
実は、Ractor を導入したとき、この仕組みで Channel を作れば問題ないかなー、性能については専用のデータ構造を作ればいいかなー、と思っていました。ただ、「アクターっぽくないなぁ」というのがひっかかって、どうしようかなぁ、と悩んで5年という感じでした。
で、他の方から「Channel を入れようぜ」、という提案(Feature #21121: Ractor channels - Ruby - Ruby Issue Tracking System)が来て、最初は「ついに入れるか...」と重い腰を上げようとしていたところだったのですが、あらためて色々と悩み直し、今回の Ractor::Port
に至ったという次第です。尻たたかれ開発。
提案: Ractor::Port
― 軽量・一意・片方向エンドポイント
というわけで、Ractor::Port
(以下 Port)を提案しました。Port は「誰でもそこに送信できる」、「作った Ractor しか受信できない」という仕組みです。
以下は、既存の Channel との比較です:
送信 | 受信 | |
---|---|---|
Channel | だれでもできる | だれでもできる |
Port | だれでもできる | 生成者のみができる |
ちょっとした違いですが、これでぐっとアクターモデルっぽくなります。また、生成が軽量で、送信も Ractor#send
と同等のオーバーヘッドでできます。
Port は2つの要素を持ちます。
- 送信先 Ractor … どこへ届けるか
- 一意タグ … その Port に固有の識別子
この2つの要素をもつ Port を、これまでの Ractor API を用いて次のように定義できます。
class Ractor::Port def initialize @r = Ractor.current @tag = genid() end def send(obj) # 送信は誰でもOK @r << [@tag, obj] end def receive # 受信は作成者だけ raise unless @r == Ractor.current Ractor.receive_if { |(tag,res)| return res if tag == @tag } # タグが同じものだけ受信 end end
見てのとおり、簡単な構造なので、生成コストは軽量です。
同期処理を Ractor::Port#receive
に限定することで、Ractor 間で同期をとる必要があった部分(クリティカルセクションは除く)の実装が、劇的に簡単になります。
例えば、こんな感じで使います。
port = Ractor::Port.new Ractor.new(port) do |port| port.send 42 # port << 42 でもよい(alias) end port.receive #=> 42
「ここに送ってほしい」という情報として、Port を渡し、送るときはRactor::Port#send
を使って送ります。意味的には、ractor.send(tag, 42)
とまったく同じです(上記実装もそんな感じですね)。
受信側は、Ractor::Port#receive
を使って、その port へ届いたものを受け取ります。
この例だと、単に Port を出現させただけなので、従来の Ractor#send
よりも面倒な感じになっていますが、複雑な例で意味が出てきます。
Port を用いた変更
では、具体的に fib, fact サーバを Port を用いて書き直していきます。
fib_srv = Ractor.new do while true param, sender = Ractor.receive result = fib(param) sender << result end end fact_srv = Ractor.new do while true param, sender = Ractor.receive result = fact(param) sender << result end end fib_srv << [10, fib10_port = Ractor::Port.new] fib_srv << [20, fib20_port = Ractor::Port.new] fact_srv << [10, fact10_port = Ractor::Port.new] fact_srv << [20, fact20_port = Ractor::Port.new] do_some_work() p fib10_port.receive #=> fib(10) p fib20_port.receive #=> fib(20) p fact10_port.receive #=> fact(10) p fact20_port.receive #=> fact(20)
ほぼ、Channel を使っている例を Port にしただけですね。tag + receive_if
である実装とも、ほぼ同じようなものになりますが、違いは「tag の利用を強制している」ということになるかと思います(あと、別の場所で Ractor.receive
で捨てられることはない、という安心感かな)。
Ractor.select
は複数の Port を同時に待つことができるようにしたので、次のように待てます。
ports = [fib10_port, fib20_port, fact10_port, fact20_port] while !ports.empty? port, result = Ractor.select(*ports) case port when fib10_port p fib10: result ... else raise "This should not happen (BUG)." end ports.delete(port) end
この例では、ports
に格納された4つの Port のうち、どれかにメッセージが到達した場合、それを receive した結果が返ります。
ちなみに、IO.select
は「read などできる」状態の IO を返しますが、Ractor.select
は受信まで終わらしてしまいます。Go 言語の select 文は、受け取ってどうする、って処理まで記述します。ちょっとずつ違いますね。
チャンネル/キューとの比較
まず、Port の利点をまとめておきます。
- 受信メッセージが 必ず 目的の Ractor だけに配送されることを保証し、タグ衝突を防ぎます。
- 意図しないメッセージ消費が起こり得る
Ractor.receive
に頼らずに、メッセージの通信路を作ることができます。 .receive_if
,.yield
,#take
といった複雑なプリミティブを、よりシンプルで合成しやすい抽象に置き換えます。- Ruby が Ractor で目指す アクターモデルのセマンティクスに、自然でわかりやすい形で対応します。
そして、Channel との比較をまとめておきます。
まずは利点。
- 実践上はチャンネルより安全
- Port で
#send
が成功する → 送信先 Ractor がまだ生存・実行中であることを示します。 - これに対しチャンネルでは、そのチャンネルを受け取れる Ractor が稼働中かどうか保証がありません。
- もちろん Port でも、受信側がメッセージを必ず処理するわけではなく、無視される可能性はあります。とはいえ「送信先 Ractor がすでに落ちていてメッセージが闇に消える」という失敗ケースは排除できます。つまり Port を使うことで故障要因をひとつ取り除き、通信モデルをより予測しやすくできます。
- ちなみに、これが Ruby が “CSP” ではなく “Actor” モデル(= Ractor)を採用した理由のひとつです。
- Port で
- 生成も送信もチャンネルより高速
- チャンネルを作る際は送信されたメッセージを格納するためのコンテナ構造の割り当てが必要ですが、Port は「Ractor + 新規 ID」の軽量データだけで済みます(厳密には「開いている」という状態を記録するためにもうちょっと必要)。
- チャンネル経由でのメッセージの送信では「送信元 → チャンネル」、「チャンネル → 受信側 Ractor」の 2 回コピーが発生しますが、Port なら「送信元 Ractor → 受信側 Ractor」の 1 回だけですみます。今後、Ractor-local GC によるオブジェクト空間の分離が計画されているため、この差はさらに効いてきます。
- 実装がシンプル
- 基本的に
Port#receive
の同期処理だけ実装すれば OK。#send
/.receive
は「受信側 Ractor をロックするだけ」で済むので容易。.yield
/#take
はランデブー同期のため送信・受信双方をロックする必要があり厄介。.select
は現行仕様では非常に難しく、CI もまだ不安定(二度大きく改修したんですが)。
- 仕様が簡潔になればバグも減り、実装速度も向上します。多分。
- 基本的に
個人的な一番の利点は「実装がシンプルになる」という点です。そもそもアクターモデルを採用する一つの理由がそこではあるのですが、yield/take と、それをサポートする select の実装が本当に本当にしんどくて...。結局最後まで完全にバグが取れませんでした。一部VM-wide のグローバルロックを取って実装すれば、そんな大変でもないのですが、性能のために使わないでやったら本当に難しかった。
そして欠点です。
- Port という概念はあまり知られておらず、とくに Go ユーザーには馴染みが薄い。
- 典型的な producer-consumer 型では、Port を土台にもう一段の抽象化が必要になる。
後者については、例えば multiple workers (Ractors)を用いるためには、こんな感じでしょうか。
controller = Ractor::Port.new workers = (1..WN).times.map do Ractor.new controller do |controller| while true controller << Ractor.current port, param = Ractor.receive result = task(param) port << [param, result] end end end task_q = Queue.new result_port = Ractor::Port.new Thread.new do while param = task_q.pop worker = controller.receive worker.send [result_port, param] end end task_q << 42 task_q << 43 task_q << 44 3.times do result_port.receive end
この場合、producer は1つの Ractor しか対応していないので、複数の Ractor が worker にタスクを投入できるようにするには、調停役の Ractor を一つ用意してあげるひつようがあります。が、ちょっと面倒になってきたのでこの辺で。
ちなみに、ここで Thread を使っていますが、従来の Ractor API は Thread 未対応でした(何が起こるかわからない系)。今回 Port の導入でコードがすっきりしたので、勢いで対応させました。シンプルにした効果が出てる。
Port の制約は、Go の context みたいなのを Port で素直に作れません。これがどれくらい足枷になるかわかってないので、そこはちょっと気になっています(Channel の close でなんで足りなかったのかわかっていない)。
Ractor の終了待ち
Ractor#take
を消すぞ、となると、Ractor の終了を待つ、そしてそのブロックの値を得る方法がなくなってしまいます。そこで、Thread にある Thread#join
(終了を待つ)、Thread#value
(終了を待って、そのスレッドのブロックの値を返す)の名前を持ってきて、Ractor#join
, Ractor#value
を用意することにしました。
r = Ractor.new{ 42 } r.join # 待つ r.value #=> 42
ちょっと特殊な仕様になっているのが、「Ractor#value
はたかだか1つの Ractor しかできない」という点です。すでにある Ractor が #value
で値を得ているとき、他の Ractor が #value
を実行すると、エラーになります。
r = Ractor.new{42} p r.value #=> 42, main Ractor が value を実行している Ractor.new(r){|r| r.value } # エラー
というのも、最後の結果は、他の誰もアクセスできないので、ある1つの Ractor がアクセスする限り安全なので、コピーしないで渡します(unshareable objectを unshareable object のまま受け取れる)。
実は、Ractor#take
でも、最後の値については、そのような特殊な仕様になっていました。が、同じメソッドなのに挙動が異なるのは気持ち悪いなぁ、と思っていたので、今回 Ractor#value
で整理できたのはよかったです。
Default port と互換性
Ractor は、生成されたときから1つの default port を持っています。Ractor に引数を渡すために使われるのですが、これを Ractor#default_port
で取り出せるようになっています。
Ractor に対する Ractor#send
, Ractor.receive
は、すべて default port への操作になります。
r = Ractor.new do Ractor.receive #=> 42 # これは、Ractor.current.default_port.receive と同じ end r << 42 # これは、r.default_port.send と意味的に同じ # Ractor の外からは取り出せなくなるかも
monitor API
join
, value
を実装するために、低レベルのモニター API というのを導入しています。ちなみに、Erlang 由来。
r = Ractor.new{ 42 } r.monitor(monitor_port = Ractor::Port.new) monitor_port.receive #=> :exited
monitor に登録した Port に、終了時に Symbol :exited
が渡ってきます。例外などで死んだら :aborted
。他のイベントがとれるようになるかは、ちょっとわかんないです(今後用途が出てきたら)。
これを用いることで、#join
が簡単に実装できます(実際しています: 当該コード)。また、この API を用いることで、複数の Ractor の生存確認ができます。
monitor_port = Ractor::Port.new 10.times.map do Ractor.new{ unlimited_task }.tap do r.monitor monitor_port end end while true msg = monitor_port.receive do_something # 例えば、代わりの Ractor を追加するとか end
(あれ、これだと :exited
だけもらっても、どの Ractor が死んだかわからないですね。送信元 Ractor の情報も要りそうだな。どういう API がいいかなぁ。monitor Port にもうちょっと情報を渡すか、from がわかる receive
を作るか。どっちがいいかな)
この辺がしたくて5年前に Ractor#take
を設計したんですが、うーん、うまくいかないものですね。
close
Port は close することができます。
Ractor.new port = Ractor::Port.new do loop{ port << 42 # closed な port に送ろうとして、いつか ClosedError になる } end port.close
現状、close できるのは、Port を作成した Ractor だけになります。誰でも close できるようにするのはちょっと面倒(待っている人をたたき起こす必要がある)なので、明確に必須であるとわかるまではそのままにしようと思います。
検討した他の案
Channel 的なものについての検討はすでに書いた通りです。
Port はタグ付き send を強制するものである、ということから、Ractor#send(tag, ...)
とする、という案もありました。ただ、tag の名前が被っちゃう問題どうしようかなぁ、と思って、どうせなら OO らしくオブジェクトにしちゃえ、ということで Port にしました。
Port の名前はあんまり考えないで付けたんですが、あまり他の選択肢も思い浮かばず。他の言語に似たような仕組みがないか調べたのですが、意外と見つかりませんでした。TCP の port 番号くらい。
今後の予定
すでに Port は master branch に導入されているので、お試しいただけます。 皆さんの忌憚ないご意見を頂けると幸いです。
いくつか、積み残しの部分があります。
Ractor::Port.new
が長い
Ractor.port
とかどうだろうと思ってるんですが、どうですかね。IO.pipe
みたいなノリ。あんまり作る感じがしないんだけど。ダメかな。
close の挙動
前提として、Port は unshareable object、つまり、他の Ractor に渡すときにコピーを作ります。
で、作っていて気づいたのですが、他の Ractor に渡した Port があろうとも、自 Ractor 内の Port がすべて GC された場合、その Port を読む人が居なくなるので、それは close するべきなんですよね。
port = Ractor::Port.new Ractor.new port do |port| 10times{ port << it } # 10 messages 送る end port.receive #=> 0 が返る。残り9個のメッセージは受け取らない port = nil # もう受け取る手段はなくなる ... # プログラムを続けるが、残り 9 個のメッセージは残り続ける
こういうケースでは、(自 Ractor の)Portが GCされた時点で close するとよいかなと思っています。そうすることで、すでに送られたメッセージを開放できる(誰も見ないしね)、それから他の Ractor が無駄に送信するのを防ぐことができる、と思っています。
この辺は Port に参照カウントを持たせて作れるかなぁと考えています。
あと、途中に書きましたが、他の Ractor が Port を close していいのか、という話。もう送らないよ、ということを示すために良いような、そうでもないような。まだこれから仕様が動きそうです。
まとめ
Ractor::Port
を導入し、Ractor API を刷新しました。これにより、Ractor 間の通信が整理され、より利用しやすくなるかと思います。
よかったら遊んでみてください。
今度こそ、Ractor から experimental warning が外れるといいね。