Skip to content

Commit a2423bc

Browse files
authored
Implement #293, #292 (#294)
1 parent bde9a4d commit a2423bc

File tree

2 files changed

+28
-21
lines changed

2 files changed

+28
-21
lines changed

clinvar_ingest/cloud/bigquery/stored_procedures.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@
3232
"CALL `clinvar_ingest.temporal_data_collection`({0});",
3333
"CALL `clinvar_ingest.temporal_data_summation`();",
3434
"CALL `clinvar_ingest.tracker_report_update`();",
35+
"CALL `clinvar_ingest.variation_identity`({0});",
3536
]
3637

3738

38-
def execute_each(
39-
client: bigquery.Client, project_id: str, release_date: str | None
40-
) -> list[RowIterator]:
39+
def execute_each(client: bigquery.Client, project_id: str, release_date: str | None) -> list[RowIterator]:
4140
"""Execute each procedure in the list of stored procedures individualy,
4241
substituting the release_date date if provided.
4342
@@ -53,9 +52,7 @@ def execute_each(
5352
logging.info(f"Executing stored procedure: {query_with_args}")
5453
job = client.query(query_with_args, project=project_id)
5554
result = job.result()
56-
logging.info(
57-
f"Successfully ran stored procedure: {query_with_args}\nresult={result}"
58-
)
55+
logging.info(f"Successfully ran stored procedure: {query_with_args}\nresult={result}")
5956
results.append(result)
6057
except Exception as e:
6158
msg = f"Failed to execute stored procedure: {query_with_args} {e}"
@@ -64,9 +61,7 @@ def execute_each(
6461
return results
6562

6663

67-
def execute_all(
68-
client: bigquery.Client, project_id: str, release_date: str | None
69-
) -> RowIterator:
64+
def execute_all(client: bigquery.Client, project_id: str, release_date: str | None) -> RowIterator:
7065
"""Execute the list of stored procedures as one single script,
7166
substituting the release_date date if provided.
7267

misc/bin/stored-procedures-workflow.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@
44
# stored procedures against on or more datasets in the ingestion workflow.
55

66
import logging
7+
import subprocess
78
import sys
89
import traceback
910

1011
from google.cloud import bigquery
1112

1213
from clinvar_ingest.cloud.bigquery import processing_history
14+
from clinvar_ingest.cloud.bigquery.stored_procedures import execute_all
1315
from clinvar_ingest.config import get_stored_procedures_env
1416
from clinvar_ingest.slack import send_slack_message
1517
from clinvar_ingest.utils import ClinVarIngestFileFormat
1618

17-
from clinvar_ingest.cloud.bigquery.stored_procedures import execute_all
18-
1919
logging.basicConfig(
2020
level=logging.INFO,
2121
format="%(asctime)s.%(msecs)03d - %(levelname)s - %(name)s - %(funcName)s - %(message)s",
@@ -29,6 +29,7 @@ def _get_bq_client() -> bigquery.Client:
2929
setattr(_get_bq_client, "client", bigquery.Client())
3030
return getattr(_get_bq_client, "client")
3131

32+
3233
################################################################
3334
### rollback exception handler for deleting processing_history entries
3435

@@ -39,9 +40,7 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
3940
"""
4041
https://docs.python.org/3/library/sys.html#sys.excepthook
4142
"""
42-
exception_details = "".join(
43-
traceback.format_exception(exc_type, exc_value, exc_traceback)
44-
)
43+
exception_details = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
4544

4645
# Log the exception
4746
_logger.error("Uncaught exception:\n%s", exception_details)
@@ -61,6 +60,7 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
6160
# Call the default exception handler
6261
sys.__excepthook__(exc_type, exc_value, exc_traceback)
6362

63+
6464
# Add the exception handler as the global exception handler.
6565
# NOTE: this modifies global state and will affect all subsequent exceptions
6666
# in this script or any other script which imports this script.
@@ -75,9 +75,7 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
7575

7676
################################################################
7777
#
78-
processing_history_table = processing_history.ensure_initialized(
79-
client=_get_bq_client()
80-
)
78+
processing_history_table = processing_history.ensure_initialized(client=_get_bq_client())
8179

8280
processing_history_view = processing_history.ensure_history_view_exists(
8381
processing_history_table=processing_history_table,
@@ -172,11 +170,25 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
172170
send_slack_message(msg)
173171
raise e
174172

175-
176173
# Remove the started row from the rollback list, since this ingest has succeeded
177174
rollback_rows = [
178175
row
179176
for row in rollback_rows
180-
if row["xml_release_date"] != str(vcv_xml_release_date)
181-
or row["release_tag"] != vcv_pipeline_version
182-
]
177+
if row["xml_release_date"] != str(vcv_xml_release_date) or row["release_tag"] != vcv_pipeline_version
178+
]
179+
180+
dataset_id = row.get("final_dataset_id")
181+
182+
cmd = f"""
183+
bq extract \
184+
--destination_format NEWLINE_DELIMITED_JSON \
185+
--compression GZIP \
186+
'{dataset_id}.variation_identity' \
187+
gs://clinvar-gks/{release_date}/dev/vi.json.gz
188+
"""
189+
try:
190+
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
191+
except subprocess.CalledProcessError as e:
192+
raise RuntimeError(
193+
f"Command failed: {e.cmd}\nReturn code: {e.returncode}\nStdout: {e.stdout}\nStderr: {e.stderr}"
194+
)

0 commit comments

Comments
 (0)