Skip to content

Commit adb0aca

Browse files
committed
Added global exception handler to workflow.py and stored-procedures-workflow.py to delete processing_history entries on error.
1 parent 273369a commit adb0aca

File tree

3 files changed

+118
-13
lines changed

3 files changed

+118
-13
lines changed

misc/bin/bq_ingester.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def _get_bq_client() -> bigquery.Client:
5757
setattr(_get_bq_client, "client", bigquery.Client())
5858
return getattr(_get_bq_client, "client")
5959

60+
################################################################
61+
### rollback exception handler for deleting processing_history entries
6062

6163
rollback_rows = []
6264

misc/bin/stored-procedures-workflow.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import logging
77
import sys
8+
import traceback
89

910
from google.cloud import bigquery
1011

@@ -28,6 +29,42 @@ def _get_bq_client() -> bigquery.Client:
2829
setattr(_get_bq_client, "client", bigquery.Client())
2930
return getattr(_get_bq_client, "client")
3031

################################################################
### rollback exception handler for deleting processing_history entries

# Rows written to processing_history as "started" during this run.  Each entry
# holds only the identifying fields needed to delete that row again if the run
# fails; the rollback handler obtains its own BigQuery client.
rollback_rows = []


def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
    """
    Global excepthook: log the uncaught exception, then delete any
    processing_history rows this run had marked as started.

    https://docs.python.org/3/library/sys.html#sys.excepthook
    """
    exception_details = "".join(
        traceback.format_exception(exc_type, exc_value, exc_traceback)
    )

    # Log the exception
    _logger.error("Uncaught exception:\n%s", exception_details)

    _logger.warning("Rolling back started SP ingest rows.")
    for row in rollback_rows:
        _logger.info(f"Rolling back row: {row}")
        try:
            c = processing_history.delete(
                processing_history_table=row["processing_history_table"],
                release_tag=row["release_tag"],
                file_type=row["file_type"],
                xml_release_date=row["xml_release_date"],
                client=_get_bq_client(),
            )
            _logger.info(f"Deleted {c} rows from processing_history.")
        except Exception:
            # Best-effort rollback: a failure deleting one row must not
            # prevent the remaining rows from being rolled back.
            _logger.exception("Failed to roll back row: %s", row)

    # Everything has been processed; clear in place (so other references to
    # the list stay valid) to avoid double-deletion if the hook fires again.
    rollback_rows.clear()

    # Call the default exception handler
    sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Add the exception handler as the global exception handler.
# NOTE: this modifies global state and will affect all subsequent exceptions
# in this script or any other script which imports this script.
sys.excepthook = sp_rollback_exception_handler
3168

3269
################################################################
3370
### Initialization code
@@ -84,6 +121,17 @@ def _get_bq_client() -> bigquery.Client:
84121
"""
85122
_logger.info(msg)
86123

124+
# Record the started row so the global excepthook can delete it from
# processing_history if this run later fails.  Only the identifying fields
# are stored: the rollback handler calls _get_bq_client() itself, so keeping
# a live BigQuery client object in this dict served no purpose.
rollback_rows.append(
    {
        "processing_history_table": processing_history_table,
        "release_tag": vcv_pipeline_version,
        "file_type": env.file_format_mode,
        # Stored stringified; the success-path filter and the rollback
        # handler both compare/pass this value as a string.
        "xml_release_date": str(vcv_xml_release_date),
    }
)
134+
87135
# Now process individual rows
88136
for row in rows_to_ingest:
89137
_logger.info(row)
@@ -116,21 +164,19 @@ def _get_bq_client() -> bigquery.Client:
116164
_logger.info(msg)
117165
send_slack_message(msg)
118166
except Exception as e:
119-
processing_history.write_started(
120-
processing_history_table=processing_history_table,
121-
release_date=None,
122-
release_tag=vcv_pipeline_version,
123-
schema_version=schema_version,
124-
file_type=ClinVarIngestFileFormat(env.file_format_mode),
125-
client=_get_bq_client(),
126-
bucket_dir=vcv_bucket_dir,
127-
xml_release_date=str(vcv_xml_release_date),
128-
error_if_exists=False,
129-
)
130167
msg = f"""
131168
Stored procedure execution failed for release dated {vcv_xml_release_date} version
132169
{vcv_pipeline_version}.
133170
"""
134171
_logger.error(msg)
135172
send_slack_message(msg)
136173
raise e
174+
175+
176+
# This ingest succeeded, so drop its entry from the rollback list — it must
# not be deleted from processing_history if a later failure triggers the
# global rollback handler.
rollback_rows = [
    entry
    for entry in rollback_rows
    if not (
        entry["xml_release_date"] == str(vcv_xml_release_date)
        and entry["release_tag"] == vcv_pipeline_version
    )
]

misc/bin/workflow.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#!/usr/bin/env python3
2-
import argparse
32
import logging
43
import os
54
import sys
5+
import traceback
66
from pathlib import Path, PurePosixPath
77

88
from google.cloud import bigquery
@@ -67,6 +67,44 @@ def _get_bq_client() -> bigquery.Client:
6767
_get_bq_client.client = bigquery.Client()
6868
return _get_bq_client.client
6969

70+
################################################################
### rollback exception handler for deleting processing_history entries

# Rows written to processing_history as "started" during this run.  Each entry
# holds only the identifying fields needed to delete that row again if the run
# fails; the rollback handler obtains its own BigQuery client.
rollback_rows = []


def workflow_rollback_exception_handler(exc_type, exc_value, exc_traceback):
    """
    Global excepthook: log the uncaught exception, then delete any
    processing_history rows this run had marked as started.

    https://docs.python.org/3/library/sys.html#sys.excepthook
    """
    exception_details = "".join(
        traceback.format_exception(exc_type, exc_value, exc_traceback)
    )

    # Log the exception
    _logger.error("Uncaught exception:\n%s", exception_details)

    _logger.warning("Rolling back started processing_history rows.")
    for row in rollback_rows:
        _logger.info(f"Rolling back row: {row}")
        try:
            c = processing_history.delete(
                processing_history_table=row["processing_history_table"],
                release_tag=row["release_tag"],
                file_type=row["file_type"],
                xml_release_date=row["xml_release_date"],
                client=_get_bq_client(),
            )
            _logger.info(f"Deleted {c} rows from processing_history.")
        except Exception:
            # Best-effort rollback: a failure deleting one row must not
            # prevent the remaining rows from being rolled back.
            _logger.exception("Failed to roll back row: %s", row)

    # Everything has been processed; clear in place (so other references to
    # the list stay valid) to avoid double-deletion if the hook fires again.
    rollback_rows.clear()

    # Call the default exception handler
    sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Add the exception handler as the global exception handler.
# NOTE: this modifies global state and will affect all subsequent exceptions
# in this script or any other script which imports this script.
sys.excepthook = workflow_rollback_exception_handler
70108

71109
################################################################
72110
### Initialization code
@@ -140,6 +178,17 @@ def _get_bq_client() -> bigquery.Client:
140178
error_if_exists=False,
141179
)
142180

181+
# Record the started row so the global excepthook can delete it from
# processing_history if this run later fails.  Only the identifying fields
# are stored: the rollback handler calls _get_bq_client() itself, so keeping
# a live BigQuery client object in this dict served no purpose.
rollback_rows.append(
    {
        "processing_history_table": processing_history_table,
        "release_tag": env.release_tag,
        "file_type": file_mode,
        # Stored stringified; the success-path filter and the rollback
        # handler both compare/pass this value as a string.
        "xml_release_date": str(release_date),
    }
)
191+
143192

144193
################################################################
145194
# Run copy step. Copies a source XML file from an HTTP/FTP server to GCS
@@ -247,7 +296,7 @@ def parse(payload: ParseRequest, limit=None) -> ParseResponse:
247296
try:
248297
parse_response = parse(
249298
ParseRequest(input_path=copy_response.gcs_path),
250-
# limit=1000,
299+
#limit=1000,
251300
)
252301
_logger.info(f"Parse response: {parse_response.model_dump_json}")
253302
except Exception as e:
@@ -279,3 +328,11 @@ def parse(payload: ParseRequest, limit=None) -> ParseResponse:
279328
################################################################
280329
_logger.info("Workflow succeeded")
281330
send_slack_message(workflow_id_message + " - Workflow succeeded.")
331+
332+
# Remove the started row from the rollback list, since this ingest has
# succeeded.  BUGFIX: the value stored at append time is str(release_date),
# so the comparison must also stringify release_date — comparing against the
# raw object never matches, leaving the succeeded ingest in the rollback
# list to be wrongly deleted by a later failure.
rollback_rows = [
    row
    for row in rollback_rows
    if row["xml_release_date"] != str(release_date)
    or row["release_tag"] != env.release_tag
]

0 commit comments

Comments
 (0)