先月11月中にCourseraで「Data Engineering on Google Cloud Platform 日本語版 」を受講したので、復習を兼ねて自分でもティックデータをBigQueryに格納する仕組みを作ってみました。自宅サーバを利用して株や先物、為替のデータを収集していますが、今回はGCPを使って実現していきます。

全体像

今回作っていくデータの流れです。

(A) ストリーミングAPIクライアント
(B) Cloud Pub/Sub
(C) Cloud Functions
(D) BigQuery

各ポイントの概要

(A) ストリーミングAPIクライアント

プログラムとしては、oandapyV20というREST APIのV20用のクライアントライブラリを利用してストリーミングAPIにアクセスしています。Tickデータをdict型で取得できるので、これをJSON形式に変換し、GoogleのCloud Pub/Subのクライアントライブラリを使ってCloud Pub/SubにPublishしています。

このプログラムをGoogle Compute Engine上に無料枠対象になるUSリージョン、マシンタイプはmicroのインスタンスを立ち上げて、そのインスタンス上のDockerコンテナで動かしています。

(B) Cloud Pub/Sub

Pub/Sub自体はパブリッシャー(公開側)が特定のトピックに対して公開したデータをそのトピックを購読しているサブスクライバー(購読側)に配信する仕組みです。データを送信側と受信側を疎結合にできることがメリットかなと思います。今回のケースでは、Pub/Subを挟むことのメリットは以下の2点にあると考えています。

Pub/Subを経由させる理由

  • Big Queryへの書込が非同期になるため、(A)の処理時間が短縮できる
  • データ配信先をあとから容易に追加できる

Pub/Subがなくても(A)ストリーミングAPIクライアントの中でCloud Functionsを呼び出せば1点目は実現できるかもしれませんが、Pub/Subを挟むことで(A)や(C)に手を加えることなく、配信先を追加できます。

(C) Cloud Functions

Pub/Subにデータが公開されたことをトリガーにBig QueryへのINSERT処理を実行するように作ってあります。

当初はサブスクライバーとして、Apache BeamのサービスであるCloud DataFlowを利用していました。DataFlowにはPub/SubからBigQueryへデータをINSERTするテンプレートが用意されており、コーディングなしで実現できますが、現状はデータ加工もなくBigQueryへのINSERTのみのため、多機能なDataFlowは不要だと判断しました。

DataFlowからFunctionsに切り替えた理由

  • Pub/SubからPullでデータを取得するため、処理開始までタイムラグがある
    ※Cloud FunctionsはPushで起動される
  • クラスタを構成するGCEインスタンスの維持費がかかる
    ※Cloud Functionsは呼出回数と実際に利用したリソースのみ
  • OANDAのタイムスタンプとBig QueryへのINSERTまで約7秒のタイムラグがある
    ※Cloud Functionsは約4秒

Cloud Functionsの改善

Cloud FunctionsでBigQueryへのINSERTを行う際は、client = bigquery.Client()を実行する関数の外に出して、呼出のたびに作らないようにすることで処理時間とメモリ割当容量を削減できました。

稼働後、10通貨ペア約190万件のTickデータをINSERTしていますが、発生したエラーは3件です。エラー内容は、BigQueryのREST API呼び出しでHTTPコネクションが切れたという内容でした。Cloud Functionsの設定で「失敗時に再試行する」をONにしたあとは発生していません。

(D) BigQuery

Cloud FunctionsでほぼリアルタイムにINSERTされていき、いわゆるデータレイクにあたるテーブルができました。データがここまで入ってくれば、SQLで操作できます。DataStudioでダッシュボードを作るも良し、Datalabを使ってPython、pandasで操作するも良し、CSVにExportだってできます。

データの活用状況

現状は、BigQueryのデータをDatalab(jupyter notebook)を使って加工・分析し、予測モデルの構築を試行錯誤しています。うまく予測できるモデルができたら、実際の取引に役立てて行きたいと考えています。

目視での確認では、OANDA側でセットされたタイムスタンプからBigQueryで確認できるようになるまで約4秒かかっています。分析する分には充分なスピードですが、実際の取引に使うのであれば、もう少しタイムラグを小さくする方法を考える必要があります。

感じたこと

Courseraの復習も兼ねた簡単なものでしたが、呆気なくほぼリアルタイム(目視では4秒)の分析基盤が構築できてしまいました。難易度は高くないですが、学習したことを活かして、自分が使うものを自分で作れたのは良かったと思います。

今回作った為替レートと同じようなデータ取得プログラムが自宅サーバで動いているのですが、電気代、メンテナンスコスト、火災リスクなどの維持コスト・リスクを考えると自宅サーバから、少しずつGCPに移行していくのもありだな、と思うようになりました。少なくとも常時稼働するような仕組みは、全面的にGCPでも良いかもしれません。個人プロジェクトとして検討を続けたいと思います。

実務では、アプリのログやセンサーデータの収集に活用する機会が多いと思いますので、Fluentd、Fluent Bitとの連携も試してみたいと思います。

Twitter