Skip to content

Commit ea0112c

Browse files
authored
#296 Ignore vcv_pipeline_version in stored-procedures. Use env.release_tag. Update logs and slack messages to add more info. (#297)
1 parent a2423bc commit ea0112c

File tree

2 files changed: 43 additions (+) and 68 deletions (−)

clinvar_ingest/cloud/bigquery/processing_history.py

Lines changed: 32 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,7 @@ def delete(
247247
# bigquery.ScalarQueryParameter("release_date", "STRING", release_date),
248248
bigquery.ScalarQueryParameter("release_tag", "STRING", release_tag),
249249
bigquery.ScalarQueryParameter("file_type", "STRING", file_type),
250-
bigquery.ScalarQueryParameter(
251-
"xml_release_date", "STRING", xml_release_date
252-
),
250+
bigquery.ScalarQueryParameter("xml_release_date", "STRING", xml_release_date),
253251
]
254252
)
255253
if client is None:
@@ -261,7 +259,12 @@ def delete(
261259
query_job = client.query(stmt, job_config=job_config)
262260
_ = query_job.result()
263261
deleted_count = query_job.dml_stats.deleted_row_count
264-
_logger.info(f"Deleted {deleted_count} rows from processing_history.")
262+
msg = (
263+
f"Deleted {deleted_count} rows from processing_history. "
264+
f"file_type={file_type}, xml_release_date={xml_release_date}, "
265+
f"release_tag={release_tag}"
266+
)
267+
_logger.info(msg)
265268
return deleted_count
266269

267270

@@ -304,13 +307,9 @@ def write_started( # noqa: PLR0913
304307
bigquery.ScalarQueryParameter("schema_version", "STRING", schema_version),
305308
bigquery.ScalarQueryParameter("file_type", "STRING", file_type),
306309
bigquery.ScalarQueryParameter("bucket_dir", "STRING", bucket_dir),
307-
bigquery.ScalarQueryParameter(
308-
"xml_release_date", "STRING", xml_release_date
309-
),
310+
bigquery.ScalarQueryParameter("xml_release_date", "STRING", xml_release_date),
310311
bigquery.ScalarQueryParameter("ftp_released", "STRING", ftp_released),
311-
bigquery.ScalarQueryParameter(
312-
"ftp_last_modified", "STRING", ftp_last_modified
313-
),
312+
bigquery.ScalarQueryParameter("ftp_last_modified", "STRING", ftp_last_modified),
314313
bigquery.ScalarQueryParameter("pipeline_version", "STRING", release_tag),
315314
]
316315
)
@@ -366,14 +365,12 @@ def write_started( # noqa: PLR0913
366365
# Run a synchronous query job and get the results
367366
query_job = client.query(sql, job_config=job_config)
368367
result = query_job.result()
369-
_logger.info(
370-
(
371-
"processing_history record written for job started event."
372-
"release_date=%s, file_type=%s"
373-
),
374-
release_date,
375-
file_type,
368+
msg = (
369+
"processing_history record written for job started event. "
370+
f"file_type={file_type}, release_date={release_date}, "
371+
f"release_tag={release_tag}, bucket_dir={bucket_dir}"
376372
)
373+
_logger.info(msg)
377374
return result, query_job
378375

379376

@@ -424,7 +421,7 @@ def write_finished(
424421
for row in results:
425422
if row.c != 1:
426423
raise RuntimeError(
427-
f"Expected 1 row to exist for the finished event, but found {row.c}."
424+
f"Expected 1 row to exist for the finished event, but found {row.c}. "
428425
f"file_type={file_type}, release_date={release_date}, "
429426
f"release_tag={release_tag}, bucket_dir={bucket_dir}"
430427
)
@@ -443,9 +440,7 @@ def write_finished(
443440
# print(f"Query: {query}")
444441
job_config = bigquery.QueryJobConfig(
445442
query_parameters=[
446-
bigquery.ScalarQueryParameter(
447-
"parsed_files", "JSON", json.dumps(_internal_model_dump(parsed_files))
448-
),
443+
bigquery.ScalarQueryParameter("parsed_files", "JSON", json.dumps(_internal_model_dump(parsed_files))),
449444
bigquery.ScalarQueryParameter("release_date", "STRING", release_date),
450445
bigquery.ScalarQueryParameter("release_tag", "STRING", release_tag),
451446
bigquery.ScalarQueryParameter("file_type", "STRING", file_type),
@@ -462,13 +457,8 @@ def write_finished(
462457
_logger.error("Errors occurred during the update operation:")
463458
for error in query_job.errors:
464459
_logger.error(error)
465-
raise RuntimeError(
466-
f"Error occurred during update operation: {query_job.errors}"
467-
)
468-
if (
469-
query_job.dml_stats.updated_row_count > 1
470-
or query_job.dml_stats.inserted_row_count > 1
471-
):
460+
raise RuntimeError(f"Error occurred during update operation: {query_job.errors}")
461+
if query_job.dml_stats.updated_row_count > 1 or query_job.dml_stats.inserted_row_count > 1:
472462
msg = (
473463
"More than one row was updated while updating processing_history "
474464
f"for the finished event: dml_stats={query_job.dml_stats}, "
@@ -477,25 +467,20 @@ def write_finished(
477467
)
478468
_logger.error(msg)
479469
raise RuntimeError(msg)
480-
if (
481-
query_job.dml_stats.updated_row_count == 0
482-
and query_job.dml_stats.inserted_row_count == 0
483-
):
470+
if query_job.dml_stats.updated_row_count == 0 and query_job.dml_stats.inserted_row_count == 0:
484471
msg = (
485472
"No rows were updated during the write_finished. "
486473
f"file_type={file_type}, release_date={release_date}, "
487474
f"release_tag={release_tag}, bucket_dir={bucket_dir}"
488475
)
489476
_logger.error(msg)
490477
raise RuntimeError(msg)
491-
_logger.info(
492-
(
493-
"processing_history record written for job finished event."
494-
"release_date=%s, file_type=%s"
495-
),
496-
release_date,
497-
file_type,
478+
msg = (
479+
"processing_history record written for job finished event. "
480+
f"file_type={file_type}, release_date={release_date}, "
481+
f"release_tag={release_tag}, bucket_dir={bucket_dir}"
498482
)
483+
_logger.info(msg)
499484
return result, query_job
500485

501486
except RuntimeError as e:
@@ -547,13 +532,8 @@ def update_final_release_date( # noqa: PLR0913
547532
_logger.error("Errors occurred during the update operation:")
548533
for error in query_job.errors:
549534
_logger.error(error)
550-
raise RuntimeError(
551-
f"Error occurred during update operation: {query_job.errors}"
552-
)
553-
if (
554-
query_job.dml_stats.updated_row_count > 1
555-
or query_job.dml_stats.inserted_row_count > 1
556-
):
535+
raise RuntimeError(f"Error occurred during update operation: {query_job.errors}")
536+
if query_job.dml_stats.updated_row_count > 1 or query_job.dml_stats.inserted_row_count > 1:
557537
msg = (
558538
"More than one row was updated while updating processing_history "
559539
f"for the final release date: dml_stats={query_job.dml_stats}, "
@@ -562,25 +542,20 @@ def update_final_release_date( # noqa: PLR0913
562542
)
563543
_logger.error(msg)
564544
raise RuntimeError(msg)
565-
if (
566-
query_job.dml_stats.updated_row_count == 0
567-
and query_job.dml_stats.inserted_row_count == 0
568-
):
545+
if query_job.dml_stats.updated_row_count == 0 and query_job.dml_stats.inserted_row_count == 0:
569546
msg = (
570547
"No rows were updated during the update_final_release_date. "
571548
f"file_type={file_type}, xml_release_date={xml_release_date}, "
572549
f"release_tag={release_tag}, bucket_dir={bucket_dir}"
573550
)
574551
_logger.error(msg)
575552
raise RuntimeError(msg)
576-
_logger.info(
577-
(
578-
"processing_history record updated for final release date."
579-
"xml_release_date=%s, file_type=%s"
580-
),
581-
xml_release_date,
582-
file_type,
553+
msg = (
554+
"processing_history record updated for final release date. "
555+
f"file_type={file_type}, xml_release_date={xml_release_date}, "
556+
f"release_tag={release_tag}, bucket_dir={bucket_dir}"
583557
)
558+
_logger.info(msg)
584559
return result, query_job
585560

586561

misc/bin/stored-procedures-workflow.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
104104
sp_processing_write_result = processing_history.write_started(
105105
processing_history_table=processing_history_table,
106106
release_date=str(vcv_xml_release_date),
107-
release_tag=vcv_pipeline_version,
107+
release_tag=env.release_tag,
108108
schema_version=schema_version,
109109
file_type=ClinVarIngestFileFormat(env.file_format_mode),
110110
client=_get_bq_client(),
@@ -114,19 +114,17 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
114114
)
115115

116116
msg = f"""
117-
Initiated stored procedure processing for release dated {vcv_xml_release_date} version
118-
{vcv_pipeline_version}.
117+
Initiated stored procedure processing for release dated {vcv_xml_release_date=} {vcv_pipeline_version=} release_tag={env.release_tag}.
119118
"""
120119
_logger.info(msg)
121120

122121
# Add the started row to the rollback list
123122
rollback_rows.append(
124123
{
125124
"processing_history_table": processing_history_table,
126-
"release_tag": vcv_pipeline_version,
125+
"release_tag": env.release_tag,
127126
"file_type": env.file_format_mode,
128127
"xml_release_date": str(vcv_xml_release_date),
129-
"client": _get_bq_client(),
130128
}
131129
)
132130

@@ -156,15 +154,13 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
156154
client=_get_bq_client(),
157155
)
158156
msg = f"""
159-
Stored procedure execution successful for release dated {vcv_xml_release_date} version
160-
{vcv_pipeline_version}.
157+
Stored procedure execution successful for release dated {vcv_xml_release_date=} {vcv_pipeline_version=} release_tag={env.release_tag}.
161158
"""
162159
_logger.info(msg)
163160
send_slack_message(msg)
164161
except Exception as e:
165162
msg = f"""
166-
Stored procedure execution failed for release dated {vcv_xml_release_date} version
167-
{vcv_pipeline_version}.
163+
Stored procedure execution failed for release dated {vcv_xml_release_date=} {vcv_pipeline_version=} release_tag={env.release_tag}.
168164
"""
169165
_logger.error(msg)
170166
send_slack_message(msg)
@@ -174,20 +170,24 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
174170
rollback_rows = [
175171
row
176172
for row in rollback_rows
177-
if row["xml_release_date"] != str(vcv_xml_release_date) or row["release_tag"] != vcv_pipeline_version
173+
if row["xml_release_date"] != str(vcv_xml_release_date) or row["release_tag"] != env.release_tag
178174
]
179175

180176
dataset_id = row.get("final_dataset_id")
181177

178+
vi_gs_url = f"gs://clinvar-gks/{release_date}/dev/vi.jsonl.gz"
182179
cmd = f"""
183180
bq extract \
184181
--destination_format NEWLINE_DELIMITED_JSON \
185182
--compression GZIP \
186183
'{dataset_id}.variation_identity' \
187-
gs://clinvar-gks/{release_date}/dev/vi.json.gz
184+
{vi_gs_url}
188185
"""
189186
try:
190187
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
188+
msg = f"Successfully exported variation_identity file to {vi_gs_url}"
189+
_logger.info(msg)
190+
send_slack_message(msg)
191191
except subprocess.CalledProcessError as e:
192192
raise RuntimeError(
193193
f"Command failed: {e.cmd}\nReturn code: {e.returncode}\nStdout: {e.stdout}\nStderr: {e.stderr}"

Comments: 0