こんにちは。杉山です。
こちらはFLINTERSで開催中の「梅雨にも負けないブログ祭り」企画の記事です。 テーマは #技術 です。
AirflowのSensorを使ったことがなくて気になったのでざっくり調べてhello worldしました
Sensorとは
早速Sensorのドキュメントを見てみる
(Google翻訳でページごと翻訳)
センサーは、何かが起こるのを待つという 1 つのことだけを行うように設計された特殊なタイプのオペレーターです。時間ベース、ファイル、または外部イベントを待つこともできますが、センサーが行うことは、何かが起こるまで待機し、その後、下流のタスクを実行できるように成功することだけです。
うんうん
センサーは主にアイドル状態であるため、2 つの異なる実行モードがあり、より効率的に使用できます。
poke(デフォルト): センサーは実行時間全体にわたってワーカースロットを占有します
reschedule: センサーはチェックしているときのみワーカースロットを占有し、チェックの合間には一定時間スリープします。
ワーカースロット占有はちょっとやばいな
およびモードpokeはreschedule、センサーをインスタンス化するときに直接構成できます。通常、これらの間のトレードオフはレイテンシです。1 秒ごとにチェックするものはpokeモードで、1 分ごとにチェックするものは モードでなければなりませんreschedule。
Operator と同様に、Airflow には、コア Airflow とプロバイダーシステムの両方で使用できる、事前に構築されたセンサーの大規模なセットがあります。
なるほどね!
ChatGPT様にサンプルコードを作ってもらった
airflowのsensorってどうやって使うの?
from datetime import datetime, timedelta from airflow import DAG from airflow.sensors.filesystem import FileSensor from airflow.operators.dummy_operator import DummyOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'example_dag', default_args=default_args, description='An example DAG with FileSensor', schedule_interval=timedelta(days=1), ) start = DummyOperator( task_id='start', dag=dag, ) file_sensor = FileSensor( task_id='wait_for_file', fs_conn_id='fs_default', # 'fs_default' はファイルシステム接続のID。必要に応じて変更。 filepath='/path/to/your/file.txt', poke_interval=60, # 60秒ごとにチェック timeout=600, # 10分でタイムアウト dag=dag, ) end = DummyOperator( task_id='end', dag=dag, ) start >> file_sensor >> end
(ちょっと書き方古いかも?今はEmptyOperatorだったような?まぁ本筋と関係ないのでスルー)
なるほどFileSensorっていうオペレータがあるのか
file_sensor = FileSensor( task_id='wait_for_file', fs_conn_id='fs_default', # 'fs_default' はファイルシステム接続のID。必要に応じて変更。 filepath='/path/to/your/file.txt', poke_interval=60, # 60秒ごとにチェック timeout=600, # 10分でタイムアウト dag=dag, )
60秒ごとにfilepathに指定したファイルをチェックできるっぽい雰囲気
filepathにはglobも指定できるらしい
せっかく作ってもらったし動かしてみる
準備
業務でCloudComposerを使ってるので、とりあえずこの辺を見てローカル環境構築
ローカルでAirflowが動くようになったはずなので、デバッグ用に少しコードを書き換える
- filepathを
/home/airflow/gcs/data/input/hoge.txt
に - poke_intervalを5に
- DAGの引数に
schedule_interval='@once',
とcatchup=False,
も追加してなるべく余計な処理をさせないようにしておく
filepathに何を指定したらいいかわからなかったが、WebUIのAdmin > Configurationで設定一覧を見て、data_folderの値がそれっぽかった
実際に動かしてみた
ログを見ると5秒おきにpokeしてて、後続の処理をブロックしてる様子
対象のファイルを置いてみる
処理が終了してて後続の処理も成功してた
完全に理解した
他のSensor
ファイル以外にもコマンド、時間、プログラム、他のタスクによる制御ができそう
今のプロジェクトではGoogleCloudStorageを使ってるので実際に使うとしたらこの辺になりそう
まとめ
Sensorはイベント駆動のようなことが簡単にできる便利なやつということがわかった
ユースケース次第では採用が難しそうな印象
- 不定期なファイルアップロードの検知(一生停止せずにポーリングし続けるdagを作れたらいけそうだが…処理中に新しいファイルがアップロードされた時とか面倒くさそう)
- FileSensorのfilepathにglobを指定できるのでどんなファイル名でマッチしたか取得したくなりそうだが、ぱっと見できなさそう。
ここにSensorを使う上で重要なことが書いてありそうだった。
ワーカーを占有する問題に対して非同期なトリガーを使ってうまく解決できるっぽい。Sensorを採用するならこれを理解してからの方が良さそうなので今度読んでみる。