FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

Snowpipe + Streamを用いたSnowflakeデータパイプラインの構築

データチームの出口です。
この記事はFLINTERSブログ祭りの記事です。テーマは #Snowflake #技術 #データパイプライン です。

今回はデータチームが現在取り組んでいるSnowflakeを活かしたデータパイプラインの構築について紹介したいと思います。

現状の問題点

現状の構成では以下の点に問題が生じていました。

  • 格納データに不正なデータが混ざり(レコードの重複や主キーのNULLなど)、後続での利用に影響が生じるリスクがある
  • 加工された値の格納のため、過去のAPIバージョンで取得した値をDWH上で確認できない
    • そのため、取得データを確認するためにはS3上のAPI取得データを確認する必要がある
  • バッチ処理で決まった時間にS3からロードしているが、S3のファイルのデータの異常によるデータ再取得やファイルの格納遅れなどで、ワークフローを再実行して取り込む手間がある

一連の構成

現状の問題点を解決するため、下図のようなデータパイプラインを構築するように取り組んでいます。

メダリオンアーキテクチャ

Bronze LayerやSilver Layerという表記がありますが、これはメダリオンアーキテクチャという概念によるものです。 メダリオンアーキテクチャはデータ層を3つの層に分け、段階的に処理することでデータの品質と整合性を高めることができます。

  • ブロンズ層(ローデータ層):一切の加工を施されない生データのまま保持する層
  • シルバー層(クレンジング層):重複データの削除、NULL値の処理、形式の正規化など、適度なクレンジングや整形処理が行われる層
  • ゴールド層: BIツールやダッシュボードなどに直接利用されるような、高度なデータ処理が行われ、分析やレポートに直接使用できる最終形態のデータが保存される層(ゴールド層にあたるテーブルはデータチームで扱わないのでここでは対象外)

現状の構成ではメダリオンアーキテクチャでいうシルバー層のテーブルしか存在しなかったため、過去のAPIバージョンで取得した値を調べたり、データ異常でAPIから取得したデータが誤っているのかを調べるために、S3上のAPI取得データを確認する必要がありました。メダリオンアーキテクチャ導入によってブロンズ層のテーブルができるため、S3ではなくブロンズ層のテーブルを調べることができ、調査の負荷が下がるようになります。

Snowpipe

SnowpipeはSnowflakeの機能で、ファイルストレージに追加されたファイルを検知し、テーブルに自動的にロードします。継続的なロードが行われるので、ほぼリアルタイムでデータ取り込みが可能になります。
今回はSnowpipeを使ってS3 -> ブロンズ層のロードを行います。

Snowpipe | Snowflake Documentation

Stream

StreamはSnowflakeの機能で、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更を追跡する機能です。
今回はブロンズ層に新規に挿入されたデータを追跡し、そのデータを対象にしてシルバー層に挿入するようにします。

ストリームの紹介 | Snowflake Documentation

StreamはTask(クエリを定期的に実行するSnowflakeの機能)と組み合わせて利用されているところを見ることが多いですが、TaskではなくSnowflakeのクエリを使えるワークフローツール(ここではDigdag)と組み合わせても問題ないようです。

ブロンズ層 -> シルバー層についてはDigdagのワークフローをマイクロバッチ的に動かし、ストリームにデータがある状態のときだけシルバー層への格納処理を行うようにします。それを達成するためにSnowflake関数SYSTEM$STREAM_HAS_DATA()を用います。
SYSTEM$STREAM_HAS_DATA()のドキュメントには

この関数は、タスクの定義の WHEN 式で使用することを目的としています とありますが、SELECT SYSTEM$STREAM_HAS_DATA();のように普通にクエリとして実行できます。また、システム関数なのでウェアハウスを利用せずにクエリを実行することができます。そのため、ブロンズ層に新規に挿入されたデータがなければ、その時点で格納処理を実行しないようにすることでウェアハウスを動かさずにコストを抑えることができます。

SYSTEM$STREAM_HAS_DATA | Snowflake Documentation

Taskを使わないメリットとして、Snowflake SQL以外のタスクが実行できます。
ここでは格納処理にdbtのインクリメンタルモデルを利用します。これにより、ストリームのデータを利用して効率的に増分更新でき、クレンジングを行うことが可能です。

終わりに

今回の記事では、Snowflakeを活用したデータパイプラインの構築に関する取り組みを紹介しました。メダリオンアーキテクチャやSnowpipe、StreamといったSnowflakeの機能を駆使することで、不正データのクレンジングや効率的なデータ処理が可能となり、データの一貫性と信頼性が向上します。また、Snowflakeのプレビュー機能にはDynamic Tableという機能があり、より簡素化されたデータパイプラインの作成に役立ちそうです。今後もこれらの技術を駆使して、より最適なデータパイプラインの構築を目指していきます。