FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

AirflowのSensorをざっくり勉強した

こんにちは。杉山です。

こちらはFLINTERSで開催中の「梅雨にも負けないブログ祭り」企画の記事です。 テーマは #技術 です。

AirflowのSensorを使ったことがなくて気になったのでざっくり調べてhello worldしました

Sensorとは

早速Sensorのドキュメントを見てみる

airflow.apache.org

(Google翻訳でページごと翻訳)

センサーは、何かが起こるのを待つという 1 つのことだけを行うように設計された特殊なタイプのオペレーターです。時間ベース、ファイル、または外部イベントを待つこともできますが、センサーが行うことは、何かが起こるまで待機し、その後、下流のタスクを実行できるように成功することだけです。

うんうん

センサーは主にアイドル状態であるため、2 つの異なる実行モードがあり、より効率的に使用できます。

poke(デフォルト): センサーは実行時間全体にわたってワーカースロットを占有します

reschedule: センサーはチェックしているときのみワーカースロットを占有し、チェックの合間には一定時間スリープします。

ワーカースロット占有はちょっとやばいな

およびモードpokeはreschedule、センサーをインスタンス化するときに直接構成できます。通常、これらの間のトレードオフはレイテンシです。1 秒ごとにチェックするものはpokeモードで、1 分ごとにチェックするものは モードでなければなりませんreschedule。

Operator と同様に、Airflow には、コア Airflow とプロバイダーシステムの両方で使用できる、事前に構築されたセンサーの大規模なセットがあります。

なるほどね!

ChatGPT様にサンプルコードを作ってもらった

airflowのsensorってどうやって使うの?

from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='An example DAG with FileSensor',
    schedule_interval=timedelta(days=1),
)

start = DummyOperator(
    task_id='start',
    dag=dag,
)

file_sensor = FileSensor(
    task_id='wait_for_file',
    fs_conn_id='fs_default',  # 'fs_default' はファイルシステム接続のID。必要に応じて変更。
    filepath='/path/to/your/file.txt',
    poke_interval=60,  # 60秒ごとにチェック
    timeout=600,  # 10分でタイムアウト
    dag=dag,
)

end = DummyOperator(
    task_id='end',
    dag=dag,
)

start >> file_sensor >> end

(ちょっと書き方古いかも?今はEmptyOperatorだったような?まぁ本筋と関係ないのでスルー)

なるほどFileSensorっていうオペレータがあるのか

file_sensor = FileSensor(
    task_id='wait_for_file',
    fs_conn_id='fs_default',  # 'fs_default' はファイルシステム接続のID。必要に応じて変更。
    filepath='/path/to/your/file.txt',
    poke_interval=60,  # 60秒ごとにチェック
    timeout=600,  # 10分でタイムアウト
    dag=dag,
)

60秒ごとにfilepathに指定したファイルをチェックできるっぽい雰囲気

filepathにはglobも指定できるらしい

airflow.apache.org

せっかく作ってもらったし動かしてみる

準備

業務でCloudComposerを使ってるので、とりあえずこの辺を見てローカル環境構築

cloud.google.com

ローカルでAirflowが動くようになったはずなので、デバッグ用に少しコードを書き換える

  • filepathを /home/airflow/gcs/data/input/hoge.txt
  • poke_intervalを5に
  • DAGの引数に schedule_interval='@once',catchup=False, も追加してなるべく余計な処理をさせないようにしておく

filepathに何を指定したらいいかわからなかったが、WebUIのAdmin > Configurationで設定一覧を見て、data_folderの値がそれっぽかった

実際に動かしてみた

ログを見ると5秒おきにpokeしてて、後続の処理をブロックしてる様子

対象のファイルを置いてみる

処理が終了してて後続の処理も成功してた

完全に理解した

他のSensor

ファイル以外にもコマンド、時間、プログラム、他のタスクによる制御ができそう

airflow.apache.org

今のプロジェクトではGoogleCloudStorageを使ってるので実際に使うとしたらこの辺になりそう

airflow.apache.org

まとめ

Sensorはイベント駆動のようなことが簡単にできる便利なやつということがわかった

ユースケース次第では採用が難しそうな印象

  • 不定期なファイルアップロードの検知(一生停止せずにポーリングし続けるdagを作れたらいけそうだが…処理中に新しいファイルがアップロードされた時とか面倒くさそう)
  • FileSensorのfilepathにglobを指定できるのでどんなファイル名でマッチしたか取得したくなりそうだが、ぱっと見できなさそう。

ここにSensorを使う上で重要なことが書いてありそうだった。

airflow.apache.org

ワーカーを占有する問題に対して非同期なトリガーを使ってうまく解決できるっぽい。Sensorを採用するならこれを理解してからの方が良さそうなので今度読んでみる。