こんにちは、株式会社FLINTERSに出向中の松田です。 10周年記念として133日間ブログを書き続けるチャレンジの27日目の投稿になります。
本記事はAirflow Paramsを利用して各DAG実行に対してパラメータを渡す・参照する方法について紹介する入門的な記事になります。
※ 本記事のスクリーンショット、コードはAirflow 2.5で確認しています。
はじめに
AirflowのDAGを実行する際に、実行毎に何らかのパラメータを与えて実行したい場面があるのではないでしょうか。AirflowにはDAG外からパラメータを渡す方法として Airflow VariablesやParamsがあります。Airflow VariablesはDAG実行ごとに基本的には変更がない情報を格納することに適していますが、ParamsはDAG実行の際にTrigger DAG UI (後述)で入力した情報を反映させることが可能で実行毎に変わる情報を格納するのに適しています。
DAG実行時にパラメータを渡す
DAG実行時にパラメータを渡す方法としては以下の4つがあります。ここではAirflow UIから渡す方法について取り上げます。
- Airflow UI のDAG実行ボタンの
Trigger DAG w/ config
を使う - Airflow CLIの
airflow dag trigger
で--config
,-c
optionを使う TriggerDAGRunOperator
でconf
引数を使う- Airflow REST APIの Trigger a new DAG run エンドポイントで conf パラメータを使う
Airflow UIでDAGの実行のボタンをクリックするとTrigger DAG w/ config
が表示されます(図1a)。それを選択するとTrigger DAG UIが開くのでその中にあるConfiguration Jsonの項目で渡すパラメータを設定することができます(図1b, ここでは {"param1": "param-1", "param2": "param-2"}
を設定しています)。
設定後にDAGを実行するとパラメータはAirflow contextに渡されDAG内で参照できるようになります。
パラメータの仕様を設定する
DAGに渡したいパラメータの仕様は Param
クラスを使って定義することが可能で、それを@dag
デコレータもしくは DAG()
のparams
引数に渡して設定します。以下にコード例を示します。ここで、param1パラメータはデフォルトの値が"param-1", 型がstringであることを、param2はデフォルト値が1000, 型がingeter, 最大値が5000であることをそれぞれ定義しています。このパラメータ定義は Trigger DAG UI に反映され図2のように表示されるようになります。
with DAG( dag_id=f"test_params", default_args=default_args, catchup=False, start_date=datetime(2023, 9, 1), schedule_interval=None, params={ "param1": Param(default="param-1", type="string"), "param2": Param(default=1000, type="integer", maximum=5000), }, ...
また、設定したParamの値はJSON Schemaで検証されるため上記例で設定した型以外の値を渡してDAGを実行しようとすると以下のようにUI上でエラーが表示されるようになります(図3)。この例では値の型がintegerであるparam2
に "1000"
という文字列を入れています。
パラメータを参照する
設定したパラメータはAirflow contextのparams
や dag_run.conf
に入ってきます。ここではparams
を使ったパラメータの参照について取り上げます。
以下はTaskFlow APIを使った場合とPythonOperatorを使ったコード例になります。
### TaskFlow APIを使った書き方 ### @task() def print_param1(params=None): print(params.get("param1")) p1 = print_param1() ### PythonOperatorを使った書き方 ### def print_param2(params=None): print(params.get("param2")) p2 = PythonOperator( task_id="print_param2", python_callable=print_param2, op_kwargs={ "params": "{{ params }}" } )
パラメータ参照で躓いた点
実際に使ってみて少し躓いた点があったので以下2点取り上げます。
Airflow UI等から渡したパラメータが反映されない(DAG中で設定したパラメータのデフォルト値のままになる): この場合は
airflow.cfg
のdag_run_conf_overrides_params
の値を確認する必要があります。この値がFalse
だとDAG中に設定したデフォルトのパラメータが実質的に定数となります。パラメータを数値で渡しているのに文字列として入ってくる: この場合は
@dag
デコレータもしくはDAG()
の引数render_template_as_native_obj
の値を確認する必要があります。デフォルトはFalseになっておりタスクに文字列が渡されます。
おわりに
AirflowのDAGレベルの実行時パラメータについて躓いた点を含め説明してみました。お読みいただきありがとうございました。
参考
Params — Airflow Documentation
Create and use params in Airflow | Astronomer Documentation
Command Line Interface and Environment Variables Reference — Airflow Documentation
Airflow REST API
Configuration Reference — Airflow Documentation