FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

AirflowのDAG実行時にパラメータを渡す

こんにちは、株式会社FLINTERSに出向中の松田です。 10周年記念として133日間ブログを書き続けるチャレンジの27日目の投稿になります。

本記事はAirflow Paramsを利用して各DAG実行に対してパラメータを渡す・参照する方法について紹介する入門的な記事になります。

※ 本記事のスクリーンショット、コードはAirflow 2.5で確認しています。

はじめに

AirflowのDAGを実行する際に、実行毎に何らかのパラメータを与えて実行したい場面があるのではないでしょうか。AirflowにはDAG外からパラメータを渡す方法として Airflow VariablesやParamsがあります。Airflow VariablesはDAG実行ごとに基本的には変更がない情報を格納することに適していますが、ParamsはDAG実行の際にTrigger DAG UI (後述)で入力した情報を反映させることが可能で実行毎に変わる情報を格納するのに適しています。

DAG実行時にパラメータを渡す

DAG実行時にパラメータを渡す方法としては以下の4つがあります。ここではAirflow UIから渡す方法について取り上げます。

  • Airflow UI のDAG実行ボタンの Trigger DAG w/ config を使う
  • Airflow CLIのairflow dag trigger--config, -c optionを使う
  • TriggerDAGRunOperatorconf 引数を使う
  • Airflow REST APIの Trigger a new DAG run エンドポイントで conf パラメータを使う

Airflow UIでDAGの実行のボタンをクリックするとTrigger DAG w/ config が表示されます(図1a)。それを選択するとTrigger DAG UIが開くのでその中にあるConfiguration Jsonの項目で渡すパラメータを設定することができます(図1b, ここでは {"param1": "param-1", "param2": "param-2"} を設定しています)。

図1a) Trigger DAG w/config. 図1b) Trigger DAG UI

設定後にDAGを実行するとパラメータはAirflow contextに渡されDAG内で参照できるようになります。

パラメータの仕様を設定する

DAGに渡したいパラメータの仕様は Paramクラスを使って定義することが可能で、それを@dag デコレータもしくは DAG()params引数に渡して設定します。以下にコード例を示します。ここで、param1パラメータはデフォルトの値が"param-1", 型がstringであることを、param2はデフォルト値が1000, 型がingeter, 最大値が5000であることをそれぞれ定義しています。このパラメータ定義は Trigger DAG UI に反映され図2のように表示されるようになります。

with DAG(
    dag_id=f"test_params",
    default_args=default_args,
    catchup=False,
    start_date=datetime(2023, 9, 1),
    schedule_interval=None,
    params={
        "param1": Param(default="param-1", type="string"),
        "param2": Param(default=1000, type="integer", maximum=5000),
    },
    ...

図2) パラメータを定義したときのTrigger DAG UI

また、設定したParamの値はJSON Schemaで検証されるため上記例で設定した型以外の値を渡してDAGを実行しようとすると以下のようにUI上でエラーが表示されるようになります(図3)。この例では値の型がintegerであるparam2"1000" という文字列を入れています。

図3) Trigger DAG UIでのvalidationエラー

パラメータを参照する

設定したパラメータはAirflow contextのparamsdag_run.conf に入ってきます。ここではparamsを使ったパラメータの参照について取り上げます。 以下はTaskFlow APIを使った場合とPythonOperatorを使ったコード例になります。

    ### TaskFlow APIを使った書き方 ###
    @task()
    def print_param1(params=None):
        print(params.get("param1"))

    p1 = print_param1()

    ### PythonOperatorを使った書き方 ###
    def print_param2(params=None):
        print(params.get("param2"))

    p2 = PythonOperator(
        task_id="print_param2",
        python_callable=print_param2,
        op_kwargs={
            "params": "{{ params }}"
        }
    )
パラメータ参照で躓いた点

実際に使ってみて少し躓いた点があったので以下2点取り上げます。

  • Airflow UI等から渡したパラメータが反映されない(DAG中で設定したパラメータのデフォルト値のままになる): この場合はairflow.cfgdag_run_conf_overrides_paramsの値を確認する必要があります。この値がFalseだとDAG中に設定したデフォルトのパラメータが実質的に定数となります。

  • パラメータを数値で渡しているのに文字列として入ってくる: この場合は @dagデコレータもしくはDAG()の引数render_template_as_native_objの値を確認する必要があります。デフォルトはFalseになっておりタスクに文字列が渡されます。

おわりに

AirflowのDAGレベルの実行時パラメータについて躓いた点を含め説明してみました。お読みいただきありがとうございました。

参考

Params — Airflow Documentation
Create and use params in Airflow | Astronomer Documentation
Command Line Interface and Environment Variables Reference — Airflow Documentation
Airflow REST API
Configuration Reference — Airflow Documentation