STORES Product Blog

こだわりを持ったお商売を支える「STORES」のテクノロジー部門のメンバーによるブログです。

サービス間でのpub/sub通信と一貫性

STORES でサーバーサイドエンジニアをしている片桐と申します。

STORES は元々別々の会社であったプロダクトが集まってできた会社です。各プロダクトのサービスは元々それぞれ独立したアプリケーションとして動いており、STORES プラットフォームとしてこれらのサービスを繋げていくには、さまざまなサービス間通信が求められるようになってきています。今回は、サービス間通信のうち、pub/sub の、特にpublishの実装についてお話しさせていただきます。

pub/subとは

pub/subとは、サービス間で情報をやり取りする手法の一形態で、publish / subscribe の略です。publishは、特定のサービスでデータの変更があった場合に、そのデータを全体に送信(publish)することを指します。subscribeは、特定の種別のデータを購読(subscribe)し、そのデータがやってきたことをフックに処理を行うことを指します。特定のサービス対サービスで通信を行うわけではないので、通信を媒介するハブとなるものが必要になります。代表的な実装としては、AWSSNS/SQSを組み合わせたものや、Google CloudのPub/Subがあります。pub/subでやり取りされるデータのことを指す名称は複数ありますが、この記事では「イベント」と呼ばせていただきます。

サービス間での一貫性の確保

サービス間通信を行う場合は、単一のサービスの場合には考えなくてよかったことを考える必要があります。特に大きいものが「一貫性の確保」です。

RDBを使用するサービスの場合、多くのケースにおいてtransactionを使用することで一貫性の確保が可能です。しかし、サービス間通信を行う場合、それぞれのサービスが独立したデータストアを背景に持つため、RDBのtransactionを利用できません。

「結果整合」という考え方

複数のアプリケーションとその背後にあるデータベースを、あるタイミングで参照したときに、そこから得られるデータに整合性があることを保証するのは非常に難しく、また可用性とのトレードオフであることも知られています (CAP定理)。そのため複数のサービスで一貫性を確保する方法として、弊社のシステムでは「結果整合」というモデルを基本的に採用にしています。

「結果整合」とは、通信の遅延や一時的なエラーなどで一貫性の無いタイミングはあるものの、最終的には一貫性が確保されるようにする、というようなモデルになります。

処理が単一のサービスで完結している場合、一貫性が必要な処理をtransactionで囲むだけで、処理に失敗した場合の対応は概ねできていました。しかし複数サービスで連携して処理を行なっている場合には、一貫性を満たすために注意深く実装する必要があります。

publishの重複制御について考える

単純に考えると、ある変更に対してpublishは1回のみ行うのが適切なように思えます。実際、1つの変更に対して、subscribeで複数回処理が行われることは不整合につながります。しかし、厳密にpublishを1回のみ行う実装は、複雑度はもちろんですが、パフォーマンスの観点からも懸念事項があります。

publishの通信が失敗した場合を考えてみます。通信が失敗したのであれば、「publishは失敗」であるように思えます。しかし、「通信が相手に届き、相手が正常に処理を終え、成功の応答を返した。しかし、送信元に応答が届くまでの経路でエラーが起きた」というケースではどうでしょうか。この場合、「publishは成功」しています。つまり「通信に失敗した」は「publishが失敗した」ではなく「publishの成否が不明」ということになります。

publishしたことを無かったことにするとしても、リトライしてpublishを完遂させるにしても、「publishの成否が不明」である以上、publish先にその成否を問い合わせる必要が出てきます。このためには最低1回以上通信を挟むことになるためパフォーマンスが低下しますし、publish先のサーバーの負荷も高まってしまいます。また、問い合わせから次の処理までの間に状態が変わってしまうことが考えられるので、これを避けるためにはロックを取るなどの仕組みが求められます。これは実装の複雑性を大きく上げる上に、並列処理が制限されることによるパフォーマンスの劣化も引き起こします。

冪等なsubscribe処理

1つの変更に対して複数回処理が行われることを制御するには、publish側で重複を制御するのではなく、subscribe側で制御をするということも考えられます。

subscribe側で重複が制御されている場合、そのsubscribeロジックは同じイベントを引数にして再度実行しても、結果は同じになります。このように「ある処理を1回だけ実行しても、複数回実行して結果が同じになる」ことを「冪等」と呼びます。

RDBを利用したシステムでは、処理を冪等にする場合には楽観ロック(optimistic lock)などの仕組みが利用できます。これを利用すると、他のプロセスから同時に変更が行われた場合の処理が即座に失敗するので、通常のロックを取得する場合と比べパフォーマンスへの影響が軽微になります。また、subscribe処理は失敗してもユーザーへ即座に影響が出ないので、失敗時の対処も単純なリトライで対応できます。

処理には失敗がつきものです。例えpublishが厳密に1回しか行われないとしても、subscribe処理が失敗した場合はsubscribe処理をリトライする必要が出てきます。そのため、いずれにしてもsubscribe側の処理には重複の制御が必要です。1

「結果整合」を達成できるpublishを達成する要件

ここまでのpub/subの重複制御に求められることを整理すると以下のようになります。

  • 1つの変更に対してsubscribe処理が複数回実行されても、処理内容が重複しない
  • subscribe処理の重複制御(冪等化)は比較的シンプルに、パフォーマンスの劣化を抑えて実装できる。
  • publish処理の重複制御は非常に複雑になり、パフォーマンスの劣化も伴う。
  • publishの処理の重複を厳密に制御したとしても、subscribe処理の失敗を考慮すると、いずれにしてもsubscribe側で重複制御は必要。

これらのことから、「publishの重複制御」は実装しなくても良いことがわかります。それを踏まえて、「結果整合」を達成するためのpublishの要件は以下のように整理できます。

  • 自身のデータが変更された場合、publishが確実に1回以上行われる。
  • 自身のデータが変更されなかった場合、通信が1度も行われない。

「結果整合」を達成できるpublishの実装を考える

要件が整理できたので、次に実装を考えていきます。まずは、シンプルにpublishする処理だけを書いてみます。

some_record.save!
publish_client.publish!(some_record.publishing_data)

当然ですが、このままではpublishが失敗した場合、「データは変更されたのに、publishはされていない」という状態になります。つまり、「自身のデータが変更された場合、publishが確実に1回以上行われる」を満たせていません。

次に、ちょっと捻って、transactionと組み合わせてみます。

some_record.transaction do
  some_record.save!
  publish_client.publish!(some_record.publishing_data)
end

こうすることで、publishに失敗した場合、データの変更も合わせてrollbackされるので、「データが変更されたのに、publishはされていない」ということは起こらなくなったように見えます。

しかし、commitに失敗した場合、今までとは逆に「データは変更されなかったのに、publishはされている」という状態になってしまいます。これは「自身のデータが変更されていないのに、通信を行わないこと」を満たせていません。

加えて、先ほどお話しした「通信には失敗したがpublishには成功している」というケースにも対応できていません。上記の実装では、データは変更されていない(rollbackされた)にもかかわらず、通信されてしまった状態になってしまいます。

これらのことから「データを変更」->「publish」->「publishの成否によってcommit/rollback」というフロー適用するのは不適切なことがわかります。

transactionを使用しない方法で考えてみる

これを踏まえて、transaction以外の方法で一貫性を確保する方法を模索してみましょう。SomeRecordに「データが変更された」ことを記録するフラグを持たせてみます。

some_record.publishing_required = true
some_record.save!

そして、定期的に実行されるバッチプロセスを用意します。

SomeRecord.where(publishing_required: true).find_each do |some_record|
  publish_client.publish!(some_record.publishing_event)
  some_record.update!(publishing_required: false)
rescue => e
  # 次回のバッチ実行時にリトライされるので、例外のロギングのみを行い処理は継続する。
  Rails.logger.error(e)
end

publish処理の必要性をDBに記録することで、publishが遅れることはあっても、最終的にはpublishが行われる、という状態を実現できます (これがまさに「結果整合」です)。

逆に、データの更新に失敗した場合はフラグがtrueにならないので、publishが行われてしまうこともありません。

最終的な実装

上記の実装でかなり良いところまで行っていますが、この実装だと細かい部分でいくつか課題があります。

  • 同じレコードに対して短時間に複数回の変更が行われた場合、最後の変更についてのイベントしかpublishされない可能性がある2
  • 同じような実装がさまざまな箇所で必要になる。

これらを解消し、ブラッシュアップ実装がこちらになります。

テーブル定義
create_table "publishing_events" do |t|
  t.text "data", null: false
  t.datetime "issued_at", null: false
  t.timestamps
end
更新処理
publishing_event = some_record.transaction do
  some_record.save!
  # publishする予定のデータを共通のテーブルに記録
  PublishingEvent.create!(data: some_record.publishing_data, issued_at: Time.current)
end
バッチ
PublishingEvent.find_each do |publishing_event|
  publish_client.publish!(publishing_event.data)
  publishing_event.destroy!
rescue => e
  # 次回のバッチ実行時にリトライされるので、例外のロギングのみを行い処理は継続する。
  Rails.logger.error(e)
end

publishかどうかをフラグで管理するのではなく、別のテーブルにpublishしたいデータを記録するようにしました。

このパターンであれば、すべてのpublish処理を1つのテーブル・ロジックで賄うことができます。また、フラグと違って同じレコードの更新が複数回行われた場合も、それぞれ個別のレコードとして記録されるので、すべての変更に対してpublishを行うことができます。

おわりに

今回はpub/subpublishの実装について紹介させていただきました。publishについては、今のところ上記の実装でほとんどのユースケースを汎用的にカバーできています。

実際の実装においては、ActiveJobを導入してリトライのリアルタイム性を上げたり、ActiveRecordの機能を利用してより簡潔にかけるようにするなど、さらに洗練したものとなっています。しかし、ベースとなっている概念は今回ご紹介させていただいたとおりです。

また、実装例としてrubyのコードを使用しましたが、他の言語でも同様の実装が可能です。実際、社内には別の言語で実装されたプロダクトがありますが、同様の概念で実装がなされています。

プロダクト間の連携には、今回取り上げたこと以外にも考慮事項が多く、難易度の高い問題です。しかしそれだけ面白みもあり、スキルアップにも繋がる課題だと思っています。このブログを読んで弊社に興味を持ってくだされう方が一人でも増えてくださると嬉しいです。


  1. subscribe処理の実装にあたって考慮すべき点は他にもありますが、publishの実装方針の検討には関係ない部分になるため詳細には踏み込みません。
  2. 弊社のユースケースだと問題無いケースも多いのですが、汎用的にしようすることを考えるとあまり好ましい挙動ではありません。