STORES Product Blog

こだわりを持ったお商売を支える「STORES」のテクノロジー部門のメンバーによるブログです。

`Ractor::Port` ― Ractor の API を一新した話

テクノロジー部門技術推進本部の笹田です。最近、ある大学で Ruby インタプリタを作ってみよう、みたいな集中講義をさせていただいてるんですが、実装に使う言語がみんな Ruby じゃなくて面白かったです。

本稿では、Ruby で並列処理を手軽に実現するための機構 Ractor の API について、以前から気になっていた部分を最近になって一新し、Ractor::Port というものを導入したので、その内容をご紹介します。従来の API にはどうもしっくりこず、ずっと喉に骨がつっかえている感じがして、5年くらいずっと考えていたんですが、ようやっと決断しました。

提案した ticket: Feature #21262: Proposal: Ractor::Port - Ruby - Ruby Issue Tracking System

概要だけ説明しますと、次のような感じです。

  • なくなった
    • Ractor#take, Ractor.yield, Ractor.receive_if, Ractor#incoming_port, Ractor#outgoing_port がなくなりました。
  • 代わりに次を使ってね
    • Ractor の終了を待ちたい -> Ractor#join(覚え方:Thread#join と一緒)

      r = Ractor.new{fib(30)}
      r.join
      
    • Ractor の終了を待ち、そしてそのブロックの結果が欲しい -> Ractor#value(覚え方:Thread#value と一緒)

      r = Ractor.new{fib(30)}
      p r.value #=> 1346269
      
    • ほかの Ractor からメッセージを受け取るための専用の通信路を設けたい -> Ractor::Port(覚え方:TCP の port 番号みたいなもん)

      port1 = Ractor::Port.new
      port2 = Ractor::Port.new
      Ractor.new(port1){|port| port << :hello}
      Ractor.new(port2){|port| port << :world}
      
      port2.receive #=> 必ず :world
      port1.receive #=> 必ず :hello
      
    • 各 Ractor には一つずつ最初から Port (default port)があいていて、それを Ractor#default_port で取り出せます。また、Ractor#send, Ractor.receive は、その default port に対する処理になります。つまり、Ractor#send, .receive はこれまで通り利用できます。

      r = Ractor.new{ p Ractor.receive } #=> 42 と表示
      r << 42
      

ついでに、Ractor#monitor, unmonitor, Ractor::Port#close というメソッドがありますが、普通に使う分には重要ではありません。

Ruby のメソッドを消すのは、最近はだいぶ慎重にやるんですが、Ractor は 3.0 からずっと「実験的機能」と言い張っていたので、強気の非互換です。まぁ、まだちゃんと使ってる人はいないよね、ということで。

$ ruby -e Ractor.new{}
-e:1: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.

背景: 既存の API の問題点

これまで、Ractor 同士で通信をするには、push 型(Ractor#send, .receiveのペア)、もしくは pull 型(Ractor.yield, #take のペア)という整理をしてきました。

このプリミティブで何か専用の仕事をする「サーバー」と通信しようとすると、ちょっと面倒くさい、という問題がありました。簡単な2者間通信だと問題ないんですが、登場人物(Ractors)が増えてくると難しくなってくるという。

問題 具体例 起きる不具合
メッセージの識別が困難 Ractor.receive は 1 個の受信箱。複数サーバの結果が混在 複数のサーバ的 Ractor から結果を送られたとき、混ざっちゃうかも
受信箱の奪い合い ライブラリ A も B も Ractor.receive を呼ぶ 他人のメッセージを“誤飲”する
API が多岐にわたる receive_if / yield / #take / select を組合せ 学習・実装コストが肥大、CI が不安定に

(ChatGPT がまとめてくれた表を一部手直し。受信箱って表現がいいね)

この問題を、具体的なコードをみながら解説します。

EX1: 応答なしサーバ

何らかの機能を提供するサーバ的な Ractor を1個作ってみます。今回はフィボナッチ数を計算する fib_srv

def fib(n) = n > 1 ? fib(n-2) + fib(n-1) : 1

# A ractor calculate fib(n)
fib_srv = Ractor.new do
  while true
    param = Ractor.receive
    result = fib(param)
  end
end

fib_srv << 10   # ← 戻り値を受け取る手段がない

とくに応答を受けるものではないので、問題ないです。例えば、とにかくメールを送信する、といったケースで使うでしょうか(でも失敗時にどうするかは難しいよね)。

ただ、結果を受け取りたいケースが多いと思うので(今回の例のようなフィボナッチを計算するサーバだと、答えが得られないと、多分意味がないですよね)、次から応答を受けるケースを考えていきます。

EX2: 呼び出し元 Ractor を一緒に送る

では、結果を送り返してくれるサーバを書きましょう。send/receive で書くとこんな感じでしょうか。

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fib_srv << [10, Ractor.current]

do_some_work()

Ractor.receive #=> fib(10)

サーバは、パラメータと一緒に、クライアントの情報(sender)を受け取り、そこに実行結果を送り返しています。

クライアントは、Ractor.receive によって結果を受け取ります。これも問題なさそうです。

では、登場人物を増やしてみましょう。

EX3: サーバを増やすと混線

サーバを増やしてみます。fib に対して factorial を計算する fact サーバを追加してみました。

def fact(n) = n > 1 ? fact(n-1) * n : 1

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << result
  end
end


fib_srv << [10, Ractor.current]
fib_srv << [20, Ractor.current]
fact_srv << [10, Ractor.current]
fact_srv << [20, Ractor.current]

do_some_work()

Ractor.receive
#=> fib(10) の結果、 fact(10) か、どっちだろう?

さて、これを実行してみると、何か結果が返ってくるんですが、この結果が fact(10) の結果か fib(10) の結果かわかりません。

というわけで、問題です。なんとかしないといけないです。

ちなみに、サーバがさらに子 worker Ractors を使っている場合、fib(20) や fact(20) が返ってきちゃうかもしれません(つまり、リクエストに対して、返ってくる順序が保障されないことがある)。まぁ、これは「どういうサーバにするか」って定義の話ではあります。

EX4: タグで識別

「どこから返ってきたか」を理解できるようにしましょう。というわけで、タグを加えて(この場合、Symbol)、「このメッセージはどういうものです」というのをちゃんと示すようにしましょう。

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << [[:fib, param], result]
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << [[:fact, param], result]
  end
end

fib_srv << [10, Ractor.current]
fib_srv << [20, Ractor.current]
fact_srv << [10, Ractor.current]
fact_srv << [20, Ractor.current]

do_some_work()

Ractor.receive_if do |id, result|
  case id
  in [:fib, n]
    p "fib(#{n}) = #{result}"
  in [:fact, n]
    p "fact(#{n}) = #{result}"
  end
end

# or if you want to use specific results, like:

p fib20:  Ractor.receive_if{|id, result| id => [:fib, 20];  result}
p fact10: Ractor.receive_if{|id, result| id => [:fact, 10]; result}
p fact20: Ractor.receive_if{|id, result| id => [:fact, 20]; result}
p fib10:  Ractor.receive_if{|id, result| id => [:fib, 10];  result}

これで、Ractor.receive で結果を見ようとしたとき、どこから返ってきたか、ということがわかります。 問題ないように見えます。ちなみに、Erlang とかはこういうアプローチです。

ただ、実は問題は残っていて、例えば結果を受け取る前に使っている some_work()Ractor.receive をして、返ってきたメッセージを捨ててしまったりすると、本当に必要なところでメッセージを取得することができません。また、サーバが返すタグが、うっかり被ってしまったときも問題になりそうです(例えば、fact だから大丈夫だと思ったら、真実を返す fact サーバと被っちゃったりして)。

もちろん、自分が書いているアプリケーションであれば、そんなアホなことをしないと思います。しかし、外部ライブラリなどが不用意に Ractor.receive を呼んでいる場合、その限りではありません。言い換えれば、この仕組みは「自分の目が届く範囲でしか安全に使えない」という制限があるということです。

EX5: Channel を作ってみる

そこで、用途を限定した通信路を作ってみましょう。 Go言語にあるようなChannelを作ることで対応できます。 Ractor では、Channel 用 Ractor を使って同じようなものができます。

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << result
  end
end

# Create a new channel using a Ractor
def new_channel
  Ractor.new do
    while true
      Ractor.yield Ractor.receive
    end
  end
end


fib_srv << [10, fib10_ch = new_channel]
fib_srv << [20, fib20_ch = new_channel]
fact_srv << [10, fact10_ch = new_channel]
fact_srv << [20, fact20_ch = new_channel]

do_some_work()

p fib20: fib20_ch.take   # wait for fib(20)
p fact10: fact10_ch.take # wait for fact(10)
p fib10: fib10_ch.take   # wait for fib(10)
p fact20: fact10_ch.take # wait for fact(20)

# or 
chs = [fib10_ch, fib20_ch, fact10_ch, fact20_ch]

while !chs.empty?
  ch, result = Ractor.select(*chs) # wait for multiple channels
  p ch, result
  chs.delete ch
end

Channel を使うアプローチによって、特定の結果を返す通信路ができ、その通信路を経由して特定の結果をクライアントは得ることができるようになりました。

まず、Channel を使うには Ractor を1つ作成する必要があり、そのぶんコストが高くなります。また、送信時に余分なコピー処理が発生するため、パフォーマンス上の負荷もあります。さらに、Ractor が「アクターモデル」を掲げているにもかかわらず、この Channel のアプローチは、アクターモデルらしさに欠けるという思想的な違和感もありました。

実は、Ractor を導入したとき、この仕組みで Channel を作れば問題ないかなー、性能については専用のデータ構造を作ればいいかなー、と思っていました。ただ、「アクターっぽくないなぁ」というのがひっかかって、どうしようかなぁ、と悩んで5年という感じでした。

で、他の方から「Channel を入れようぜ」、という提案(Feature #21121: Ractor channels - Ruby - Ruby Issue Tracking System)が来て、最初は「ついに入れるか...」と重い腰を上げようとしていたところだったのですが、あらためて色々と悩み直し、今回の Ractor::Port に至ったという次第です。尻たたかれ開発。

提案: Ractor::Port ― 軽量・一意・片方向エンドポイント

というわけで、Ractor::Port(以下 Port)を提案しました。Port は「誰でもそこに送信できる」、「作った Ractor しか受信できない」という仕組みです。

以下は、既存の Channel との比較です:

送信 受信
Channel だれでもできる だれでもできる
Port だれでもできる 生成者のみができる

ちょっとした違いですが、これでぐっとアクターモデルっぽくなります。また、生成が軽量で、送信も Ractor#send と同等のオーバーヘッドでできます。

Port は2つの要素を持ちます。

  1. 送信先 Ractor … どこへ届けるか
  2. 一意タグ … その Port に固有の識別子

この2つの要素をもつ Port を、これまでの Ractor API を用いて次のように定義できます。

class Ractor::Port
  def initialize
    @r   = Ractor.current
    @tag = genid()
  end

  def send(obj)               # 送信は誰でもOK
    @r << [@tag, obj]
  end

  def receive                 # 受信は作成者だけ
    raise unless @r == Ractor.current
    Ractor.receive_if { |(tag,res)| return res if tag == @tag } # タグが同じものだけ受信
  end
end

見てのとおり、簡単な構造なので、生成コストは軽量です。

同期処理を Ractor::Port#receive に限定することで、Ractor 間で同期をとる必要があった部分(クリティカルセクションは除く)の実装が、劇的に簡単になります。

例えば、こんな感じで使います。

port = Ractor::Port.new
Ractor.new(port) do |port|
    port.send 42 # port << 42 でもよい(alias)
end

port.receive #=> 42

「ここに送ってほしい」という情報として、Port を渡し、送るときはRactor::Port#sendを使って送ります。意味的には、ractor.send(tag, 42) とまったく同じです(上記実装もそんな感じですね)。

受信側は、Ractor::Port#receive を使って、その port へ届いたものを受け取ります。

この例だと、単に Port を出現させただけなので、従来の Ractor#send よりも面倒な感じになっていますが、複雑な例で意味が出てきます。

Port を用いた変更

では、具体的に fib, fact サーバを Port を用いて書き直していきます。

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << result
  end
end

fib_srv << [10, fib10_port = Ractor::Port.new]
fib_srv << [20, fib20_port = Ractor::Port.new]
fact_srv << [10, fact10_port = Ractor::Port.new]
fact_srv << [20, fact20_port = Ractor::Port.new]

do_some_work()

p fib10_port.receive #=> fib(10)
p fib20_port.receive #=> fib(20)
p fact10_port.receive #=> fact(10)
p fact20_port.receive #=> fact(20)

ほぼ、Channel を使っている例を Port にしただけですね。tag + receive_if である実装とも、ほぼ同じようなものになりますが、違いは「tag の利用を強制している」ということになるかと思います(あと、別の場所で Ractor.receive で捨てられることはない、という安心感かな)。

Ractor.select は複数の Port を同時に待つことができるようにしたので、次のように待てます。

ports = [fib10_port, fib20_port, fact10_port, fact20_port]

while !ports.empty?
  port, result = Ractor.select(*ports)
  case port
  when fib10_port
    p fib10: result
  ...
  else
    raise "This should not happen (BUG)."
  end

  ports.delete(port)
end

この例では、ports に格納された4つの Port のうち、どれかにメッセージが到達した場合、それを receive した結果が返ります。

ちなみに、IO.select は「read などできる」状態の IO を返しますが、Ractor.select は受信まで終わらしてしまいます。Go 言語の select 文は、受け取ってどうする、って処理まで記述します。ちょっとずつ違いますね。

チャンネル/キューとの比較

まず、Port の利点をまとめておきます。

  • 受信メッセージが 必ず 目的の Ractor だけに配送されることを保証し、タグ衝突を防ぎます。
  • 意図しないメッセージ消費が起こり得る Ractor.receive に頼らずに、メッセージの通信路を作ることができます。
  • .receive_if, .yield, #take といった複雑なプリミティブを、よりシンプルで合成しやすい抽象に置き換えます。
  • Ruby が Ractor で目指す アクターモデルのセマンティクスに、自然でわかりやすい形で対応します。

そして、Channel との比較をまとめておきます。

まずは利点。

  • 実践上はチャンネルより安全
    • Port で #send が成功する → 送信先 Ractor がまだ生存・実行中であることを示します。
    • これに対しチャンネルでは、そのチャンネルを受け取れる Ractor が稼働中かどうか保証がありません。
    • もちろん Port でも、受信側がメッセージを必ず処理するわけではなく、無視される可能性はあります。とはいえ「送信先 Ractor がすでに落ちていてメッセージが闇に消える」という失敗ケースは排除できます。つまり Port を使うことで故障要因をひとつ取り除き、通信モデルをより予測しやすくできます。
    • ちなみに、これが Ruby が “CSP” ではなく “Actor” モデル(= Ractor)を採用した理由のひとつです。
  • 生成も送信もチャンネルより高速
    • チャンネルを作る際は送信されたメッセージを格納するためのコンテナ構造の割り当てが必要ですが、Port は「Ractor + 新規 ID」の軽量データだけで済みます(厳密には「開いている」という状態を記録するためにもうちょっと必要)。
    • チャンネル経由でのメッセージの送信では「送信元 → チャンネル」、「チャンネル → 受信側 Ractor」の 2 回コピーが発生しますが、Port なら「送信元 Ractor → 受信側 Ractor」の 1 回だけですみます。今後、Ractor-local GC によるオブジェクト空間の分離が計画されているため、この差はさらに効いてきます。
  • 実装がシンプル
    • 基本的に Port#receive の同期処理だけ実装すれば OK。
      • #send / .receive は「受信側 Ractor をロックするだけ」で済むので容易。
      • .yield / #take はランデブー同期のため送信・受信双方をロックする必要があり厄介。
      • .select は現行仕様では非常に難しく、CI もまだ不安定(二度大きく改修したんですが)。
    • 仕様が簡潔になればバグも減り、実装速度も向上します。多分。

個人的な一番の利点は「実装がシンプルになる」という点です。そもそもアクターモデルを採用する一つの理由がそこではあるのですが、yield/take と、それをサポートする select の実装が本当に本当にしんどくて...。結局最後まで完全にバグが取れませんでした。一部VM-wide のグローバルロックを取って実装すれば、そんな大変でもないのですが、性能のために使わないでやったら本当に難しかった。

そして欠点です。

  • Port という概念はあまり知られておらず、とくに Go ユーザーには馴染みが薄い。
  • 典型的な producer-consumer 型では、Port を土台にもう一段の抽象化が必要になる。

後者については、例えば multiple workers (Ractors)を用いるためには、こんな感じでしょうか。

controller = Ractor::Port.new

workers = (1..WN).times.map do
  Ractor.new controller do |controller|
    while true
      controller << Ractor.current
      port, param = Ractor.receive
      result = task(param)  
      port << [param, result]
    end
  end
end

task_q = Queue.new
result_port = Ractor::Port.new

Thread.new do
  while param = task_q.pop
    worker = controller.receive
    worker.send [result_port, param]
  end
end

task_q << 42
task_q << 43
task_q << 44

3.times do
  result_port.receive
end

この場合、producer は1つの Ractor しか対応していないので、複数の Ractor が worker にタスクを投入できるようにするには、調停役の Ractor を一つ用意してあげるひつようがあります。が、ちょっと面倒になってきたのでこの辺で。

ちなみに、ここで Thread を使っていますが、従来の Ractor API は Thread 未対応でした(何が起こるかわからない系)。今回 Port の導入でコードがすっきりしたので、勢いで対応させました。シンプルにした効果が出てる。

Port の制約は、Go の context みたいなのを Port で素直に作れません。これがどれくらい足枷になるかわかってないので、そこはちょっと気になっています(Channel の close でなんで足りなかったのかわかっていない)。

Ractor の終了待ち

Ractor#take を消すぞ、となると、Ractor の終了を待つ、そしてそのブロックの値を得る方法がなくなってしまいます。そこで、Thread にある Thread#join(終了を待つ)、Thread#value(終了を待って、そのスレッドのブロックの値を返す)の名前を持ってきて、Ractor#join, Ractor#value を用意することにしました。

r = Ractor.new{ 42 }
r.join # 待つ
r.value #=> 42

ちょっと特殊な仕様になっているのが、「Ractor#value はたかだか1つの Ractor しかできない」という点です。すでにある Ractor が #value で値を得ているとき、他の Ractor が #value を実行すると、エラーになります。

r = Ractor.new{42}
p r.value #=> 42, main Ractor が value を実行している
Ractor.new(r){|r| r.value } # エラー

というのも、最後の結果は、他の誰もアクセスできないので、ある1つの Ractor がアクセスする限り安全なので、コピーしないで渡します(unshareable objectを unshareable object のまま受け取れる)。

実は、Ractor#take でも、最後の値については、そのような特殊な仕様になっていました。が、同じメソッドなのに挙動が異なるのは気持ち悪いなぁ、と思っていたので、今回 Ractor#value で整理できたのはよかったです。

Default port と互換性

Ractor は、生成されたときから1つの default port を持っています。Ractor に引数を渡すために使われるのですが、これを Ractor#default_port で取り出せるようになっています。

Ractor に対する Ractor#send, Ractor.receive は、すべて default port への操作になります。

r = Ractor.new do
  Ractor.receive #=> 42
  # これは、Ractor.current.default_port.receive と同じ
  
end

r << 42
# これは、r.default_port.send と意味的に同じ
# Ractor の外からは取り出せなくなるかも

monitor API

join, value を実装するために、低レベルのモニター API というのを導入しています。ちなみに、Erlang 由来。

r = Ractor.new{ 42 }
r.monitor(monitor_port = Ractor::Port.new)

monitor_port.receive #=> :exited

monitor に登録した Port に、終了時に Symbol :exited が渡ってきます。例外などで死んだら :aborted。他のイベントがとれるようになるかは、ちょっとわかんないです(今後用途が出てきたら)。

これを用いることで、#join が簡単に実装できます(実際しています: 当該コード)。また、この API を用いることで、複数の Ractor の生存確認ができます。

monitor_port = Ractor::Port.new

10.times.map do
  Ractor.new{ unlimited_task }.tap do
    r.monitor monitor_port
  end
end

while true
  msg = monitor_port.receive
  do_something # 例えば、代わりの Ractor を追加するとか
end

(あれ、これだと :exited だけもらっても、どの Ractor が死んだかわからないですね。送信元 Ractor の情報も要りそうだな。どういう API がいいかなぁ。monitor Port にもうちょっと情報を渡すか、from がわかる receive を作るか。どっちがいいかな)

この辺がしたくて5年前に Ractor#take を設計したんですが、うーん、うまくいかないものですね。

close

Port は close することができます。

Ractor.new port = Ractor::Port.new do
  loop{ 
    port << 42 # closed な port に送ろうとして、いつか ClosedError になる
  }
end
  
port.close

現状、close できるのは、Port を作成した Ractor だけになります。誰でも close できるようにするのはちょっと面倒(待っている人をたたき起こす必要がある)なので、明確に必須であるとわかるまではそのままにしようと思います。

検討した他の案

Channel 的なものについての検討はすでに書いた通りです。

Port はタグ付き send を強制するものである、ということから、Ractor#send(tag, ...) とする、という案もありました。ただ、tag の名前が被っちゃう問題どうしようかなぁ、と思って、どうせなら OO らしくオブジェクトにしちゃえ、ということで Port にしました。

Port の名前はあんまり考えないで付けたんですが、あまり他の選択肢も思い浮かばず。他の言語に似たような仕組みがないか調べたのですが、意外と見つかりませんでした。TCP の port 番号くらい。

今後の予定

すでに Port は master branch に導入されているので、お試しいただけます。 皆さんの忌憚ないご意見を頂けると幸いです。

いくつか、積み残しの部分があります。

Ractor::Port.new が長い

Ractor.port とかどうだろうと思ってるんですが、どうですかね。IO.pipe みたいなノリ。あんまり作る感じがしないんだけど。ダメかな。

close の挙動

前提として、Port は unshareable object、つまり、他の Ractor に渡すときにコピーを作ります。

で、作っていて気づいたのですが、他の Ractor に渡した Port があろうとも、自 Ractor 内の Port がすべて GC された場合、その Port を読む人が居なくなるので、それは close するべきなんですよね。

port = Ractor::Port.new
Ractor.new port do |port|
  10times{ port << it } # 10 messages 送る
end

port.receive #=> 0 が返る。残り9個のメッセージは受け取らない
port = nil # もう受け取る手段はなくなる
...
# プログラムを続けるが、残り 9 個のメッセージは残り続ける

こういうケースでは、(自 Ractor の)Portが GCされた時点で close するとよいかなと思っています。そうすることで、すでに送られたメッセージを開放できる(誰も見ないしね)、それから他の Ractor が無駄に送信するのを防ぐことができる、と思っています。

この辺は Port に参照カウントを持たせて作れるかなぁと考えています。

あと、途中に書きましたが、他の Ractor が Port を close していいのか、という話。もう送らないよ、ということを示すために良いような、そうでもないような。まだこれから仕様が動きそうです。

まとめ

Ractor::Port を導入し、Ractor API を刷新しました。これにより、Ractor 間の通信が整理され、より利用しやすくなるかと思います。 よかったら遊んでみてください。

今度こそ、Ractor から experimental warning が外れるといいね。