FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

DigdagからBigQueryを動かす - ChatGPTを使用してエラーを解決した話

こんにちは、植村です。今回は弊チームで使用しているワークフローエンジンのdigdag(AWS, EC2)からGCPへアクセスしてBigQueryを動かした時のお話を書きます。

やりたいこと:DigdagからBigQueryのExport機能を使用してGCSにデータを格納する

GCPの環境は既にできているとします。手順は以下の通りになります。

  1. BigQueryからデータを取得してGCSに格納するクエリを作成する
  2. BigQuery API クライアント ライブラリを使用してPythonから1のSQLを使用する
  3. Digdag secretsによりGCPへサービスアカウントキー(Json)を登録
  4. digdagからPythonにサービスアカウントキーを渡す
  5. pyオペレーターを使用してPythonを動かす


1,2でまずはPython経由でGoogleのAPIを叩いてBigQueryからGCSに結果を吐き出す処理を作り、3,4,5でdigdagからその処理を呼び出すという流れですね。

1.BigQueryからデータを取得してGCSに格納するクエリを作成する

BigQueryのEXPORT DATAを使用します。

EXPORT DATA
    OPTIONS(
        uri='gs://<結果を吐き出すディレクトリのパス>/sql_*.csv.gz',
        format='CSV',
        overwrite=true,
        header=true,
        field_delimiter=',',
        compression='GZIP'
        )
    AS WITH tmp AS (
        SELECT * `<DBとテーブル名>` #ここに処理を行うSQLを書く
    )
    SELECT
    *
    FROM tmp
2.BigQuery API クライアント ライブラリを使用してPythonから1のSQLを使用する

先ほど作成したSQLをPythonから読み込んで使用してあげましょう。GCPのサービスアカウントキーを発行しておいてくださいね。

from google.cloud import bigquery
import os

def generate_sql(sql_file):
    with open(sql_file, 'r') as f:
        query = f.read()
    query_result = query
    return query_result
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "gcp_key.json" # 同じディレクトリにGCPのサービスアカウントキーを置いておく
GCP_PROJECT_ID = "<プロジェクトID>"
client = bigquery.Client(project=GCP_PROJECT_ID)
file_path = "sample.sql" # 同じディレクトリに1のsqlを配置しておく
query_job = client.query(load_sql(sql_path))

これでBigQueryのExportを使用してGCSにデータ格納する処理ができました。次にこいつをDigdagから呼び出します。

3. Digdag secretsによりGCPへサービスアカウントキー(Json)を登録

まずサービスアカウントキーをdigdag secretを使用してdigdagで使えるようにします。

$ digdag secret --project [YOUR_PROJECT_NAME] --set gcp.credential=@/path/to/key.json
4.digdagからPythonにサービスアカウントキーを渡す

これでdigdagサーバーが環境変数としてGCPのサービスアカウントキーを参照できます。

timezone: Asia/Tokyo

_export:
  docker:
    image: <使用するpythonイメージのECRのURI>
+bq:
  _export:
    secret_gcp_credential: ${secret:gcp.credential}
  py>: scripts.bigquery_to_gcs.excute
  _env:
    secret_gcp_credential: ${secret:gcp.credential}
5. pyオペレーターを使用してPythonを動かす

2によりGCPのサービスアカウントキーを環境変数に登録しているので、そちらを一度Jsonに書き出して生成されたJsonのパスを渡してあげれば良いはず。
なので以下のようにコードの修正を行いました。

from google.cloud import bigquery
import os
import json


def load_sql(sql_file) -> str:
    with open(sql_file, 'r') as f:
        query = f.read()
    query_result = query

    return query_result


def excute() -> None:
    gcp_cred = os.environ["secret_gcp_credential"]
    with open("gcp_key.json", "w") as outputFile:
        json.dump(gcp_cred, outputFile, indent=2)

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "gcp_key.json"
    GCP_PROJECT_ID = "<プロジェクトID>"
    client = bigquery.Client(project=GCP_PROJECT_ID)

    sql_path = "./prediction/queries/sample.sql"  
    query_job = client.query(load_sql(sql_path))
    print(query_job)


if __name__ == '__main__':
    excute()

はい、これで終了のはずですが、エラーが発生しました。

2023-03-14 13:23:35.413 +0000 [ERROR] io.digdag.core.agent.OperatorManager: Task failed with unexpected error: Python command failed with code 1
Error messages from python: 'str' object has no attribute 'get' (AttributeError)

これは正しくJson.dumpが出来ていなく、strの型としてサービスアカウントキーを読み込んでしまっているエラーですね。

原因を特定するために、

  • サーバー上で行っていたJson生成の処理をローカルで再現する。
  • GCPからダウンロードした認証が通るJsonと比べる。

を行います。すると以下のことがわかりました。

①. 成功するJsonをダンプする文字列

gcp_cred = {"type": "serivice_acount", "project_id": "my_project"} ※ 実際のgcpのキーとは異なります

②. 失敗するJsonをダンプする文字列

gcp_cred = '{"type": "serivice_acount", "project_id": "my_project"}'

③. ①で作られたJson

{ 
  "type": "serivice_acount", 
  "project_id": "my_project"
}

④. ②で作られたJson

"{ \"type \":  \"serivice_acount \",  \"project_id \":  \"my_project \"}"

ふむ。こんなの渡したら怒られて当然。digdag secret → 環境変数登録 → os.environを使用すると②の渡されて④が生成されるようですね。


方向性としては ②.失敗するJsonを①.成功するJsonに変えれば良いと考えました。stripとかを使って両端のシングルクオテーションを消してあげればいいかな?(人間の目視の限界)
ここまでくればあとはコーディング方法の問題ですね。せっかく弊社のCTOがChatGPTをslackで使用できるようにしてくれていたので、試しに使用しました。






なので、pythonの最終コードはこのようになります。無事これで解決しました。

from google.cloud import bigquery
import os
import json


def load_sql(sql_file) -> str:
    with open(sql_file, 'r') as f:
        query = f.read()
    query_result = query

    return query_result


def excute() -> None:
    gcp_cred = os.environ["secret_gcp_credential"]
    with open("gcp_key.json", "w") as outputFile:
        json.dump(json.loads(gcp_cred), outputFile, indent=2)

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "gcp_key.json"
    GCP_PROJECT_ID = "<プロジェクトID>"
    client = bigquery.Client(project=GCP_PROJECT_ID)

    sql_path = "./prediction/queries/sample.sql"  
    query_job = client.query(load_sql(sql_path))
    print(query_job)


if __name__ == '__main__':
    excute()


羽生善治先生が、AIは人間が初めから捨てている手を打ってくる、という文脈で話していたのを思い出しました(出典出てこず)。
目的は正しい形でJsonを渡すことなので、シングルクオテーションを外すという発想が陳腐でしたね。

AIそのものの進化と共に、どのようにしてAIが解ける形に課題を持っていくか、がこれからのエンジニアの仕事の在り方なのかも知れませんね。(了)