情境: 假設每天會上傳一份最新更新資料(csv)至cloud storage,怎麼應用cloud function 來觸發流程將資料自動更新至BigQuery的table呢?
簡易資料流程如下圖:
需先設置好的GCP服務: (最後附錄提供簡易建置步驟)
- Cloud Storage: 請先建立一個bucket
- Composer: 請先建立一個Composer 2
- BigQuery: 請先建立一個 dataset
以下內容主要會說明:
如何上傳一個DAG至Composer
如何建立一個trigger這DAG的Cloud Function
測試完整流程
實作:
- 上傳DAG
建立Composer時,會自動建立一個Cloud Storage bucket 來存放資料,以下DAGs Folder 可以直接開啟對應的資料夾。
DAG Sample Code (save it as a python file and upload it)
import airflow
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperatordag = DAG(‘trigger_bucket_to_bq’,
start_date=datetime(2022,7,9),
schedule_interval=None
)#Change to your ID
BQ_connectionID = “project-0627”
BQ_projectID = “project-0627”
BQ_dataset = “composerv2”
CS_bucket = “bucket_composerv2”t1 = GoogleCloudStorageToBigQueryOperator(
task_id=’bucket_to_bq’,
bucket= CS_bucket,#the updated csv file should be named as “test_TODAY(YYYYMMDD).csv”
source_objects = [‘test_’+datetime.today().strftime(‘%Y%m%d’)+’.csv’],#Sample data schema
schema_fields=[
{‘name’: ‘ID’, ‘type’: ‘STRING’, ‘mode’: ‘NULLABLE’},
{‘name’: ‘Date’, ‘type’: ‘STRING’, ‘mode’: ‘NULLABLE’},
{‘name’: ‘Value’, ‘type’: ‘STRING’, ‘mode’: ‘NULLABLE’},
],#Output table in BigQuery
destination_project_dataset_table= BQ_projectID +’.’+ BQ_dataset+ ‘.CSData’,
write_disposition=’WRITE_APPEND’,
skip_leading_rows=1,
google_cloud_storage_conn_id=BQ_connectionID,
bigquery_conn_id=BQ_connectionID,
dag = dag
)t1
2. 建立Cloud Function 來觸發上面建立的DAG
請參考以下網址:
以下提供實作步驟圖片:
Event type 選 On(finalizing/creating)file in the selected bucket
Bucket 選之後要上傳csv檔案的地方,這邊觸發最小單位只能選到bucket,不能指定bucket內的folder。
Save 後至下一步,以下Runtime 選Python 3.9,並直接貼上網站內所提供的requirements.txt跟composer2_airflow_rest_api.py這兩的code。
main.py則須修改以下2個部分:
請至Airflow webserver上找到這兩的值填入:
完成以上三個檔案就可以部屬。
3. 測試觸發流程
上傳csv檔至上面指定的bucket
檔名DAG裡面目前設定只找 “test_今天日期.csv” (ex, test_20220709.csv)
若需修改檔案名稱,請至DAG找到以下部分進行修改: (目前設定是找不到一樣的檔案名稱會一直retry)
可至View logs查看流程是否有觸發。
接下來可以去Airflow workserver上查看DAG流程是否成功。
最後流程跑晚後,資料會直接出現在BigQuery的table上。
目前是用Append的方式將每天的資料接續下去,若要改為將之前的資料刪除(truncate)的話,請至DAG修改 WRITE_APPEND 改成 WRITE_TRUNCATE。
以上完成資料自動化流程將csv 檔 從Cloud Storage 新增至 BigQuery。
*******************************************************************
附錄:
- Cloud Storage 建 Bucket,請參考以下值:
2. 建立Composer,請參考以下值: (建立可能會須10、20分鐘)
3. BigQuery 建Dataset