Skip to content

Commit af1eac4

Browse files
authored
#299 fix export of variation_identity table. Parameterize output bucket. Add some additional error handling and log details. (#301)
1 parent ea0112c commit af1eac4

File tree

4 files changed

+46
-47
lines changed

4 files changed

+46
-47
lines changed

clinvar_ingest/.dev.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ BQ_DEST_PROJECT="clingen-dev"
33
CLINVAR_INGEST_BQ_META_DATASET="clinvar_ingest"
44
CLINVAR_INGEST_BUCKET="clinvar-ingest-dev"
55
CLINVAR_INGEST_RELEASE_TAG="v1_2_3" # note the underscore separators due to BQ naming limitations
6+
CLINVAR_GKS_BUCKET="clinvar-gks"

clinvar_ingest/cloud/bigquery/processing_history.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,7 @@ def processed_entries_ready_for_sp_processing(
650650
query = f"""
651651
SELECT
652652
release_date,
653+
final_dataset_id,
653654
vcv_file_type,
654655
vcv_pipeline_version,
655656
vcv_schema_version,

clinvar_ingest/config.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
import os
22
import pathlib
3-
from typing import Literal
3+
from typing import Annotated, Literal
44

55
from dotenv import dotenv_values
6-
from pydantic import BaseModel, field_validator
6+
from pydantic import BaseModel, Field, field_validator
77

88
_dotenv_env = os.environ.get("DOTENV_ENV", "dev")
99
_dotenv_values = dotenv_values(pathlib.Path(__file__).parent / f".{_dotenv_env}.env")
1010

1111

12-
def env_or_dotenv_or(
13-
key_name: str, default: str | None = None, throw: bool = False
14-
) -> str:
12+
def env_or_dotenv_or(key_name: str, default: str | None = None, throw: bool = False) -> str:
1513
"""
1614
Retrieves a value from the environment.
1715
If not set, retrieve it from the dotenv file.
@@ -43,14 +41,10 @@ class BaseEnv(Env):
4341
def _get_base_env() -> BaseEnv:
4442
return BaseEnv(
4543
bq_dest_project=env_or_dotenv_or("BQ_DEST_PROJECT", throw=True),
46-
bq_meta_dataset=env_or_dotenv_or(
47-
"CLINVAR_INGEST_BQ_META_DATASET", default="clinvar_ingest"
48-
),
44+
bq_meta_dataset=env_or_dotenv_or("CLINVAR_INGEST_BQ_META_DATASET", default="clinvar_ingest"),
4945
slack_token=env_or_dotenv_or("CLINVAR_INGEST_SLACK_TOKEN"),
5046
# defaults to test "clinvar-message-test"
51-
slack_channel=env_or_dotenv_or(
52-
"CLINVAR_INGEST_SLACK_CHANNEL", default="C06QFR0278D"
53-
),
47+
slack_channel=env_or_dotenv_or("CLINVAR_INGEST_SLACK_CHANNEL", default="C06QFR0278D"),
5448
release_tag=env_or_dotenv_or("CLINVAR_INGEST_RELEASE_TAG", throw=True),
5549
schema_version=env_or_dotenv_or("CLINVAR_INGEST_SCHEMA_VERSION", default="v2"),
5650
location=env_or_dotenv_or("CLINVAR_INGEST_LOCATION", default="us-east1"),
@@ -77,28 +71,23 @@ def get_file_ingest_env() -> ClinVarEnv:
7771
env = ClinVarEnv(
7872
**_base_env.model_dump(),
7973
bucket_name=env_or_dotenv_or("CLINVAR_INGEST_BUCKET", throw=True),
80-
bucket_staging_prefix=env_or_dotenv_or(
81-
"CLINVAR_INGEST_STAGING_PREFIX", default="clinvar_xml"
82-
),
83-
parse_output_prefix=env_or_dotenv_or(
84-
"CLINVAR_INGEST_PARSED_PREFIX", default="clinvar_parsed"
85-
),
86-
executions_output_prefix=env_or_dotenv_or(
87-
"CLINVAR_INGEST_EXECUTIONS_PREFIX", default="executions"
88-
),
74+
bucket_staging_prefix=env_or_dotenv_or("CLINVAR_INGEST_STAGING_PREFIX", default="clinvar_xml"),
75+
parse_output_prefix=env_or_dotenv_or("CLINVAR_INGEST_PARSED_PREFIX", default="clinvar_parsed"),
76+
executions_output_prefix=env_or_dotenv_or("CLINVAR_INGEST_EXECUTIONS_PREFIX", default="executions"),
8977
)
9078
_set_env(env)
9179
return env
9280

9381

9482
class StoredProceduresEnv(BaseEnv):
95-
pass
83+
clinvar_gks_bucket: Annotated[str, Field(description="The GKS bucket where the VI files are stored.")]
9684

9785

9886
def get_stored_procedures_env() -> StoredProceduresEnv:
9987
_base_env = _get_base_env()
10088
env = StoredProceduresEnv(
10189
**_base_env.model_dump(),
90+
clinvar_gks_bucket=env_or_dotenv_or("CLINVAR_GKS_BUCKET", throw=True),
10291
)
10392
_set_env(env)
10493
return env

misc/bin/stored-procedures-workflow.py

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
# stored procedures against one or more datasets in the ingestion workflow.
55

66
import logging
7-
import subprocess
87
import sys
98
import traceback
109

@@ -73,6 +72,11 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
7372
env = get_stored_procedures_env()
7473
_logger.info(f"Stored procedures execution environment: {env}")
7574

75+
if env.file_format_mode != ClinVarIngestFileFormat.SP.value:
76+
msg = f"stored-procedure workflow got unexpected file_format_mode: {env.file_format_mode}"
77+
_logger.warning(msg)
78+
raise ValueError(msg)
79+
7680
################################################################
7781
#
7882
processing_history_table = processing_history.ensure_initialized(client=_get_bq_client())
@@ -85,17 +89,20 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
8589
processed_entries_needing_sp_run = processing_history.processed_entries_ready_for_sp_processing(
8690
processing_history_view, client=_get_bq_client()
8791
)
88-
msg = f"Found {processed_entries_needing_sp_run.total_rows} datasets to run stored procedures on."
92+
total_rows = processed_entries_needing_sp_run.total_rows
93+
rows_needing_sp_run = list(processed_entries_needing_sp_run)
94+
release_dates_str = ", ".join(r.get("release_date").isoformat() for r in rows_needing_sp_run)
95+
msg = f"Found {total_rows} datasets to run stored procedures on. ({release_dates_str})"
8996
_logger.info(msg)
9097

91-
if not processed_entries_needing_sp_run.total_rows:
98+
if not total_rows:
9299
sys.exit(0)
93100

94101
send_slack_message(msg)
95102

96103
# update processing_history.bq_ingest_started for ALL processing_history_view
97104
rows_to_ingest = []
98-
for row in processed_entries_needing_sp_run:
105+
for row in rows_needing_sp_run:
99106
rows_to_ingest.append(row)
100107
vcv_pipeline_version = row.get("vcv_pipeline_version", None)
101108
vcv_xml_release_date = row.get("vcv_xml_release_date", None)
@@ -131,12 +138,13 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
131138
# Now process individual rows
132139
for row in rows_to_ingest:
133140
_logger.info(row)
134-
release_date = row.get("release_date", None)
135-
vcv_pipeline_version = row.get("vcv_pipeline_version", None)
136-
vcv_release_date = row.get("vcv_release_date", None)
137-
vcv_xml_release_date = row.get("vcv_xml_release_date", None)
138-
vcv_bucket_dir = row.get("vcv_bucket_dir", None)
139-
schema_version = row.get("vcv_schema_version", None)
141+
# required
142+
release_date = row["release_date"]
143+
vcv_pipeline_version = row["vcv_pipeline_version"]
144+
vcv_xml_release_date = row["vcv_xml_release_date"]
145+
vcv_bucket_dir = row["vcv_bucket_dir"]
146+
# optional
147+
schema_version = row.get("vcv_schema_version")
140148

141149
msg = f"Executing stored procedures on dataset dated {release_date}"
142150
_logger.info(msg)
@@ -154,13 +162,13 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
154162
client=_get_bq_client(),
155163
)
156164
msg = f"""
157-
Stored procedure execution successful for release dated {vcv_xml_release_date=} {vcv_pipeline_version=} release_tag={env.release_tag}.
165+
Stored procedure execution successful for release dated vcv_xml_release_date={vcv_xml_release_date.isoformat()} {vcv_pipeline_version=} release_tag={env.release_tag}.
158166
"""
159167
_logger.info(msg)
160168
send_slack_message(msg)
161169
except Exception as e:
162170
msg = f"""
163-
Stored procedure execution failed for release dated {vcv_xml_release_date=} {vcv_pipeline_version=} release_tag={env.release_tag}.
171+
Stored procedure execution failed for release dated vcv_xml_release_date={vcv_xml_release_date.isoformat()} {vcv_pipeline_version=} release_tag={env.release_tag}.
164172
"""
165173
_logger.error(msg)
166174
send_slack_message(msg)
@@ -173,22 +181,22 @@ def sp_rollback_exception_handler(exc_type, exc_value, exc_traceback):
173181
if row["xml_release_date"] != str(vcv_xml_release_date) or row["release_tag"] != env.release_tag
174182
]
175183

176-
dataset_id = row.get("final_dataset_id")
184+
dataset_id = row["final_dataset_id"]
177185

178-
vi_gs_url = f"gs://clinvar-gks/{release_date}/dev/vi.jsonl.gz"
179-
cmd = f"""
180-
bq extract \
181-
--destination_format NEWLINE_DELIMITED_JSON \
182-
--compression GZIP \
183-
'{dataset_id}.variation_identity' \
184-
{vi_gs_url}
185-
"""
186+
vi_gs_url = f"gs://{env.clinvar_gks_bucket}/{release_date}/dev/vi.jsonl.gz"
186187
try:
187-
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
188+
client = _get_bq_client()
189+
table_id = f"{dataset_id}.variation_identity"
190+
job_config = bigquery.ExtractJobConfig(
191+
destination_format=bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON, compression=bigquery.Compression.GZIP
192+
)
193+
extract_job = client.extract_table(table_id, vi_gs_url, job_config=job_config)
194+
extract_job.result(timeout=1800) # Wait for the job to complete (30 minute timeout)
188195
msg = f"Successfully exported variation_identity file to {vi_gs_url}"
189196
_logger.info(msg)
190197
send_slack_message(msg)
191-
except subprocess.CalledProcessError as e:
192-
raise RuntimeError(
193-
f"Command failed: {e.cmd}\nReturn code: {e.returncode}\nStdout: {e.stdout}\nStderr: {e.stderr}"
194-
)
198+
except Exception as e:
199+
error_msg = f"BigQuery extract job to {vi_gs_url} failed: {e}"
200+
_logger.error(error_msg)
201+
send_slack_message(error_msg)
202+
raise e

0 commit comments

Comments
 (0)