こんにちは、石橋です。FLINTERSで開発者をしています。 本記事はFLINTERSブログ祭りの一環で、Cloud Composer(Airflow)1の代わりにCloud Workflows2でETLフローを作成した話です。
Cloud Workflowsなんぞや?については、公式がCloud Composerとの比較を行っていますので3、そちらが参考になるかと思います。
私の所属するチームでは広告データのETLフロー管理にCloud Composerを使っていますが、近年の円安も相まってコストに対する課題感が高まっています。 弊チームだとCloud Composer1環境あたり~8万/月、3環境で年額300万!近くになります。
そのような状況もあり、ちょうど始まった新規プロダクトの技術選定において、Cloud Workflowsを採用しました。 サーバーレスかつStep課金のCloud Workflowsでコスト削減4が期待されます。
ちなみに弊チームのCloud Composerは以下のような使われ方をしています。
- データ基盤はGoogle Cloud
- BigQuery(以下BQ)
- Cloud Storage(以下GCS)
- Cloud SQL
- バッチ処理の実行基盤もGoogle Cloud
- Batch
- Cloud Run Job
- BQへのSQLクエリ発行がAirflowのTaskとして実装されている
- 日次実行DAGが主
- DAGに依存関係あり(ExternalTaskSensor5)
- Jinja Template多用(環境依存変数の埋め込み、SQLクエリ内に複雑な条件分岐)
- Pythonで実装されたロジックも多少はある(日付計算、GCSの掃除やobjectの統合)
言ってしまえばGoogle Cloudの内部APIを適切な順序で呼び出しているだけなので、同じ要領でCloud Workflowsを使えるように見えます。 内部APIの利用が多い点はCloud Workflowsの費用計算的にも有利です。
また一方で、Workflowの定期実行や依存関係の構築、SQLクエリやテンプレートの取り扱いには工夫が必要になります。
Cloud Workflowsには日付や文字列処理の組み込み関数がいくつかありますが6、Pythonで実装されたロジックのうち複雑なものはCloud Functions等に逃がす必要があります。
以上を踏まえてCloud WorkflowsでETLフローを実装しました。
依存関係のあるWorkflowを定期実行する
実際に作成したWorkflowの構成です。
Workflowの実行方法は公式ドキュメントで詳しく解説されています7。今回は定期実行にCloud Schedulerを用い、またExternalTaskSensorの置き換えとして、Pub/SubとEventarcを用いて親Workflowが完了したらそれに依存する子Workflowが開始される構成にしました。
Workflowと関連リソース管理にはTerraformを使っています。Workflowとそのトリガー、完了時にpublishするPub/Sub TopicをまとめてTerraformのモジュール単位としています。これを連結してWorkflowの依存関係を構築しています。 WorkflowがPub/Sub Topicをpublishするかは実行時引数で指定できるので、特定のWorkflowだけを再実行することも可能です。
Terraformを使うとWorkflowの依存関係も簡単に管理できますが、Web UI上では可視化されないため、実際の運用は認知と記憶に頼るところが大きくなりそうです。
また、プロダクトのインフラ全般を管理するTerraformとの棲み分けも課題です。現状は以下のようにCloud Workflowsまわりのリソース管理を分担しています。
Cloud Workflows用Terraform | インフラ全般Terraform | Terraform管理外 |
---|---|---|
Cloud Workflows、 Cloud Scheduler、 Pub/Sub、 Eventarc | 各種サービスアカウント、 各種データ基盤、各種Cloud Runサービス (要するにCloud Workflows用Terraform以外のすべて) |
Cloud Functions |
Cloud Workflows関連のサービスアカウントがインフラ全般Terraformの管理下にあるため、サービスアカウントに付与される権限の一部がプロジェクト単位の過大なものになってしまいます。ここはうまく解決したいところです。
Workflow YAMLやSQLクエリへの変数埋め込み
環境依存変数のWorkflow YAMLやSQLクエリへの埋め込みも、Terraformのテンプレート機能を使って解決しました。 SQLクエリ文字列については、テンプレートレンダリング後に改行を除去し、これをさらにWorkflowのYAMLに埋め込む方法を取っています。
ただこの方法だと、クエリ内でうっかり1行コメントを使ったときに改行除去によって危険なクエリに化けてしまう可能性があります。 SQLクエリはテンプレートレンダリング後にGCSに配置し、WorkflowからはGETして使うように変更する予定です。
実際にWorkflowを実装していると複数箇所で使いまわしたい処理がでてきます。このようなときはサブワークフロー8呼び出しで解決できるのですが、さらにこれを複数のWorkflow YAMLで利用するために、サブワークフローを集めたutility YAMLを作成しました。 つまり下のようなYAMLテンプレートになります。
main: steps: - set_terraform_template_variables: assign: - project_id: ${project_id} - wf_bucket: ${wf_bucket} - query_path: ${query_path} - publication_topic: ${publication_topic} - upsert: call: run_query args: project_id: $${project_id} query: $${get_gcs_object(wf_bucket, query_path)} # terraformによってサブワークフロー(run_query, get_gcs_object)が埋め込まれる ${utility}
Terraformのテンプレートレンダリングによって${utility}
にサブワークフロー定義が埋め込まれます。ただこれも人間の記憶力に頼った開発になります。
Cloud Workflowsのローカル開発支援は、現状JSON Schema9による構文サポートのみです。 例えば組み込み関数やサブワークフローの引数間違いをローカルで検証する手段はないため、デプロイがちゃんと成功するかは実際に試してみるまでわかりません。CI/CDにおける最大の難点です。
おわりに
最後に雑多な寸感を並べて終わります。
デプロイは高速です。また個人の感想ですが、Cloud Composerよりキビキビ動く印象があります。
プロダクトが開発段階なので費用については評価しづらいのですが、作成したWorkflowをdev環境で1ヶ月動かしてみたところCloud Workflowsの費用は8円でした。本格運用では1環境当たり30ドル/月に収まるのではないかと雑に見積もっています。今回のWorkflowは利用状況に応じてイテレーション数が増減するループがあるので、実行Step数や費用は定期的に監視していきたいです。
Web UIについては、ダッシュボードで課金状態やWorkflowごとの失敗履歴がさくっと確認できたり、WorkflowのStepのツリー表示ができたりと意外に便利!です。
ちなみによく言われることですが、Cloud Workflowsで非常によく使われるDAG内の特定のTaskからの再実行
が、Cloud Workflowsでは行えません。常にWorkflow単位の実行になるので、実装においてはWorkflowの冪等性と再実行時の効率化に気を配る必要があります。
ログに関するTipsとして、Cloud Run Job等を呼び出すときはWorkflowのexecution_id10を渡し、Job側のログ出力でユーザーラベルに追加するとトラブル時の調査が捗ります11。
{ ... "logging.googleapis.com/labels": { ... "workflows.googleapis.com/execution_id": "{execution_id}" } }
Cloud Workflowsには各種制限があります12。今回の作業では特別の留意が必要な項目はなさそうですが(並列Stepでブランチ数の上限10はなにかの拍子に引っかかるかもしれませんが、今回使った並列Stepはfor
のみ)、つねに頭の片隅に置いておく必要はあります。
- https://cloud.google.com/composer/docs↩
- https://cloud.google.com/workflows/docs↩
- https://cloud.google.com/workflows/docs/choose-orchestration↩
- https://cloud.google.com/workflows/pricing↩
- https://airflow.apache.org/docs/apache-airflow/2.7.0/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor↩
- https://cloud.google.com/workflows/docs/reference/stdlib/sys/now など↩
- Cloud Scheduler、 Eventarc、 Cloud Tasks、 他のWorkflowから同期実行、 REST APIで非同期実行もできそう↩
- https://cloud.google.com/workflows/docs/samples/workflows-subworkflow↩
- https://cloud.google.com/workflows/docs/use-workflows-json-schema-with-ide↩
- https://cloud.google.com/workflows/docs/reference/stdlib/sys/get_env↩
- Cloud Workflows 内でコールしたCloud Functions, Cloud Runのログを紐付けて調査効率を高める↩
- https://cloud.google.com/workflows/quotas↩