
Commit d097b77

Import automation fixes

- Update version history for spanner ingestion
- Use update_import_status cloud function for feed based imports
- Update staging version file for feed based imports
- Update import param names for consistent processing
- Use http type handler for feed based imports
- Use a single graph path instead of list

1 parent 2d60e25 commit d097b77

10 files changed: +182 additions, -152 deletions

import-automation/workflow/cloudbuild.yaml

Lines changed: 5 additions & 4 deletions

@@ -26,21 +26,22 @@ substitutions:
 steps:
 - id: 'import-automation-workflow'
   name: 'gcr.io/cloud-builders/gcloud'
-  args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml', '--project', '${_PROJECT_ID}', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
+  args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
   dir: 'import-automation/workflow'
 
 - id: 'spanner-ingestion-workflow'
   name: 'gcr.io/cloud-builders/gcloud'
-  args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml', '--project', '${_PROJECT_ID}', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
+  args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
   dir: 'import-automation/workflow'
 
 - id: 'spanner-ingestion-helper'
   name: 'gcr.io/cloud-builders/gcloud'
-  args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
+  args: ['functions', 'deploy', 'spanner-ingestion-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
   dir: 'import-automation/workflow'
 
+# gcloud pubsub subscriptions create import-automation-sub --topic=import-automation-trigger --message-filter='attributes.transfer_status="TRANSFER_COMPLETED"' --push-endpoint=https://us-central1-datcom-import-automation-prod.cloudfunctions.net/import-automation-helper --push-auth-service-account=965988403328-compute@developer.gserviceaccount.com --project=datcom-import-automation-prod
 - id: 'import-automation-helper'
   name: 'gcr.io/cloud-builders/gcloud'
-  args: ['functions', 'deploy', 'import-automation-helper', '--runtime', 'python312', '--source', 'import-helper', '--trigger-topic', 'import-automation-trigger' , '--entry-point', 'handle_feed_event', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},WORKFLOW_ID=spanner-ingestion-workflow']
+  args: ['functions', 'deploy', 'import-automation-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'import-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'handle_feed_event', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID}']
   dir: 'import-automation/workflow'
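Note: the commented `gcloud pubsub subscriptions create` line documents how the now HTTP-triggered import-automation-helper is wired to the import-automation-trigger topic. With a push subscription, the function no longer receives a CloudEvent; Pub/Sub POSTs a JSON envelope with the message nested under `message` and its payload base64-encoded. A minimal sketch of that envelope (the envelope shape is standard Pub/Sub push; all values are illustrative, not from this commit):

```python
import base64
import json

# Illustrative Pub/Sub push envelope as delivered to an HTTP-triggered
# function; only the envelope structure is standard, values are made up.
push_envelope = {
    "message": {
        "data": base64.b64encode(b'{"feed": "example"}').decode("utf-8"),
        "attributes": {
            "transfer_status": "TRANSFER_COMPLETED",
            "import_name": "scripts/example:import",  # hypothetical name
        },
        "messageId": "1234567890",
    },
    "subscription": "projects/my-project/subscriptions/import-automation-sub",
}

# handle_feed_event decodes the payload the same way:
data_bytes = base64.b64decode(push_envelope["message"]["data"])
print(json.loads(data_bytes))
```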

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 5 additions & 5 deletions

@@ -5,7 +5,7 @@ main:
         assign:
           - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
           - region: ${sys.get_env("LOCATION")}
-          - imageUri: "gcr.io/datcom-ci/dc-import-executor:stable"
+          - imageUri: ${default(map.get(args, "imageUri"), "gcr.io/datcom-ci/dc-import-executor:stable")}
           - jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))}
           - importName: ${args.importName}
           - importConfig: ${args.importConfig}
@@ -73,10 +73,10 @@ main:
             body:
               actionType: 'update_import_status'
               jobId: ${jobId}
-              importName: ${text.split(args.importName, ":")[1]}
+              importName: ${args.importName}
               status: 'FAILED'
-              execTime: ${int(sys.now() - startTime)}
-              version: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
+              executionTime: ${int(sys.now() - startTime)}
+              latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
               schedule: ${default(map.get(args, "schedule"), "")}
           result: functionResponse
     - failWorkflow:
@@ -91,7 +91,7 @@ main:
               actionType: 'update_import_version'
               importName: ${args.importName}
               version: 'staging'
-              comment: 'import-automation'
+              comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
           result: functionResponse
     - returnResult:
         return:
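Since `imageUri` is now read from the workflow arguments and only falls back to the stable executor image, a caller can pin a specific image per execution. A minimal sketch using the Workflows Executions client, assuming application default credentials; the project, location, and argument values are illustrative:

```python
import json

from google.cloud.workflows import executions_v1

# Illustrative parent path; substitute a real project and location.
parent = ("projects/my-project/locations/us-central1/"
          "workflows/import-automation-workflow")

workflow_args = {
    "importName": "scripts/example:import",  # hypothetical import name
    "importConfig": {},                      # hypothetical config
    "jobName": "example-import-job",
    # Optional override; if omitted, the workflow falls back to
    # gcr.io/datcom-ci/dc-import-executor:stable.
    "imageUri": "gcr.io/datcom-ci/dc-import-executor:my-test-tag",
}

client = executions_v1.ExecutionsClient()
execution = client.create_execution(
    parent=parent,
    execution=executions_v1.Execution(argument=json.dumps(workflow_args)),
)
print(f"Started execution: {execution.name}")
```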

import-automation/workflow/import-helper/main.py

Lines changed: 45 additions & 53 deletions

@@ -18,26 +18,25 @@
 import logging
 import os
 from datetime import datetime, timezone
-from google.cloud import spanner
-from google.cloud.spanner_v1 import Transaction
+from google.auth.transport.requests import Request
+from google.oauth2 import id_token
 import google.cloud.workflows
+import requests
 
 logging.getLogger().setLevel(logging.INFO)
 
 PROJECT_ID = os.environ.get('PROJECT_ID')
 LOCATION = os.environ.get('LOCATION')
-WORKFLOW_ID = os.environ.get('WORKFLOW_ID', 'spanner-ingestion-workflow')
-SPANNER_PROJECT_ID = os.environ.get('SPANNER_PROJECT_ID')
-SPANNER_INSTANCE_ID = os.environ.get('SPANNER_INSTANCE_ID')
-SPANNER_DATABASE_ID = os.environ.get('SPANNER_DATABASE_ID')
-DEFAULT_GRAPH_PATH = "/**/*.mcf*"
+GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
+WORKFLOW_ID = 'spanner-ingestion-workflow'
+INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"
 
 
 def invoke_ingestion_workflow(import_name):
     """Invokes the spanner ingestion workflow."""
-    execution_client = executions_v1.ExecutionsClient()
+    execution_client = workflows.executions_v1.ExecutionsClient()
     parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}"
-    workflow_args = {"imports": [import_name]}
+    workflow_args = {"importList": [import_name]}
     try:
         execution_req = workflows.executions_v1.Execution(
             argument=json.dumps(workflow_args))
@@ -50,52 +49,31 @@ def invoke_ingestion_workflow(import_name):
         logging.error(f"Error triggering workflow: {e}")
 
 
-def update_import_status(import_name, dest_dir, status):
+def update_import_status(import_name, request_json):
     """Updates the status for the specified import job."""
-    logging.info(f"Updating import status for {import_name} to {status}")
     try:
-        spanner_client = spanner.Client(
-            project=SPANNER_PROJECT_ID,
-            client_options={'quota_project_id': SPANNER_PROJECT_ID})
-        instance = spanner_client.instance(SPANNER_INSTANCE_ID)
-        database = instance.database(SPANNER_DATABASE_ID)
-        job_id = ''
-        exec_time = 0
-        data_volume = 0
-        version = dest_dir
-        next_refresh = datetime.now(timezone.utc)
-        graph_paths = [DEFAULT_GRAPH_PATH]
-
-        def _record(transaction: Transaction):
-            columns = [
-                "ImportName", "State", "JobId", "ExecutionTime", "DataVolume",
-                "NextRefreshTimestamp", "LatestVersion", "GraphDataPaths",
-                "StatusUpdateTimestamp"
-            ]
-            row_values = [
-                import_name, status, job_id, exec_time, data_volume,
-                next_refresh, version, graph_paths, spanner.COMMIT_TIMESTAMP
-            ]
-            if status == 'READY':
-                columns.append("DataImportTimestamp")
-                row_values.append(spanner.COMMIT_TIMESTAMP)
-            transaction.insert_or_update(table="ImportStatus",
-                                         columns=columns,
-                                         values=[row_values])
-            logging.info(f"Marked {import_name} as {status}.")
-
-        database.run_in_transaction(_record)
-        logging.info(f"Updated Spanner status for {import_name}")
+        auth_req = Request()
+        token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL)
+        headers = {'Authorization': f'Bearer {token}'}
+        response = requests.post(INGESTION_HELPER_URL,
+                                 json=request_json,
+                                 headers=headers)
+        response.raise_for_status()
+        logging.info(f"Updated status for {import_name}")
     except Exception as e:
         logging.error(f'Error updating import status for {import_name}: {e}')
 
 
 # Triggered from a message on a Cloud Pub/Sub topic.
-@functions_framework.cloud_event
-def handle_feed_event(cloud_event):
+@functions_framework.http
+def handle_feed_event(request):
     # Updates status in spanner and triggers ingestion workflow
     # for an import using CDA feed
-    pubsub_message = cloud_event.data['message']
+    request_json = request.get_json(silent=True)
+    if not request_json or 'message' not in request_json:
+        return 'Invalid Pub/Sub message format', 400
+
+    pubsub_message = request_json['message']
    logging.info(f"Received Pub/Sub message: {pubsub_message}")
    try:
        data_bytes = base64.b64decode(pubsub_message["data"])
@@ -106,12 +84,26 @@ def handle_feed_event(cloud_event):
 
     attributes = pubsub_message.get('attributes', {})
     if attributes.get('transfer_status') == 'TRANSFER_COMPLETED':
-        feed_type = attributes.get('feed_type')
         import_name = attributes.get('import_name')
-        dest_dir = attributes.get('dest_dir')
-        if feed_type == 'cns_to_gcs':
-            logging.info(f'Updating {import_name} import status')
-            update_import_status(import_name, dest_dir, 'READY')
+        import_status = attributes.get('import_status', 'STAGING')
+        import_version = 'gs://' + os.path.join(
+            GCS_BUCKET_ID, 'google3' + import_name.replace(':', '/'),
+            attributes.get('import_version',
                           datetime.now(timezone.utc).strftime("%Y-%m-%d")))
+        graph_path = attributes.get('graph_path', "/**/*mcf*")
+        request = {
+            'actionType': 'update_import_status',
+            'importName': import_name,
+            'status': import_status,
+            'latestVersion': import_version,
+            'nextRefresh': datetime.now(timezone.utc).isoformat(),
+            'graphPath': graph_path
+        }
+
+        logging.info(
+            f"Updating import status for {import_name} to {import_status}")
+        update_import_status(import_name, request)
+        if import_status == 'READY':
             invoke_ingestion_workflow(import_name)
-        else:
-            logging.info(f'Unknown feed type: {feed_type}')
+
+    return 'OK', 200
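The rewritten `update_import_status` delegates the Spanner write to the ingestion helper over authenticated HTTP. The pattern is the standard one for service-to-service calls to a private Cloud Function: mint an ID token whose audience is the target URL and pass it as a bearer token. A standalone sketch, assuming application default credentials and a hypothetical target URL:

```python
import requests
from google.auth.transport.requests import Request
from google.oauth2 import id_token

# Hypothetical target; in this commit it is INGESTION_HELPER_URL.
target_url = "https://us-central1-my-project.cloudfunctions.net/my-helper"

# The token's audience must match the URL being called, or the
# function's IAM check will reject the request.
token = id_token.fetch_id_token(Request(), target_url)
response = requests.post(
    target_url,
    json={"actionType": "update_import_status"},  # illustrative body
    headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
```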
import-automation/workflow/import-helper/requirements.txt

Lines changed: 3 additions & 2 deletions

@@ -1,3 +1,4 @@
 functions-framework==3.*
-google-cloud-spanner
-google-cloud-workflows
+google-cloud-workflows
+google-auth
+requests

import-automation/workflow/ingestion-helper/README.md

Lines changed: 5 additions & 2 deletions

@@ -41,10 +41,13 @@ Updates the status of a specific import job.
 * `importName` (Required): The name of the import.
 * `status` (Required): The new status to set.
 * `jobId` (Optional): The Dataflow job ID.
-* `execTime` (Optional): Execution time in seconds.
+* `executionTime` (Optional): Execution time in seconds.
 * `dataVolume` (Optional): Data volume in bytes.
-* `version` (Optional): The version string.
+* `latestVersion` (Optional): Latest version string.
+* `graphPath` (Optional): Graph path regex.
 * `schedule` (Optional): A cron schedule string.
+* `nextRefresh` (Optional): Next refresh timestamp.
 
 #### `update_import_version`
 Updates the version of an import, records an audit log, and marks the import as `READY`.
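With the renamed parameters, an `update_import_status` request body would look roughly like this (all values are illustrative, not from this commit):

```python
# Hypothetical request body for the ingestion helper after the renames
# (execTime -> executionTime, version -> latestVersion).
request_body = {
    "actionType": "update_import_status",
    "importName": "example_import",         # hypothetical import
    "status": "READY",
    "jobId": "2025-01-01_00_00_00-123456",  # hypothetical Dataflow job ID
    "executionTime": 1200,                  # seconds
    "dataVolume": 1048576,                  # bytes
    "latestVersion": "gs://my-bucket/example_import/2025-01-01",
    "graphPath": "/**/*mcf*",
    "nextRefresh": "2025-01-08T06:00:00+00:00",
    "schedule": "0 6 * * 1",
}
```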

import-automation/workflow/ingestion-helper/import_utils.py

Lines changed: 33 additions & 53 deletions

@@ -15,6 +15,7 @@
 
 import logging
 import croniter
+import re
 from datetime import datetime, timezone
 from googleapiclient.discovery import build
 from googleapiclient.errors import HttpError
@@ -41,7 +42,7 @@ def get_caller_identity(request):
             logging.warning(
                 f"Could not decode unverified token for debugging: {e}")
             email = unverified_claims.get('email', 'unknown_email')
-            return f"{email} (unverified)"
+            return f"{email}"
         return 'decode_error'
     else:
         logging.warning(
@@ -51,7 +52,7 @@ def get_caller_identity(request):
         return 'no_auth_header'
 
 
-def get_import_params(request_json) -> dict:
+def get_import_params(request) -> dict:
     """Extracts and calculates import parameters from the request JSON.
 
     Args:
@@ -60,68 +61,47 @@ def get_import_params(request_json) -> dict:
     Returns:
         A dictionary with import params.
     """
-    import_name = request_json.get('importName', '')
-    status = request_json.get('status', '')
-    job_id = request_json.get('jobId', '')
-    exec_time = request_json.get('execTime', 0)
-    data_volume = request_json.get('dataVolume', 0)
-    version = request_json.get('version', '')
-    graph_paths = request_json.get('graph_paths', [])
-    schedule = request_json.get('schedule', '')
-    next_refresh = datetime.now(timezone.utc)
-    try:
-        next_refresh = croniter.croniter(schedule, datetime.now(
-            timezone.utc)).get_next(datetime)
-    except (croniter.CroniterError) as e:
-        logging.error(
-            f"Error calculating next refresh from schedule '{schedule}': {e}")
-    return {
-        'import_name': import_name,
-        'status': status,
-        'job_id': job_id,
-        'exec_time': exec_time,
-        'data_volume': data_volume,
-        'version': version,
-        'graph_paths': graph_paths,
-        'next_refresh': next_refresh
+    # Convert CamelCase or mixedCase to snake_case.
+    request_json = {
+        re.sub(r'(?<!^)(?=[A-Z])', '_', k).lower(): v
+        for k, v in request.items()
     }
 
-
-def create_import_params(summary) -> dict:
-    """Creates import parameters from the import summary.
-
-    Args:
-        summary: A dictionary containing import summary details.
-
-    Returns:
-        A dictionary with import params.
-    """
-    import_name = summary.get('import_name', '')
-    status = summary.get('status', '').removeprefix('ImportStatus.')
-    job_id = summary.get('job_id', '')
-    exec_time = summary.get('execution_time', 0)
-    data_volume = summary.get('data_volume', 0)
-    version = summary.get('latest_version', '')
-    graph_paths = summary.get('graph_paths', [])
-    next_refresh_str = summary.get('next_refresh', '')
-    next_refresh = None
+    import_name = request_json.get('import_name', '').split(':')[-1]
+    status = request_json.get('status', '').removeprefix('ImportStatus.')
+    job_id = request_json.get('job_id', '')
+    execution_time = request_json.get('execution_time', 0)
+    data_volume = request_json.get('data_volume', 0)
+    latest_version = request_json.get('latest_version', '')
+    graph_path = request_json.get('graph_path', '')
+    schedule = request_json.get('schedule', '')
+    next_refresh_str = request_json.get('next_refresh', '')
+    next_refresh = datetime.now(timezone.utc)
     if next_refresh_str:
         try:
             next_refresh = datetime.fromisoformat(next_refresh_str)
         except ValueError:
             logging.error(f"Error parsing next_refresh: {next_refresh_str}")
-
+    if schedule:
+        try:
+            next_refresh = croniter.croniter(schedule, datetime.now(
+                timezone.utc)).get_next(datetime)
+        except (croniter.CroniterError) as e:
+            logging.error(
+                f"Error calculating next refresh from schedule '{schedule}': {e}"
+            )
     return {
         'import_name': import_name,
         'status': status,
         'job_id': job_id,
-        'exec_time': exec_time,
+        'execution_time': execution_time,
         'data_volume': data_volume,
-        'version': version,
-        'graph_paths': graph_paths,
-        'next_refresh': next_refresh,
+        'latest_version': latest_version,
+        'graph_path': graph_path,
+        'next_refresh': next_refresh
     }
 
+
 def get_ingestion_metrics(project_id, location, job_id):
     """Fetches graph metrics (nodes, edges, observations) and execution time from a Dataflow job.
 
@@ -163,11 +143,11 @@ def get_ingestion_metrics(project_id, location, job_id):
         for metric in metrics.get('metrics', []):
             name = metric['name']['name']
             if name == 'graph_node_count':
-                node_count = int(metric['scalar'])
+                node_count += int(metric['scalar'])
             elif name == 'graph_edge_count':
-                edge_count = int(metric['scalar'])
+                edge_count += int(metric['scalar'])
             elif name == 'graph_observation_count':
-                obs_count = int(metric['scalar'])
+                obs_count += int(metric['scalar'])
     except HttpError as e:
         logging.error(
             f"Error fetching dataflow metrics for job {job_id}: {e}")
