FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

Dagster University に入学してみた

こんにちは、株式会社FLINTERSに出向中の松田です。

本記事はFLINTERSブログリレー企画「梅雨にも負けないブログ祭り」の記事になります。データのオーケストレーションツールの1つである Dagster には Dagster University という公式のチュートリアルがあり、本記事ではその内容について紹介してみます。

テーマ: #オーケストレーション #dagster #データパイプライン

Dagsterとは

Dagster はテーブル、データセット、機械学習モデル、レポートといったデータアセットを開発、管理するために設計されたオーケストレータで主に以下の特徴があります。

  • OSS ( 2024年6月時点で Stars: 10.5 K, Contributors: 420 ) github.com

  • Python で Asset-centric なパイプラインを記述 (※ Asset-centric については後述)

  • マネージドサービス提供がある dagster.io

Dagster は独自の概念が多く学習コストがかかる印象がありますが公式からわかりやすいチュートリアルの Dagster University が提供されています。

Dagster University

Dagster University は無料の公式チュートリアルで2024年6月3日時点で以下のコースがあります。

  • Dagster Essentials
  • Dagster & dbt

これらのコースで実際にコードを書いたり各 Lesson に用意された問題を解いていくことなどを通して、Dagster が持つ概念やパイプラインの実装方法、Dagster でどういったことができるのかを学んでいくことができます。本記事では Dagster Essentials のコースについて記載します。

Dagster Essentials

Dagster Essentials のコースでは以下のレッスンからなり、主に Dagster を利用する上で抑えるべき概念やデータパイプラインの実装方法を学んでいくことになります。

  • Lesson 1: Introduction
  • Lesson 2: Prerequisites & setup
  • Lesson 3: Software-defined Assets
  • Lesson 4: Asset dependencies
  • Lesson 5: Definitions and code locations
  • Lesson 6: Resources
  • Lesson 7: Schedules
  • Lesson 8: Partitions and backfills
  • Lesson 9: Sensors
  • Capstone
  • Extra credit: Metadata

各 Lesson 概要

Lesson 1: Introduction

最初の導入ではデータエンジニアリングやオーケストレーションの概要について紹介しています。 特にワークフローのオーケストレーションには2つのアプローチ、

  1. Task-centric オーケストレーション: タスクの実行を管理・調整すること、つまりHowに焦点を当てたもの。
  2. Asset-centric オーケストレーション: 生成されるデータ(テーブルやファイルなど: Asset)の管理、つまりWhatに焦点を当てたもの。

があることを述べていて、Task-centric オーケストレーションの場合と比較しながらデータの再利用性を高めることができるなどの Asset-centric オーケストレーションがもたらすメリットについて知ることができるようになっています。

Lesson 2: Prerequisites & setup

ここではローカルで Dagster を動かすためのセットアップ方法と Dagster プロジェクトのディレクトリ、ファイル構成についての説明があります。

Lesson 3: Software-defined Assets

Asset はデータベースのテーブルやビュー、ファイルといった永続化ストレージ内のオブジェクトのことを指します。この Lesson では実際に Asset を定義するコードを書いて立ち上げた Dagster UI からの処理の実行 (materialize) やその結果を見る方法を学びます。 Asset 定義は以下のコード例のように記述し、Asset を作成・更新する関数に @asset デコレータ で修飾して Dagster に その関数が Asset を生成するものであることを伝えます。

from dagster import asset

@asset
def taxi_trips_file() -> None:
    ...
Lesson 4: Asset dependencies

この Lesson では 依存関係のある複数の Asset とその Asset 間の依存関係を定義してデータパイプラインを実装する方法を学びます。 以下のコード例は taxi_trips asset が taxi_trips_file asset に依存する関係を定義しています。

@asset(deps=["taxi_trips_file"])
def taxi_trips() -> None:
    ...
Lesson 5: Definitions and code locations

Definitions Object でデータパイプラインを構成するために必要な要素(Asset や Resources の集まりなど)を定義します。また Code Locations により環境を分離することが可能で、各チームがそれぞれの Code Location を利用することにより使用するPython のバージョンや Python ライブラリのバージョンを分離できます。

Lesson 6: Resources

Resources は Asset を作成するために利用する外部サービスやツールのことで、具体例としてはデータ取得API, データ保存先S3バケット等が挙げられています。 以下は duckdb 接続の Resources を定義した例になります。

from dagster_duckdb import DuckDBResource
from dagster import EnvVar

database_resource = DuckDBResource(
    database=EnvVar("DUCKDB_DATABASE")  # 環境変数 DUCKDB_DATABASE を取得し database 引数に渡す
)
Lesson 7: Schedules

cron 式をつかってデータパイプラインを実行する間隔を定義します。また Job という Asset の集まりを定義してそれぞれに異なるスケジュールを当てることが可能です。 以下は trips_by_week Asset を抜き出した trips_by_week_job という Job を定義し、毎週月曜の深夜実行のスケジュールを trips_by_week_job に設定している例になります。

from dagster import define_asset_job, AssetSelection

trips_by_week = AssetSelection.asset(["trips_by_week"])  # trips_by_week asset のみの Assetの集まりを定義

trips_by_week_job = define_asset_job(
    name="weekly_update_job",
    selection=trips_by_week,
)
from dagster import ScheduleDefinition

weekly_update_schedule = ScheduleDefinition(
    job=trips_by_week_job,
    cron_schedule="0 0 * * 1",
)
Lesson 8: Partitions and backfills

Asset 定義は独立して追跡および materialize できる Partitions の集まりを表すことができます。Lesson 7 で実装したスケジュール頻度に沿った Partitions を定義してスケジュール実行ごとに Partitionを追加したりその Asset で backfill 実行を試すことができます。

Lesson 9: Sensors

Sensors は特定のイベントを監視してそれに基づいて実行するためのもので、イベントの例としては Amazon S3 にファイルが置かれた などがあります。

Extra credit: Metadata

最後にこの Lesson では Asset へ description を追加、 Asset をグルーピングして UI に表示させる、レコード数をUIに表示させるといったことを学びます。 Asset へ description を追加するには Python docstrings か、@asset デコレータの description パラメータを使います。以下は taxi_trips_file Asset に docstrings で Asset の説明を記述した例になります。

@asset
def taxi_trips_file() -> None:
    """
    The raw parquet files for the taxi trip dataset ...
    """
    ...

Dagster UI 上で taxi_trips_file Asset は以下のように表示されます。

まとめ

Dagster Essentials を一通りやってみた感想としては、Dagster の概念を丁寧なステップで抑えていくことができるため私のような初めて学んだり触ってみようとする方にとても良いチュートリアルだと思いました。次は Dagster & dbt のコースを完了させたいと思います。

参考