STORES Product Blog

こだわりを持ったお商売を支える「STORES」のテクノロジー部門のメンバーによるブログです。

複数プロダクトの成長を支える STORES のデータ基盤

こんにちは!
STORES でデータエンジニアとして、データ基盤の開発運用を担当している@ssxotaです。
STORES には2024年の1月に入社しました。

前職ではデータレイクの立ち上げや、データを利用したプロダクトの開発には携わってきましたが、 本格的なデータ基盤を扱うのは STORES に来てからが初めてでした。

チームのメンバーに助けて貰いながら業務に取り組んでいたら、光陰矢の如し!あっという間に8ヶ月が経っていました。

今回は私が入社してからの間でキャッチアップした STORES のデータ基盤の全体像を説明したうえで、今後の展望を紹介し、STORES のデータ基盤の今をお伝え出来たらと思います。

データ基盤の構成

こちらが現在のデータ基盤の概略図です。

STORES のデータ基盤では、STORES が提供するプロダクト、プロダクトを支える技術基盤、業務用ツール、SpreadsheetやGoogle Analyticsといった 様々なデータソースからのデータを管理し、データ分析、重要指標のモニタリング、業務オペレーション、顧客向けの分析機能へのデータ提供等、多岐に渡るデータ利用の要望に応える仕組みを提供しています。

以前のデータ基盤に関しては、こちらの記事で紹介があるのですが product.st.inc

  • データウェアハウスにはBigQueryを採用し、利用ユーザーのロールに応じて、IAMで各データセット毎のアクセス権限を管理
  • データソースからのデータ抽出、読み込み、変換(いわゆるELT処理)やBIツールのホスティング環境として、GKE Autopilotを採用
  • 社内向けのBIとしてはMetabaseをGKE上にホスティングし、IAPでの認証を行い、社内ユーザーに限定して公開
  • Google Apps Script(GAS)を利用したSpreadsheetのデータ転送

といった点は基本的な構成として変わりませんが、上記の記事時点からの変化点や、説明の無い項目に関しても紹介します。

DigdagからArgo Workflowsへの完全移行

ELT処理の管理の為のワークフローエンジンとして、Argo Workflowsを採用しています。 元々はDigdagを利用していましたが、2023年にDigdag利用部分は全てArgo Workflowsに移行が完了しています。

product.st.inc

kubernetesネイティブでpod単位で動作し、GKEのAutoPilotと組み合わせることで、オートスケールしてくれるので、 あまりリソースの調整をしないで良いのは楽で助かっています。
また、workflowの制御のオプションが豊富で、色々なユースケースに対応出来るのも魅力な気がします。

個人的には前職でArgoCDを使っていたので、宇宙人?のアイコンにまた出会って少し懐かしい気持ちになりました。

 データソースの引き込み処理

データソースとして主にプロダクトデータと業務用ツールに分類されますが、 それぞれ以下のような構成でデータソースの引き込みを行っています。

プロダクトデータの引き込み

STORES が展開するプロダクトとして、これまでのネットショップ/レジ/決済/予約に加えて2022年7月にはブランドアプリもリリースされ、 データ基盤に取り込むプロダクトデータもどんどんと増加しています。

AWSを採用しているプロダクトからの引き込みに関しては、プロダクト側でembulkをECS上で実行しS3にデータを抽出する構成がこれまでは主でしたが、データ抽出の際のワークアラウンドが複雑となり管理面での課題がありました。

RDSを利用しているプロダクトに関しては、下記の図のように、SnapshotをS3にExportし、GCSにCopy、BigQueryにLoadする構成への移行を進め、引き込み処理の共通化を図っています。

一部、ECSでの取り込みが残っていますが、これらもDBからのExport機能によるデータ抽出への置き換えを進める予定で考えています。

また、STORES ではEvent RepositoryというAPIの非同期イベント送受信機能が運用されており、 各プロダクト側のClientから送信したEventを、Server側で非同期にWebhookで受信することで、 プロダクト間のデータ連携に利用されています。
送信されたEventはGoogle CloudのPub/SubにPushされているため、データ基盤への引き込みにおいては、 発生したeventをPub/Subから受信することで、BigQueryへの引込を実現しています。
データの更新頻度としては、プロダクト側から直接引き込むデータよりも、高い頻度で更新されるため、 リアルタイム性の求められるデータ利用に適したデータといえます。

STORES の各プロダクトを共通に利用管理する仕組みである、ID基盤、組織管理基盤のデータも引き込み、 プロダクトを横断した分析に利用されています。
組織管理基盤に関しては、Google Cloudを採択していますが、プロダクト側のBigQueryでCloudSQLのリードレプリカに対してへ外部接続を作成し、 データ基盤に外部接続を共有したうえで、データ基盤側のプロジェクトから連携クエリ関数を発行することで、データ連携を行っています。

業務用ツール

Salesforce、Kintoneといったセールス組織の利用するCRMツール、カスタマーサクセスにおける顧客からの問い合わせ管理ツール, メルマガ配信用ツール、請求書管理ツール等の業務用ツール上のデータもデータ基盤に引き込み、セールス状況のモニタリングや業務オペレーションで利用されています。

各ツールからの引き込み処理には、ツールの提供するAPIに対して、データ取得を行うスクリプトを、自前で実装して引き込みを実現しています。
スクリプトの言語は開発者の得意な言語が採用されることが多いのですが、現在の使用言語はPython/Ruby/ShellScriptがよく使われています。

dbt Coreの採用

データソースからのデータ引き込み後の変換処理の管理ツールとしては、dbt Coreを採用しています。 dbt Coreの導入の背景に関しては、同じチームのアナリティクスエンジニア @k-0120さんが、こちらのFindy様のレビュー記事で紹介しておりますので、是非ご覧下さい。 findy-tools.io

データ変換処理のdbt移行

.sqlファイル上にCREATE OR REPLACE TABLEを記載し実行したり、embluk内でデータ引き込みのタイミングで実行したりと、 引き込み対象のプロダクト毎に手法が異なっていたデータ変換処理を全てdbtに移管しています。

BigQueryでの並列処理の恩恵が受けれること、また、dbtの管理下に変換処理が置かれることで、データソースからリネージの把握や、 テスト実施の容易さ等、データ品質の管理面でも利点があります。

データモデリングの整備

データ変換処理においては、staging層、preparation層、warehouse層、mart層ごとにデータ階層を分け、 整備ルールに沿って実装を進めることで、今後の新しいデータの追加や、変更に対して頑健なデータ構造を目指しています。

incrementalモデルによる差分更新

データ基盤上のデータの大半は、障害復旧時の容易さ等の観点で、全データを洗い替えしているのですが、 Google Analytics等のデータサイズの大きいデータソースに関しては、全データを洗い替えすると、加工時のコストが掛かります。 このようなデータに対しては、dbtのincrementalモデルを用いて増分更新を行っています。 incrementalモデルを用いることで、dbt共通の記法で、容易に差分更新が実現でき、加工処理のコスト削減に寄与しています。

schema定義の整備

dbt導入以前はesaに各データの説明が記載されており、データ変換処理の実装とschemaに対する説明が別々に管理され、メンテナンスコストが高い状況でした。 dbt採用にあたって、データの説明はschemaファイルに記載する方針に移行し、変換処理ととscheamの説明をdbt上で管理することで、schemaの説明文に対するメンテナンスコストの低減を図りました。

作成したschemaファイルからdbtが自動生成していくれるdbt DocsをCloud Run上にホスティングし、データカタログとしても利用しています。 また、個人情報が含まれるデータはIAMでのデータセット単位でのアクセス制限をかけていますが、より柔軟なカラム単位での個人情報の管理のため、 BigQueryの機能であるpolicy tagを利用して、schemaファイル上の各カラムに対してpolicy tagを設定することで、 読み取り権限の無いユーザーに対しては秘匿化された状態で個人情報が表示されるように管理しています。

モニタリング環境

データ基盤上のモニタリング環境としては、Cloud Monitoringのアラート機能を使って

  • scheduled queryの失敗検知
  • BigQueryへの社外アクセスの検知
  • GKEのpodの実行時間の監視

等の監視を行っています。

Argo Workflows上でのワークフローの失敗に関しては、exit-handlerを用いて、 ワークフローが失敗した際にslack通知するように実装しています。

BigQueryの料金プランとしては、BigQuery Editionsを採用し、AutoScaler機能により、利用可能なSlotの上限値を設定して利用していますが、 BIツールから必要以上に実行時間がかかるアドホックなクエリが発行された場合に、Slot上限まで消費してしまい、他のクエリが実行できない状況も発生してしまいます。

そのような状況を避けるため、BigQueryのscheduled queryで、INFORMATION_SCHEMAから一定時間以上実行時間のかかっているjobを抽出、キャンセルする仕組みも実装しています。

データの利用範囲の拡大

データの利用用途としては、これまでのBIツールやスプレッドシートでのデータ分析や重要指標のモニタリングに加えて、 以下のような用途での活用事例も増えています。

リバースETL処理/Slack通知

データ基盤上に引き込んだデータを変換処理し、再度業務ツールや分析ツールにデータを挿入するリバースETLの事例も最近は多いです。

以下は実際の社内での利用事例の例ですが

  • プロダクト側のデータベースに新規追加された顧客情報をCRMツールに挿入し顧客対応を行いたい
  • 組織管理基盤上の存在する顧客の属性情報を、カスタマーサポート用にツールに反映させ、属性情報に応じた顧客対応を実施したい
  • 購入履歴のデータ内に含まれる不正な購入履歴を機会学習ベースのモデルで推論し、リスクを検知したい

このように一度データ基盤に引き込んだデータを、データ基盤上のその他のデータと紐づけたり、機械学習を用いた高度な変換処理をしたうえで、 業務ツールに再度反映するためのワークフローがデータ基盤上では動いています。

また、データ基盤上で集計した事業指標のデータをslackに送信し、定期的にレポートする処理も動作しています。

これらのリバースETLやSlack通知の処理は、dbtでの変換処理後に実行される処理になりますが、 現在、dbtでの変換処理は一つのdbtプロジェクト内で実行され、データ基盤内の全てのデータ変換処理を一つのワークフローで担っているため、 変換処理やデータソース側のschemaに変更が発生した場合に、処理が失敗するケースも少なくありません。

以前は、変換処理の失敗後は、後続の処理が実行されず、手動での再実行を行う必要があり運用負荷が高い状況でしたが、 リバースETL/Slack通知処理が利用するデータの更新が成功したかのチェック処理を入れたうえで、変換処理全体の成否によらず実行されるように変更を行い、 運用負荷を下げる工夫をしています。

顧客向けデータ分析機能へのデータ提供

2023年9月に STORES ネットショップ と STORES レジ、STORES 予約 の合わせた情報を横断で把握することができる分析ダッシュボード「データ分析β」 が対象プロダクトの機能としてβ版としてリリースされました。

このデータ分析βで利用されるデータとして、データ基盤上の集約した各プロダクトのデータを集約、変換処理し、専用のデータマートを生成しています。 顧客に提供されるデータでもあることから、dbtを用いたテストで重点的に品質を担保したうえで、データ提供を行っています。

今後の展望

これまでのデータ基盤メンバーの開発によって、沢山の課題は解決されてきましたが、やはりまだまだ課題が有ります。

入社してからデータ基盤の事を知っていく中で、色々と課題があることも段々と分かってきたが、正直どこから進めてよいのか全然分からない...と悩みもしましたが、 チームメンバーとも話をしながら、ロードマップを作成し、段階的に課題を解決できるように計画化を進めました。

直近のロードマップにおいては、データ基盤の利用者視点での課題ももちろんあるのですが、 まずは足元のデータ基盤内部の管理体制を強化することに軸足を置いて、実施項目を選定しています。

データ提供先の管理

BigQuery内でのデータのリネージに関しては、dbtを採用したことで観測性が向上しましたが、 BigQueryより先で提供されているSpreadsheetやBIツールに対して、どのデータがどこに接続されているかの一覧化が出来ていません。

社外へのレポーティングに利用されている重要度の高いデータに関しては、ドキュメント化されているものの、 網羅的にデータの提供先を把握出来ておらず、不要なデータ提供が残ったままになっている可能性もあります。

障害発生時の影響調査にも都度時間が掛かる状況であるため、データ提供先を一覧化管理する仕組みの構築は進めて行きたいです。

SpreadsheetやBI上にwarehouse層から参照され、コード管理されない状態で複雑なクエリが動いている状況もあり、 提供先の一覧化を進めることで、warehouse層への参照も把握でき、適宜mart化する等の対応も取れると考えています。

データモデリングの洗練

データモデリングの階層分けのルールを整備し、一部データセットには適用出来ているものの、 sourceから直接warehouseを参照しているようなデータセットも多く、データ基盤上のデータ全体に対してモデリングルールを適用出来ていません。

データの再利用性を高めるうえでも、重点的に取り組み整備していく必要があると感じています。

また、mart層に構築にあたっては、直近ではディメンショナルモデリングでのモデリングも検討を進めており、 fact、dimensionにデータ構造を分けることで、より効率的にmart作成が出来るよう検証を行っています。

データ品質の向上

データ基盤の利用者に信頼して貰えるデータを提供するうえで、データ品質を担保するための仕組みづくりも重要です。

データへのテスト

データに対するテストは大きく分けてデータ自体が定められた制約を満たすかどうかのテストと、データを生み出すロジックに対するテストに分かれますが、 STORES のデータ基盤におけるテストの取り組みとしては、データ自体に対するテストとしては、dbtのgeneric testを一部データには適用しているものの、 データ基盤で扱うデータ全体に対してのカバレッジが高いとは決して言えません。

ロジックに対するテストに関しても、dbtでは、unit testもリリースされ、 ロジックに対するテストもdbt単体で実施可能となりましたが、まだ本番環境には適用できていない状況です。

また、変換処理を本番環境に適用後にデータの不整合によりテストで失敗するケースも有るため、検証環境を上手く使って 本番環境適用前に事前に失敗を検知出来ないかも、議論を進めています。

テストに関しては、全てのデータに対してテストを適用するのは現実的では無いので、 どのようなデータに対して、どのような検証を実施すべきかの方針の整理から、改めて進めて行く必要があるとは感じています。

データパイプラインのモニタリング体制の強化

データパイプラインのモニタリング環境もより充実させて行きたいです。

現状、Argo Workflows上の実行時間や処理の成否の把握に関しても、定常的にモニタリングしやすい仕組みが構築できておらず、 アドホックな対応をしている部分が残っています。

dbt上の変換処理においても、dbt elementaryを導入はしたものの、 本格的な運用は開始出来ていないため、継続的なモニタリング環境として利用できるよう、整備を進めて行く予定です。

リアルタイムでのデータ連携

現在のデータ基盤のデータ更新の頻度は日次のデータが殆どですが、社内オペレーションや顧客向けの分析機能等においては、 より高い頻度でのデータ更新が求められるユースケースもあり、更新頻度を向上させる仕組みの構築も必要です。

さいごに

このブログを書くにあたって、STORES のデータ基盤の開発に携わる魅力は何か?について改めて考えてみました。

データの利用用途の多様さは、一つ挙げられると思います。
日々データチームに入ってくる様々な依頼のissueに対応したり、データアナリストが受け持っている案件の話を聞いていると、 まだまだこんなデータの使われ方があるのか...!と驚くことが沢山あります。 各プロダクト間の連携が加速していくなかで、今後より複雑なユースケースも発生することも予想され、 データ基盤側の視点としても、STORES は面白いフェーズにあると思います。

このような多様なデータ利用の要望に応えるための仕組みを考えていくことは、難しい部分はありつつも、 エンジニアとして非常にやりがいのある仕事だなと感じています。

今回はデータ基盤の全体感について紹介させて貰ったので、個々の内容に関しては概要レベルに留まっているものも多いです。 興味を持って頂いた方や、より詳しい内容を知りたい方は、是非ともカジュアル面談でお話できれば嬉しいです!

hrmos.co