
Commit 7fe0bea

Add import aggregation helper

1 parent 0755d47 commit 7fe0bea

File tree

4 files changed: +116 -6 lines changed
Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@

# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functions_framework
from google.cloud import bigquery
import logging
from flask import jsonify
import os

# Initialize BigQuery client; deferring a failure here lets the function
# return a clean 500 instead of crashing at import time.
try:
    bq_client = bigquery.Client()
except Exception as e:
    logging.warning(f"Failed to initialize BigQuery client: {e}")
    bq_client = None

BQ_DATASET_ID = os.environ.get('BQ_DATASET_ID')
SPANNER_PROJECT_ID = os.environ.get('SPANNER_PROJECT_ID')
SPANNER_INSTANCE_ID = os.environ.get('SPANNER_INSTANCE_ID')
SPANNER_DATABASE_ID = os.environ.get('SPANNER_DATABASE_ID')
GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')


@functions_framework.http
def aggregation_helper(request):
    """HTTP Cloud Function that takes an importList and runs a BigQuery
    aggregation query for each import."""
    if not bq_client:
        return ('BigQuery client not initialized', 500)

    request_json = request.get_json(silent=True)
    if not request_json:
        return ('Request is not valid JSON', 400)

    import_list = request_json.get('importList')
    if not import_list:
        return ("'importList' parameter is missing", 400)

    logging.info(f"Received request for importList: {import_list}")

    results = []

    try:
        for import_item in import_list:
            import_name = import_item.get('importName')
            if not import_name:
                logging.warning(
                    f"Skipping item without importName: {import_item}")
                continue

            query = None
            # Define specific queries based on importName
            if "india_census" in import_name:
                # Placeholder for India Census specific logic
                query = """
                    SELECT @import_name as import_name, CURRENT_TIMESTAMP() as execution_time
                """
            elif "us_census" in import_name:
                # Placeholder for US Census specific logic
                query = """
                    SELECT @import_name as import_name, CURRENT_TIMESTAMP() as execution_time
                """
            else:
                logging.info(
                    f"No specific aggregation logic for import: {import_name}")
                continue

            if query:
                job_config = bigquery.QueryJobConfig(query_parameters=[
                    bigquery.ScalarQueryParameter("import_name", "STRING",
                                                  import_name),
                ])
                query_job = bq_client.query(query, job_config=job_config)
                query_results = query_job.result()
                for row in query_results:
                    results.append(dict(row))

        # Return the collected rows alongside the status so callers can
        # inspect what was aggregated.
        return jsonify({"status": "success", "results": results}), 200

    except Exception as e:
        logging.error(f"Aggregation failed: {e}")
        return (f"Aggregation failed: {str(e)}", 500)
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@

functions-framework==3.*
google-cloud-bigquery
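With these two dependencies installed, the helper can be smoke-tested locally before deploying. A minimal sketch, assuming the function is saved as main.py next to this requirements file (the scraped diff does not show the filename) and that application-default credentials are available; otherwise bq_client stays None and the call returns 500:

    # Local smoke test via functions-framework's Flask wrapper. The file
    # name main.py and the sample importName are assumptions.
    from functions_framework import create_app

    app = create_app(target="aggregation_helper", source="main.py")
    client = app.test_client()

    resp = client.post(
        "/", json={"importList": [{"importName": "us_census_acs"}]})
    print(resp.status_code, resp.get_json())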

import-automation/workflow/cloudbuild.yaml

Lines changed: 6 additions & 0 deletions
@@ -22,6 +22,7 @@ substitutions:
   _GCS_BUCKET_ID: 'datcom-prod-imports'
   _LOCATION: 'us-central1'
   _GCS_MOUNT_BUCKET: 'datcom-volume-mount'
+  _BQ_DATASET_ID: 'datacommons'
 
 steps:
 - id: 'import-automation-workflow'
@@ -39,6 +40,11 @@ steps:
   args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
   dir: 'import-automation/workflow'
 
+- id: 'import-aggregation-helper'
+  name: 'gcr.io/cloud-builders/gcloud'
+  args: ['functions', 'deploy', 'import-aggregation-helper', '--runtime', 'python312', '--source', 'aggregation-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'aggregation_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION},BQ_DATASET_ID=${_BQ_DATASET_ID}']
+  dir: 'import-automation/workflow'
+
 - id: 'import-automation-helper'
   name: 'gcr.io/cloud-builders/gcloud'
   args: ['functions', 'deploy', 'import-automation-helper', '--runtime', 'python312', '--source', 'import-helper', '--trigger-topic', 'import-automation-trigger', '--entry-point', 'handle_feed_event', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},WORKFLOW_ID=spanner-ingestion-workflow']
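Because the new function is deployed with --no-allow-unauthenticated, callers must present an OIDC identity token, which is exactly what the workflow's `auth: type: OIDC` does. A hedged sketch of invoking the deployed function by hand; the project id in the URL and the sample import name are placeholders, and `requests` plus ambient service-account credentials are assumed:

    import requests
    from google.auth.transport.requests import Request
    from google.oauth2 import id_token

    # Placeholder URL: region and project must match the deployment above.
    URL = ("https://us-central1-PROJECT_ID.cloudfunctions.net/"
           "import-aggregation-helper")

    # Mint an ID token for the function's URL (requires credentials that
    # can issue identity tokens, e.g. a service account).
    token = id_token.fetch_id_token(Request(), URL)

    resp = requests.post(
        URL,
        headers={"Authorization": f"Bearer {token}"},
        json={"importList": [{"importName": "india_census_2011"}]},
    )
    print(resp.status_code, resp.text)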

import-automation/workflow/spanner-ingestion-workflow.yaml

Lines changed: 14 additions & 6 deletions
@@ -10,14 +10,14 @@ main:
           - dataflow_gcs_path: 'gs://datcom-templates/templates/flex/ingestion.json'
           - location: '${sys.get_env("LOCATION")}'
           - spanner_database_id: '${sys.get_env("SPANNER_DATABASE_ID")}'
-          - ingestion_helper: "spanner-ingestion-helper"
-          - function_url: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + ingestion_helper}
+          - ingestion_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "spanner-ingestion-helper"}
+          - aggregation_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "import-aggregation-helper"}
           - import_list: ${default(map.get(args, "imports"), [])}
           - execution_error: null
     - acquire_ingestion_lock:
         call: http.post
         args:
-          url: ${function_url}
+          url: ${ingestion_function}
           auth:
             type: OIDC
           body:
@@ -31,7 +31,7 @@ main:
     - get_import_list:
         call: http.post
         args:
-          url: ${function_url}
+          url: ${ingestion_function}
           auth:
             type: OIDC
           body:
@@ -49,10 +49,18 @@ main:
             spanner_database_id: ${spanner_database_id}
             wait_period: ${wait_period}
         result: dataflow_job_id
+    - run_aggregation:
+        call: http.post
+        args:
+          url: ${aggregation_function}
+          auth:
+            type: OIDC
+          body:
+            importList: ${import_info.body}
     - update_ingestion_status:
         call: http.post
         args:
-          url: ${function_url}
+          url: ${ingestion_function}
           auth:
             type: OIDC
           body:
@@ -70,7 +78,7 @@ main:
     - release_ingestion_lock:
         call: http.post
         args:
-          url: ${function_url}
+          url: ${ingestion_function}
           auth:
             type: OIDC
           body:

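The new run_aggregation step forwards import_info.body, the response from get_import_list, directly as the helper's importList, so the two payload shapes must agree. A sketch of that contract, assuming get_import_list returns a JSON array of import records; fields other than importName are illustrative:

    # Shape of the run_aggregation request body, under the assumption that
    # get_import_list returns a JSON array of import records. Only
    # importName is required by aggregation_helper, which dispatches on
    # its substrings ("india_census", "us_census") and skips the rest.
    import_info_body = [
        {"importName": "india_census_2011", "status": "INGESTED"},
        {"importName": "us_census_acs", "status": "INGESTED"},
    ]
    request_body = {"importList": import_info_body}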