Skip to content

Commit 9d7ea2d

Browse files
authored
Use dataset in dataset_preparation (#303)
1 parent af1eac4 commit 9d7ea2d

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

clinvar_ingest/cloud/bigquery/stored_procedures.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@
2828
_logger = logging.getLogger("clinvar_ingest")
2929

3030
stored_procedures = [
31-
"CALL `clinvar_ingest.dataset_preparation`({0});",
32-
"CALL `clinvar_ingest.temporal_data_collection`({0});",
31+
"CALL `clinvar_ingest.dataset_preparation`({dataset});",
32+
"CALL `clinvar_ingest.temporal_data_collection`({release_date});",
3333
"CALL `clinvar_ingest.temporal_data_summation`();",
3434
"CALL `clinvar_ingest.tracker_report_update`();",
35-
"CALL `clinvar_ingest.variation_identity`({0});",
35+
"CALL `clinvar_ingest.variation_identity`({release_date});",
3636
]
3737

3838

39-
def execute_each(client: bigquery.Client, project_id: str, release_date: str | None) -> list[RowIterator]:
39+
def execute_each(client: bigquery.Client, project_id: str, release_date: str | None, dataset: str) -> list[RowIterator]:
4040
"""Execute each procedure in the list of stored procedures individualy,
4141
substituting the release_date date if provided.
4242
@@ -45,9 +45,10 @@ def execute_each(client: bigquery.Client, project_id: str, release_date: str | N
4545
:param release_date: yyyy_mm_dd, the yyyy_mm_dd formatted date or None to use the BQ `CURRENT_DATE()`
4646
"""
4747
as_of_date = "CURRENT_DATE()" if release_date is None else f"'{release_date}'"
48+
dataset_str = f"'{dataset}'"
4849
results = []
4950
for query in stored_procedures:
50-
query_with_args = query.format(as_of_date)
51+
query_with_args = query.format(release_date=as_of_date, dataset=dataset_str)
5152
try:
5253
logging.info(f"Executing stored procedure: {query_with_args}")
5354
job = client.query(query_with_args, project=project_id)
@@ -61,7 +62,7 @@ def execute_each(client: bigquery.Client, project_id: str, release_date: str | N
6162
return results
6263

6364

64-
def execute_all(client: bigquery.Client, project_id: str, release_date: str | None) -> RowIterator:
65+
def execute_all(client: bigquery.Client, project_id: str, release_date: str | None, dataset: str) -> RowIterator:
6566
"""Execute the list of stored procedures as one single script,
6667
substituting the release_date date if provided.
6768
@@ -70,7 +71,8 @@ def execute_all(client: bigquery.Client, project_id: str, release_date: str | No
7071
:param release_date: yyyy_mm_dd, the yyyy_mm_dd formatted date or None to use the BQ `CURRENT_DATE()`
7172
"""
7273
as_of_date = "CURRENT_DATE()" if release_date is None else f"'{release_date}'"
73-
query_with_args = [query.format(as_of_date) for query in stored_procedures]
74+
dataset_str = f"'{dataset}'"
75+
query_with_args = [query.format(release_date=as_of_date, dataset=dataset_str) for query in stored_procedures]
7476
query = "\n".join(query_with_args)
7577
_logger.info(f"Executing stored procedures via query: {query}")
7678
try:

misc/bin/stored-procedures-workflow.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,20 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
143143
vcv_pipeline_version = row["vcv_pipeline_version"]
144144
vcv_xml_release_date = row["vcv_xml_release_date"]
145145
vcv_bucket_dir = row["vcv_bucket_dir"]
146+
dataset_id = row["final_dataset_id"]
146147
# optional
147148
schema_version = row.get("vcv_schema_version")
148149

149150
msg = f"Executing stored procedures on dataset dated {release_date}"
150151
_logger.info(msg)
151152
send_slack_message(msg)
152153
try:
153-
result = execute_all(client=_get_bq_client(), project_id=env.bq_dest_project, release_date=release_date)
154+
result = execute_all(
155+
client=_get_bq_client(),
156+
project_id=env.bq_dest_project,
157+
release_date=release_date,
158+
dataset=dataset_id,
159+
)
154160

155161
processing_history.write_finished(
156162
processing_history_table=processing_history_table,
@@ -181,8 +187,6 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
181187
if row["xml_release_date"] != str(vcv_xml_release_date) or row["release_tag"] != env.release_tag
182188
]
183189

184-
dataset_id = row["final_dataset_id"]
185-
186190
vi_gs_url = f"gs://{env.clinvar_gks_bucket}/{release_date}/dev/vi.jsonl.gz"
187191
try:
188192
client = _get_bq_client()

0 commit comments

Comments
 (0)