Skip to content

Commit 5baaa13

Browse files
authored
Merge pull request #290 from clingen-data-model/project-labels
Project labels and global exception handler
2 parents e7c35ae + 67dc4d9 commit 5baaa13

File tree

5 files changed

+138
-18
lines changed

5 files changed

+138
-18
lines changed

clinvar_ingest/cloud/bigquery/stored_procedures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
"CALL `clinvar_ingest.dataset_preparation`({0});",
3232
"CALL `clinvar_ingest.temporal_data_collection`({0});",
3333
"CALL `clinvar_ingest.temporal_data_summation`();",
34-
"CALL `clinvar_ingest.variation_tracker`();",
34+
"CALL `clinvar_ingest.tracker_report_update`();",
3535
]
3636

3737

misc/bin/bq_ingester.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def _get_bq_client() -> bigquery.Client:
5757
setattr(_get_bq_client, "client", bigquery.Client())
5858
return getattr(_get_bq_client, "client")
5959

60+
################################################################
61+
### rollback exception handler for deleting processing_history entries
6062

6163
rollback_rows = []
6264

misc/bin/deploy-job.sh

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,24 @@ env_vars="$env_vars,BQ_DEST_PROJECT=${BQ_DEST_PROJECT}"
127127
# if instance_name contains stored-procedures make env vars
128128
if [[ $instance_name =~ ^.*stored-procedures.*$ ]]; then
129129
# Resetting env_vars here - not inheriting previous
130-
env_vars="BQ_DEST_PROJECT=${BQ_DEST_PROJECT},CLINVAR_INGEST_BQ_META_DATASET=${CLINVAR_INGEST_BQ_META_DATASET}"
130+
env_vars="file_format=${file_format}"
131+
env_vars="$env_vars,BQ_DEST_PROJECT=${BQ_DEST_PROJECT}"
132+
env_vars="$env_vars,CLINVAR_INGEST_BQ_META_DATASET=${CLINVAR_INGEST_BQ_META_DATASET}"
131133
env_vars="$env_vars,CLINVAR_INGEST_RELEASE_TAG=${CLINVAR_INGEST_RELEASE_TAG}"
132134
fi
133135

134-
### TODO - stored-procedures
136+
####
137+
# gcloud project level labels must exist before use
138+
####
139+
# Describe the project once and read both labels from the same JSON document.
metadata=$(gcloud projects describe "$project" --format=json)
# jq prints the bare word null when the requested label is absent.
service=$(echo "$metadata" | jq '.labels["service"]')
component=$(echo "$metadata" | jq '.labels["component"]')
# Use [[ ]] here: `||` is not valid inside single-bracket [ ], where the shell
# would split the test into two separate commands and fail.
if [[ "$service" == "null" || "$component" == "null" ]]; then
    # set default labels on the project
    gcloud projects update "$project" --update-labels=service="service",component="component"
fi
146+
#####
147+
135148
gcloud run jobs $command $instance_name \
136149
--cpu=2 \
137150
--memory=8Gi \
@@ -141,7 +154,8 @@ gcloud run jobs $command $instance_name \
141154
--command="$clinvar_ingest_cmd" \
142155
--service-account=$pipeline_service_account \
143156
--set-env-vars=$env_vars \
144-
--set-secrets=CLINVAR_INGEST_SLACK_TOKEN=clinvar-ingest-slack-token:latest
157+
--set-secrets=CLINVAR_INGEST_SLACK_TOKEN=clinvar-ingest-slack-token:latest \
158+
--labels=service="clinvar-ingest",component=$instance_name
145159

146160
if [[ $instance_name =~ ^.*bq-ingest.*$|^.*stored-procedures.*$ ]]; then
147161
# turn off file globbing
@@ -151,5 +165,6 @@ if [[ $instance_name =~ ^.*bq-ingest.*$|^.*stored-procedures.*$ ]]; then
151165
--uri=https://${region}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/${project}/jobs/${instance_name}:run \
152166
--http-method POST \
153167
--oauth-service-account-email=$pipeline_service_account \
154-
--schedule='*/15 * * * *'
168+
--schedule='*/15 * * * *' \
169+
--labels=service="clinvar-ingest",component=${instance_name}-scheduler
155170
fi

misc/bin/stored-procedures-workflow.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import logging
77
import sys
8+
import traceback
89

910
from google.cloud import bigquery
1011

@@ -28,6 +29,42 @@ def _get_bq_client() -> bigquery.Client:
2829
setattr(_get_bq_client, "client", bigquery.Client())
2930
return getattr(_get_bq_client, "client")
3031

32+
################################################################
33+
### rollback exception handler for deleting processing_history entries
34+
35+
rollback_rows = []
36+
37+
38+
def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
    """
    Global exception hook that rolls back started SP ingest rows.

    Logs the uncaught exception, calls ``processing_history.delete`` for every
    entry currently in ``rollback_rows``, and finally delegates to the default
    hook ``sys.__excepthook__``.

    A failure while rolling back one row is logged and does not stop the
    remaining rows from being attempted; the default hook always runs.

    The three arguments are the ones Python passes to ``sys.excepthook``:
    the exception class, the exception instance, and its traceback.

    https://docs.python.org/3/library/sys.html#sys.excepthook
    """
    exception_details = "".join(
        traceback.format_exception(exc_type, exc_value, exc_traceback)
    )

    # Log the exception
    _logger.error("Uncaught exception:\n%s", exception_details)

    _logger.warning("Rolling back started SP ingest rows.")
    try:
        for row in rollback_rows:
            _logger.info("Rolling back row: %s", row)
            try:
                c = processing_history.delete(
                    processing_history_table=row["processing_history_table"],
                    release_tag=row["release_tag"],
                    file_type=row["file_type"],
                    xml_release_date=row["xml_release_date"],
                    client=_get_bq_client(),
                )
            except Exception:
                # Keep going: one failed delete must not block the other
                # rollbacks or hide the original uncaught exception.
                _logger.exception("Failed to roll back row: %s", row)
                continue
            _logger.info("Deleted %s rows from processing_history.", c)
    finally:
        # Call the default exception handler, even if the rollback loop
        # itself was interrupted.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
63+
64+
# Add the exception handler as the global exception handler.
65+
# NOTE: this modifies global state and will affect all subsequent exceptions
66+
# in this script or any other script which imports this script.
67+
sys.excepthook = sp_rollback_exception_handler
3168

3269
################################################################
3370
### Initialization code
@@ -84,6 +121,17 @@ def _get_bq_client() -> bigquery.Client:
84121
"""
85122
_logger.info(msg)
86123

124+
# Add the started row to the rollback list
125+
rollback_rows.append(
126+
{
127+
"processing_history_table": processing_history_table,
128+
"release_tag": vcv_pipeline_version,
129+
"file_type": env.file_format_mode,
130+
"xml_release_date": str(vcv_xml_release_date),
131+
"client": _get_bq_client(),
132+
}
133+
)
134+
87135
# Now process individual rows
88136
for row in rows_to_ingest:
89137
_logger.info(row)
@@ -116,21 +164,19 @@ def _get_bq_client() -> bigquery.Client:
116164
_logger.info(msg)
117165
send_slack_message(msg)
118166
except Exception as e:
119-
processing_history.write_started(
120-
processing_history_table=processing_history_table,
121-
release_date=None,
122-
release_tag=vcv_pipeline_version,
123-
schema_version=schema_version,
124-
file_type=ClinVarIngestFileFormat(env.file_format_mode),
125-
client=_get_bq_client(),
126-
bucket_dir=vcv_bucket_dir,
127-
xml_release_date=str(vcv_xml_release_date),
128-
error_if_exists=False,
129-
)
130167
msg = f"""
131168
Stored procedure execution failed for release dated {vcv_xml_release_date} version
132169
{vcv_pipeline_version}.
133170
"""
134171
_logger.error(msg)
135172
send_slack_message(msg)
136173
raise e
174+
175+
176+
# Remove the started row from the rollback list, since this ingest has succeeded
177+
# Keep every pending row except the one for the release that just succeeded.
# A row matches only when both the release date and the release tag match.
still_pending = []
for pending_row in rollback_rows:
    is_finished_release = (
        pending_row["xml_release_date"] == str(vcv_xml_release_date)
        and pending_row["release_tag"] == vcv_pipeline_version
    )
    if not is_finished_release:
        still_pending.append(pending_row)
rollback_rows = still_pending

misc/bin/workflow.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#!/usr/bin/env python3
2-
import argparse
32
import logging
43
import os
54
import sys
5+
import traceback
66
from pathlib import Path, PurePosixPath
77

88
from google.cloud import bigquery
@@ -67,6 +67,44 @@ def _get_bq_client() -> bigquery.Client:
6767
_get_bq_client.client = bigquery.Client()
6868
return _get_bq_client.client
6969

70+
################################################################
71+
### rollback exception handler for deleting processing_history entries
72+
73+
rollback_rows = []
74+
75+
def workflow_rollback_exception_handler(exc_type, exc_value, exc_traceback):
    """
    Global exception hook that rolls back started processing_history rows.

    Logs the uncaught exception, calls ``processing_history.delete`` for every
    entry currently in ``rollback_rows``, and finally delegates to the default
    hook ``sys.__excepthook__``.

    A failure while rolling back one row is logged and does not stop the
    remaining rows from being attempted; the default hook always runs.

    The three arguments are the ones Python passes to ``sys.excepthook``:
    the exception class, the exception instance, and its traceback.

    https://docs.python.org/3/library/sys.html#sys.excepthook
    """
    exception_details = "".join(
        traceback.format_exception(exc_type, exc_value, exc_traceback)
    )

    # Log the exception
    _logger.error("Uncaught exception:\n%s", exception_details)

    _logger.warning("Rolling back started processing_history rows.")
    try:
        for row in rollback_rows:
            _logger.info("Rolling back row: %s", row)
            try:
                c = processing_history.delete(
                    processing_history_table=row["processing_history_table"],
                    release_tag=row["release_tag"],
                    file_type=row["file_type"],
                    xml_release_date=row["xml_release_date"],
                    client=_get_bq_client(),
                )
            except Exception:
                # Keep going: one failed delete must not block the other
                # rollbacks or hide the original uncaught exception.
                _logger.exception("Failed to roll back row: %s", row)
                continue
            _logger.info("Deleted %s rows from processing_history.", c)
    finally:
        # Call the default exception handler, even if the rollback loop
        # itself was interrupted.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
102+
103+
104+
# Add the exception handler as the global exception handler.
105+
# NOTE: this modifies global state and will affect all subsequent exceptions
106+
# in this script or any other script which imports this script.
107+
sys.excepthook = workflow_rollback_exception_handler
70108

71109
################################################################
72110
### Initialization code
@@ -140,6 +178,17 @@ def _get_bq_client() -> bigquery.Client:
140178
error_if_exists=False,
141179
)
142180

181+
# Add the started row to the rollback list
182+
rollback_rows.append(
183+
{
184+
"processing_history_table": processing_history_table,
185+
"release_tag": env.release_tag,
186+
"file_type": file_mode,
187+
"xml_release_date": str(release_date),
188+
"client": _get_bq_client(),
189+
}
190+
)
191+
143192

144193
################################################################
145194
# Run copy step. Copies a source XML file from an HTTP/FTP server to GCS
@@ -247,7 +296,7 @@ def parse(payload: ParseRequest, limit=None) -> ParseResponse:
247296
try:
248297
parse_response = parse(
249298
ParseRequest(input_path=copy_response.gcs_path),
250-
# limit=1000,
299+
#limit=1000,
251300
)
252301
# Call model_dump_json() so the serialized response is logged, not the
# repr of the bound method.
_logger.info(f"Parse response: {parse_response.model_dump_json()}")
253302
except Exception as e:
@@ -279,3 +328,11 @@ def parse(payload: ParseRequest, limit=None) -> ParseResponse:
279328
################################################################
280329
_logger.info("Workflow succeeded")
281330
send_slack_message(workflow_id_message + " - Workflow succeeded.")
331+
332+
# Remove the started row from the rollback list, since this ingest has succeeded
333+
# Rows are appended with xml_release_date stored as str(release_date), so
# compare against the same string form. Comparing against a non-str
# release_date would never match and the finished row would stay listed.
rollback_rows = [
    row
    for row in rollback_rows
    if row["xml_release_date"] != str(release_date)
    or row["release_tag"] != env.release_tag
]

0 commit comments

Comments
 (0)