# Cloud Function module: reacts to CDA feed Pub/Sub notifications, forwards
# import-status updates to the spanner-ingestion-helper service, and triggers
# the Spanner ingestion workflow.
#
# NOTE(review): this chunk begins at file line 18; earlier imports (json,
# base64, functions_framework) live above this view.
import logging
import os
from datetime import datetime, timezone

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import google.cloud.workflows
from google.cloud.workflows import executions_v1
import requests

logging.getLogger().setLevel(logging.INFO)

# Deployment configuration, injected via environment variables.
PROJECT_ID = os.environ.get('PROJECT_ID')
LOCATION = os.environ.get('LOCATION')
WORKFLOW_ID = 'spanner-ingestion-workflow'
# HTTPS endpoint of the helper Cloud Function that performs the actual
# Spanner status writes on our behalf.
INGESTION_HELPER_URL = (
    f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/"
    "spanner-ingestion-helper")
3534
# NOTE(review): this region is an extraction-mangled diff fragment (fused
# old/new line numbers, "@@" hunk headers). The hunk gap right after the
# "def" line hides the function's docstring and the creation of
# `execution_client` — do not restructure this block without the full file.
3635def invoke_ingestion_workflow (import_name ):
@@ -39,7 +38,7 @@ def invoke_ingestion_workflow(import_name):
# Builds the fully-qualified workflow resource name and starts one execution
# whose argument is the JSON object {"imports": [import_name]}.
3938 parent = f"projects/{ PROJECT_ID } /locations/{ LOCATION } /workflows/{ WORKFLOW_ID } "
4039 workflow_args = {"imports" : [import_name ]}
4140 try :
# Changed by this diff: Execution now comes from the directly imported
# `executions_v1` module instead of the old `workflows.executions_v1` path.
42- execution_req = workflows . executions_v1 .Execution (
41+ execution_req = executions_v1 .Execution (
4342 argument = json .dumps (workflow_args ))
4443 response = execution_client .create_execution (parent = parent ,
4544 execution = execution_req )
@@ -50,52 +49,31 @@ def invoke_ingestion_workflow(import_name):
# Best-effort: a failure to trigger the workflow is logged, not re-raised.
5049 logging .error (f"Error triggering workflow: { e } " )
5150
5251
def update_import_status(import_name, request_json):
    """Updates the status for the specified import job.

    Forwards ``request_json`` to the spanner-ingestion-helper Cloud Function
    over HTTPS, authenticating with an OIDC ID token minted for that URL.
    Failures are logged and swallowed so a status-update problem does not
    crash the event handler (best-effort, matching the original behavior).

    Args:
        import_name: Name of the import whose status is updated (used only
            for log messages here; the payload carries the real fields).
        request_json: JSON-serializable payload for the helper service.
    """
    try:
        # Mint an ID token whose audience is the helper function's URL.
        auth_req = Request()
        token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL)
        headers = {'Authorization': f'Bearer {token}'}
        # Fix: the original call had no timeout, so a hung helper service
        # would stall this function until its own deadline killed it.
        response = requests.post(INGESTION_HELPER_URL,
                                 json=request_json,
                                 headers=headers,
                                 timeout=120)
        response.raise_for_status()
        logging.info(f"Updated status for {import_name}")
    except Exception as e:
        logging.error(f'Error updating import status for {import_name}: {e}')
9165
9266
# NOTE(review): diff fragment — the "@@ -106,12 +84,25 @@" gap below hides
# new-file lines 80-83 (the except clause closing the `try:` around the
# base64 decode). Do not restructure this function without the full file.
9367# Triggered from a message on a Cloud Pub/Sub topic.
94- @functions_framework .cloud_event
95- def handle_feed_event (cloud_event ):
# Changed by this diff: now an HTTP-triggered function receiving the
# Pub/Sub push envelope as the request body instead of a CloudEvent.
68+ @functions_framework .http
69+ def handle_feed_event (request ):
9670 # Updates status in spanner and triggers ingestion workflow
9771 # for an import using CDA feed
98- pubsub_message = cloud_event .data ['message' ]
# Reject bodies that are not a Pub/Sub push envelope (must contain
# a 'message' key) with HTTP 400.
72+ request_json = request .get_json (silent = True )
73+ if not request_json or 'message' not in request_json :
74+ return 'Invalid Pub/Sub message format' , 400
75+
76+ pubsub_message = request_json ['message' ]
9977 logging .info (f"Received Pub/Sub message: { pubsub_message } " )
10078 try :
10179 data_bytes = base64 .b64decode (pubsub_message ["data" ])
@@ -106,12 +84,25 @@ def handle_feed_event(cloud_event):
10684
# Only completed transfers are acted upon; other notifications fall
# through to the final 'OK' response.
10785 attributes = pubsub_message .get ('attributes' , {})
10886 if attributes .get ('transfer_status' ) == 'TRANSFER_COMPLETED' :
109- feed_type = attributes .get ('feed_type' )
11087 import_name = attributes .get ('import_name' )
110- dest_dir = attributes .get ('dest_dir' )
112- if feed_type == 'cns_to_gcs' :
113- logging .info (f'Updating { import_name } import status' )
114- update_import_status (import_name , dest_dir , 'READY' )
# Defaults: status STAGING, version = today's UTC date.
88+ import_status = attributes .get ('import_status' , 'STAGING' )
89+ import_version = attributes .get (
90+ 'import_version' ,
91+ datetime .now (timezone .utc ).strftime ("%Y-%m-%d" ))
# NOTE(review): attribute key 'graph_paths' is plural but is read as a
# single path string — confirm the publisher sends one path, not a list.
92+ graph_path = attributes .get ('graph_paths' , "/**/*mcf*" )
# NOTE(review): rebinding `request` here shadows the incoming HTTP
# request parameter — rename (e.g. `status_payload`) in a follow-up.
93+ request = {
94+ 'actionType' : 'update_import_status' ,
95+ 'importName' : import_name ,
96+ 'status' : import_status ,
97+ 'latestVersion' : import_version ,
98+ 'nextRefresh' : datetime .now (timezone .utc ).isoformat (),
99+ 'graphPaths' : [graph_path ]
100+ }
101+
102+ logging .info (
103+ f"Updating import status for { import_name } to { import_status } " )
104+ update_import_status (import_name , request )
# The ingestion workflow is only kicked off once the import is READY.
105+ if import_status == 'READY' :
115106 invoke_ingestion_workflow (import_name )
116- else :
117- logging . info ( f'Unknown feed type: { feed_type } ' )
107+
108+ return 'OK' , 200
0 commit comments