Trigger a BigQuery scheduled query from a Cloud Function.
1. Create sample data, using the citibike public dataset (a Python sketch of the same copy follows these sub-steps):
(1) Search for citibike
(2) Broaden search to all projects.
(3) Select citibike_trips
(4) Copy
(5) Choose your own project and dataset (the dataset must be in the us (multiple regions in United States) location), table name = citibike_trips
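The console clicks above can also be scripted with the BigQuery Python client. This is a minimal sketch, assuming the public table lives at bigquery-public-data.new_york_citibike.citibike_trips and reusing the placeholder project id mock2-359902 from later in this post:

from google.cloud import bigquery

client = bigquery.Client(project='mock2-359902')  # your project id

# the dataset must be in the US multi-region, matching the public dataset
dataset = bigquery.Dataset('mock2-359902.citibike')
dataset.location = 'US'
client.create_dataset(dataset, exists_ok=True)

# copy the public table into your own dataset
job = client.copy_table(
    'bigquery-public-data.new_york_citibike.citibike_trips',
    'mock2-359902.citibike.citibike_trips',
)
job.result()  # wait for the copy to finish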
2. Create a scheduled query
Paste the following query:
CREATE OR REPLACE TABLE citibike.trips_summary
AS
SELECT
  tripduration,
  starttime,
  stoptime,
  start_station_name,
  end_station_name,
  gender,
  birth_year
FROM
  `citibike.citibike_trips`
LIMIT 10
Schedule > Create new scheduled query
3. Name the scheduled query > Save
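For reference, the same scheduled query can also be created in code with the v1 data transfer client used later in this post. A rough sketch, in which the display name and schedule are assumptions:

from google.cloud import bigquery_datatransfer_v1
from google.protobuf import struct_pb2

client = bigquery_datatransfer_v1.DataTransferServiceClient()

params = struct_pb2.Struct()
params['query'] = """CREATE OR REPLACE TABLE citibike.trips_summary AS
SELECT tripduration, starttime, stoptime, start_station_name,
       end_station_name, gender, birth_year
FROM `citibike.citibike_trips` LIMIT 10"""

transfer_config = bigquery_datatransfer_v1.types.TransferConfig(
    destination_dataset_id='citibike',
    display_name='trips_summary',     # assumed name
    data_source_id='scheduled_query',
    params=params,
    schedule='every 24 hours',        # assumed schedule
)
response = client.create_transfer_config(
    client.project_path('mock2-359902'), transfer_config)
print(response.name)  # the transfer id is the segment after 'transferConfigs/'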
4. Copy the transfer id
Go to Scheduled queries, click the query you just created, and copy the id after "configs/" in the URL at the top.
(Creating the scheduled query runs it once automatically; you can delete the new table first to make the later test easier to verify.)
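If the URL is hard to read, the transfer id can also be recovered by listing transfer configs; a sketch with the same v1 client:

from google.cloud import bigquery_datatransfer_v1

client = bigquery_datatransfer_v1.DataTransferServiceClient()
parent = client.project_path('mock2-359902')
for config in client.list_transfer_configs(parent):
    # config.name ends with .../transferConfigs/<transfer id>
    print(config.display_name, config.name.split('/')[-1])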
5. Go to Cloud Functions > Create Function
6. For testing, simply trigger on files being added to a Cloud Storage bucket > Next
7. Choose Python
main.py (change projectid to your own, and transferid to the id copied in step 4; entry point = runQuery)
import time
from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud import bigquery_datatransfer_v1

def runQuery(event, context):
    # Background Cloud Functions receive the storage event and its context;
    # neither is needed here, the upload only serves as the trigger.
    client = bigquery_datatransfer_v1.DataTransferServiceClient()
    projectid = 'mock2-359902'  # Enter your projectID here
    transferid = '633bb0c6-0000-2d27-8dad-2405886feaa4'  # Enter your transferId here
    parent = client.project_transfer_config_path(projectid, transferid)
    # request a manual run starting ~10 seconds from now
    start_time = Timestamp(seconds=int(time.time() + 10))
    response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
    print(response)
requirements.txt
google-cloud-bigquery-datatransfer==1
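The ==1 pin matters: from 2.0 onward the library ships a regenerated client, and the two calls above change shape. Roughly, the >= 2.0 equivalents would be (a sketch, not tested against every 2.x release):

parent = client.transfer_config_path(projectid, transferid)
response = client.start_manual_transfer_runs(
    request={'parent': parent, 'requested_run_time': start_time})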
Then deploy.
To test: upload any file to the bucket set as the Cloud Function trigger.
Result: the trips_summary table is created again.
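The test can also be scripted end to end; a minimal sketch, where my-trigger-bucket is a placeholder for the bucket chosen in step 6:

from google.cloud import bigquery, storage

# upload any object; the finalize event on the bucket fires the function
storage.Client().bucket('my-trigger-bucket').blob('test.txt').upload_from_string('hi')

# once the manual transfer run finishes (it starts ~10s after the trigger),
# the table should exist again
client = bigquery.Client(project='mock2-359902')
print(client.get_table('mock2-359902.citibike.trips_summary').num_rows)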