[GCP]將csv檔案上傳至 Cloud Storage自動新增至BigQuery

Memo
3 min readJul 9, 2022

情境: 假設每天會上傳一份最新更新資料(csv)至cloud storage,怎麼應用cloud function 來觸發流程將資料自動更新至BigQuery的table呢?

簡易資料流程如下圖:

需先設置好的GCP服務: (最後附錄提供簡易建置步驟)

  1. Cloud Storage: 請先建立一個bucket
  2. Composer: 請先建立一個Composer 2
  3. BigQuery: 請先建立一個 dataset

以下內容主要會說明:

如何上傳一個DAG至Composer

如何建立一個trigger這DAG的Cloud Function

測試完整流程

實作:

  1. 上傳DAG

建立Composer時,會自動建立一個Cloud Storage bucket 來存放資料,以下DAGs Folder 可以直接開啟對應的資料夾。

Composer Environments
upload to the DAG bucket

DAG Sample Code (save it as a python file and upload it)

import airflow
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

dag = DAG(‘trigger_bucket_to_bq’,
start_date=datetime(2022,7,9),
schedule_interval=None
)

#Change to your ID
BQ_connectionID = “project-0627”
BQ_projectID = “project-0627”
BQ_dataset = “composerv2”
CS_bucket = “bucket_composerv2”

t1 = GoogleCloudStorageToBigQueryOperator(
task_id=’bucket_to_bq’,
bucket= CS_bucket,

#the updated csv file should be named as “test_TODAY(YYYYMMDD).csv”
source_objects = [‘test_’+datetime.today().strftime(‘%Y%m%d’)+’.csv’],

#Sample data schema
schema_fields=[
{‘name’: ‘ID’, ‘type’: ‘STRING’, ‘mode’: ‘NULLABLE’},
{‘name’: ‘Date’, ‘type’: ‘STRING’, ‘mode’: ‘NULLABLE’},
{‘name’: ‘Value’, ‘type’: ‘STRING’, ‘mode’: ‘NULLABLE’},
],

#Output table in BigQuery
destination_project_dataset_table= BQ_projectID +’.’+ BQ_dataset+ ‘.CSData’,
write_disposition=’WRITE_APPEND’,
skip_leading_rows=1,
google_cloud_storage_conn_id=BQ_connectionID,
bigquery_conn_id=BQ_connectionID,
dag = dag
)

t1

2. 建立Cloud Function 來觸發上面建立的DAG

請參考以下網址:

以下提供實作步驟圖片:

Trigger by Cloud Storage

Event type 選 On(finalizing/creating)file in the selected bucket

Bucket 選之後要上傳csv檔案的地方,這邊觸發最小單位只能選到bucket,不能指定bucket內的folder。

Save 後至下一步,以下Runtime 選Python 3.9,並直接貼上網站內所提供的requirements.txt跟composer2_airflow_rest_api.py這兩的code。

requirements.txt
新增composer2_airflow_rest_api.py

main.py則須修改以下2個部分:

main.py

請至Airflow webserver上找到這兩的值填入:

Airflow webserver

完成以上三個檔案就可以部屬。

3. 測試觸發流程

上傳csv檔至上面指定的bucket

檔名DAG裡面目前設定只找 “test_今天日期.csv” (ex, test_20220709.csv)

若需修改檔案名稱,請至DAG找到以下部分進行修改: (目前設定是找不到一樣的檔案名稱會一直retry)

DAG

可至View logs查看流程是否有觸發。

View logs
logs

接下來可以去Airflow workserver上查看DAG流程是否成功。

最後流程跑晚後,資料會直接出現在BigQuery的table上。

BigQuery query table

目前是用Append的方式將每天的資料接續下去,若要改為將之前的資料刪除(truncate)的話,請至DAG修改 WRITE_APPEND 改成 WRITE_TRUNCATE。

DAG

以上完成資料自動化流程將csv 檔 從Cloud Storage 新增至 BigQuery。

*******************************************************************

附錄:

  1. Cloud Storage 建 Bucket,請參考以下值:
Create bucket

2. 建立Composer,請參考以下值: (建立可能會須10、20分鐘)

Create Composer

3. BigQuery 建Dataset

Create Dataset in BigQuery

--

--