Skip to content

Commit e7c35ae

Browse files
Read release_date at the beginning of workflow.py. Add copy_only option to workflow.py (#283)
* Read release_date at the beginning of workflow.py. Add copy_only option to workflow.py * Enable get_release_date_and_iterate_type to read directly from http/ftp URLs. Exit if copy_only set. * Documentation and clean up of pytest warnings * Updated stored_procedures with procs per LB * More updates to stored procedures * Add try/except around initial retrieval of release_date. Add delete() method for processing_history and call if bq_ingester fails during table creation * Update test RCV file dated to 2024-07-30 to match test VCV file * Track list of file_type=bq rows to rollback in bq_ingester, and delete them in the global exception handler. * Remove try/except from bq_ingester now that it is caught * Remove release_date is NULL check from SQL query for processed_entries_ready_for_bq_ingest --------- Co-authored-by: Terry ONeill <toneill@broadinstitute.org>
1 parent 2fac790 commit e7c35ae

File tree

9 files changed

+550
-341
lines changed

9 files changed

+550
-341
lines changed

clinvar_ingest/api/model/requests.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
ValidationInfo,
1515
field_serializer,
1616
field_validator,
17-
validator,
1817
)
1918

2019
from clinvar_ingest.status import StepName, StepStatus
@@ -140,7 +139,7 @@ class GcsBlobPath(RootModel):
140139

141140
root: str
142141

143-
@validator("root")
142+
@field_validator("root")
144143
def _validate(cls, v): # pylint: disable=E0213
145144
if not v.startswith("gs://"):
146145
raise ValueError(f"Must be a gs:// URL: {v}")
@@ -157,7 +156,7 @@ class PurePathStr(RootModel):
157156

158157
root: str
159158

160-
@validator("root")
159+
@field_validator("root")
161160
def _validate(cls, v): # pylint: disable=E0213
162161
PurePath(v)
163162
return v

clinvar_ingest/cloud/bigquery/processing_history.py

Lines changed: 118 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ def _internal_model_dump(obj):
3636

3737

3838
def create_processing_history_table(
39-
client: bigquery.Client,
40-
table_reference: bigquery.TableReference,
39+
client: bigquery.Client,
40+
table_reference: bigquery.TableReference,
4141
) -> bigquery.Table:
4242
"""
4343
Similar to create_tables.create_table, but without the external file importing.
@@ -53,8 +53,8 @@ def create_processing_history_table(
5353

5454

5555
def ensure_history_view_exists(
56-
processing_history_table: bigquery.Table,
57-
client: bigquery.Client | None = None,
56+
processing_history_table: bigquery.Table,
57+
client: bigquery.Client | None = None,
5858
):
5959
"""
6060
Creates the view of the processing history table linking
@@ -149,8 +149,8 @@ def ensure_history_view_exists(
149149

150150

151151
def ensure_initialized(
152-
client: bigquery.Client | None = None,
153-
# storage_client: storage.Client | None = None,
152+
client: bigquery.Client | None = None,
153+
# storage_client: storage.Client | None = None,
154154
) -> bigquery.Table:
155155
"""
156156
Ensures that the bigquery clinvar-ingest metadata dataset and processing_history
@@ -191,12 +191,12 @@ def ensure_initialized(
191191

192192

193193
def check_started_exists(
194-
processing_history_table: bigquery.Table,
195-
release_date: str,
196-
release_tag: str,
197-
file_type: ClinVarIngestFileFormat,
198-
bucket_dir: str,
199-
client: bigquery.Client | None = None,
194+
processing_history_table: bigquery.Table,
195+
release_date: str,
196+
release_tag: str,
197+
file_type: ClinVarIngestFileFormat,
198+
bucket_dir: str,
199+
client: bigquery.Client | None = None,
200200
):
201201
sql = f"""
202202
SELECT COUNT(*) as c
@@ -225,18 +225,58 @@ def check_started_exists(
225225
# return row.c > 0
226226

227227

228+
def delete(
229+
processing_history_table: bigquery.Table,
230+
# release_date: str | None,
231+
release_tag: str,
232+
file_type: ClinVarIngestFileFormat,
233+
xml_release_date: str | None = None,
234+
client: bigquery.Client | None = None,
235+
) -> int:
236+
"""
237+
Deletes processing_history rows which match every parameter.
238+
"""
239+
stmt = f"""
240+
DELETE FROM {processing_history_table}
241+
WHERE pipeline_version = @release_tag
242+
AND file_type = @file_type
243+
AND xml_release_date = @xml_release_date
244+
""" # noqa: S608
245+
job_config = bigquery.QueryJobConfig(
246+
query_parameters=[
247+
# bigquery.ScalarQueryParameter("release_date", "STRING", release_date),
248+
bigquery.ScalarQueryParameter("release_tag", "STRING", release_tag),
249+
bigquery.ScalarQueryParameter("file_type", "STRING", file_type),
250+
bigquery.ScalarQueryParameter(
251+
"xml_release_date", "STRING", xml_release_date
252+
),
253+
]
254+
)
255+
if client is None:
256+
client = bigquery.Client()
257+
258+
_logger.info(
259+
f"Deleting rows from processing_history: {stmt} (release_tag={release_tag}, file_type={file_type}, xml_release_date={xml_release_date})"
260+
)
261+
query_job = client.query(stmt, job_config=job_config)
262+
_ = query_job.result()
263+
deleted_count = query_job.dml_stats.deleted_row_count
264+
_logger.info(f"Deleted {deleted_count} rows from processing_history.")
265+
return deleted_count
266+
267+
228268
def write_started( # noqa: PLR0913
229-
processing_history_table: bigquery.Table,
230-
release_date: str | None,
231-
release_tag: str,
232-
schema_version: str,
233-
file_type: ClinVarIngestFileFormat,
234-
bucket_dir: str | None = None, # TODO - Causes problems due to SQl Lookup?
235-
client: bigquery.Client | None = None,
236-
ftp_released: str | None = None,
237-
ftp_last_modified: str | None = None,
238-
xml_release_date: str | None = None,
239-
error_if_exists=True,
269+
processing_history_table: bigquery.Table,
270+
release_date: str | None,
271+
release_tag: str,
272+
schema_version: str,
273+
file_type: ClinVarIngestFileFormat,
274+
bucket_dir: str | None = None, # TODO - Causes problems due to SQl Lookup?
275+
client: bigquery.Client | None = None,
276+
ftp_released: str | None = None,
277+
ftp_last_modified: str | None = None,
278+
xml_release_date: str | None = None,
279+
error_if_exists=True,
240280
):
241281
"""
242282
Writes the status of processing to the processing_history table.
@@ -325,7 +365,7 @@ def write_started( # noqa: PLR0913
325365

326366
# Run a synchronous query job and get the results
327367
query_job = client.query(sql, job_config=job_config)
328-
_ = query_job.result()
368+
result = query_job.result()
329369
_logger.info(
330370
(
331371
"processing_history record written for job started event."
@@ -334,16 +374,17 @@ def write_started( # noqa: PLR0913
334374
release_date,
335375
file_type,
336376
)
377+
return result, query_job
337378

338379

339380
def write_finished(
340-
processing_history_table: bigquery.Table,
341-
release_date: str,
342-
release_tag: str,
343-
file_type: ClinVarIngestFileFormat,
344-
bucket_dir: str,
345-
parsed_files: dict | None = None, # ParseResponse.parsed_files
346-
client: bigquery.Client | None = None,
381+
processing_history_table: bigquery.Table,
382+
release_date: str,
383+
release_tag: str,
384+
file_type: ClinVarIngestFileFormat,
385+
bucket_dir: str,
386+
parsed_files: dict | None = None, # ParseResponse.parsed_files
387+
client: bigquery.Client | None = None,
347388
):
348389
"""
349390
Writes the status of the VCV processing to the processing_history table.
@@ -425,8 +466,8 @@ def write_finished(
425466
f"Error occurred during update operation: {query_job.errors}"
426467
)
427468
if (
428-
query_job.dml_stats.updated_row_count > 1
429-
or query_job.dml_stats.inserted_row_count > 1
469+
query_job.dml_stats.updated_row_count > 1
470+
or query_job.dml_stats.inserted_row_count > 1
430471
):
431472
msg = (
432473
"More than one row was updated while updating processing_history "
@@ -437,8 +478,8 @@ def write_finished(
437478
_logger.error(msg)
438479
raise RuntimeError(msg)
439480
if (
440-
query_job.dml_stats.updated_row_count == 0
441-
and query_job.dml_stats.inserted_row_count == 0
481+
query_job.dml_stats.updated_row_count == 0
482+
and query_job.dml_stats.inserted_row_count == 0
442483
):
443484
msg = (
444485
"No rows were updated during the write_finished. "
@@ -463,14 +504,14 @@ def write_finished(
463504

464505

465506
def update_final_release_date( # noqa: PLR0913
466-
processing_history_table: bigquery.Table,
467-
xml_release_date: str,
468-
release_tag: str,
469-
file_type: ClinVarIngestFileFormat,
470-
bucket_dir: str,
471-
final_release_date: str,
472-
final_dataset_id: str,
473-
client: bigquery.Client | None = None,
507+
processing_history_table: bigquery.Table,
508+
xml_release_date: str,
509+
release_tag: str,
510+
file_type: ClinVarIngestFileFormat,
511+
bucket_dir: str,
512+
final_release_date: str,
513+
final_dataset_id: str,
514+
client: bigquery.Client | None = None,
474515
):
475516
"""
476517
Updates the final release date and final dataset id field
@@ -510,8 +551,8 @@ def update_final_release_date( # noqa: PLR0913
510551
f"Error occurred during update operation: {query_job.errors}"
511552
)
512553
if (
513-
query_job.dml_stats.updated_row_count > 1
514-
or query_job.dml_stats.inserted_row_count > 1
554+
query_job.dml_stats.updated_row_count > 1
555+
or query_job.dml_stats.inserted_row_count > 1
515556
):
516557
msg = (
517558
"More than one row was updated while updating processing_history "
@@ -522,8 +563,8 @@ def update_final_release_date( # noqa: PLR0913
522563
_logger.error(msg)
523564
raise RuntimeError(msg)
524565
if (
525-
query_job.dml_stats.updated_row_count == 0
526-
and query_job.dml_stats.inserted_row_count == 0
566+
query_job.dml_stats.updated_row_count == 0
567+
and query_job.dml_stats.inserted_row_count == 0
527568
):
528569
msg = (
529570
"No rows were updated during the update_final_release_date. "
@@ -544,8 +585,8 @@ def update_final_release_date( # noqa: PLR0913
544585

545586

546587
def read_processing_history_entries(
547-
processing_history_view_table: bigquery.Table,
548-
client: bigquery.Client | None = None,
588+
processing_history_view_table: bigquery.Table,
589+
client: bigquery.Client | None = None,
549590
) -> google.cloud.bigquery.table.RowIterator:
550591
"""
551592
Reads the pairwise view of the processing history table linking
@@ -578,8 +619,8 @@ def read_processing_history_entries(
578619

579620

580621
def processed_entries_ready_for_bq_ingest(
581-
processing_history_view_table: bigquery.Table,
582-
client: bigquery.Client | None = None,
622+
processing_history_view_table: bigquery.Table,
623+
client: bigquery.Client | None = None,
583624
) -> google.cloud.bigquery.table.RowIterator:
584625
"""
585626
Reads the pairwise view of the processing history table linking
@@ -611,7 +652,6 @@ def processed_entries_ready_for_bq_ingest(
611652
FROM {processing_history_view_table}
612653
WHERE vcv_processing_finished IS NOT NULL
613654
AND rcv_processing_finished IS NOT NULL
614-
AND release_date IS NULL
615655
AND bq_release_date IS NULL
616656
AND bq_processing_started IS NULL
617657
ORDER BY vcv_xml_release_date
@@ -621,8 +661,8 @@ def processed_entries_ready_for_bq_ingest(
621661

622662

623663
def processed_entries_ready_for_sp_processing(
624-
processing_history_view_table: bigquery.Table,
625-
client: bigquery.Client | None = None,
664+
processing_history_view_table: bigquery.Table,
665+
client: bigquery.Client | None = None,
626666
) -> google.cloud.bigquery.table.RowIterator:
627667
"""
628668
Reads the pairwise view of the processing history table linking
@@ -669,32 +709,32 @@ def processed_entries_ready_for_sp_processing(
669709
return query_job.result()
670710

671711

672-
def ingested_entries_ready_to_be_processed(
673-
processing_history_view_table: bigquery.Table,
674-
client: bigquery.Client | None = None,
675-
) -> google.cloud.bigquery.table.RowIterator:
676-
pass
712+
# def ingested_entries_ready_to_be_processed(
713+
# processing_history_view_table: bigquery.Table,
714+
# client: bigquery.Client | None = None,
715+
# ) -> google.cloud.bigquery.table.RowIterator:
716+
# pass
677717

678718

679-
def update_bq_ingest_processing(
680-
processing_history_table: bigquery.Table,
681-
pipeline_version: str,
682-
xml_release_date: str,
683-
bq_ingest_processing: bool | None = True,
684-
client: bigquery.Client | None = None,
685-
):
686-
if client is None:
687-
client = bigquery.Client()
688-
fully_qualified_table_id = str(processing_history_table)
689-
query = f"""
690-
UPDATE {fully_qualified_table_id}
691-
SET bq_ingest_processing = {bq_ingest_processing}
692-
WHERE file_type = '{ClinVarIngestFileFormat.VCV}'
693-
AND pipeline_version = '{pipeline_version}'
694-
AND xml_release_date = '{xml_release_date}'
695-
""" # TODO prepared statement # noqa: S608
696-
query_job = client.query(query)
697-
return query_job.result()
719+
# def update_bq_ingest_processing(
720+
# processing_history_table: bigquery.Table,
721+
# pipeline_version: str,
722+
# xml_release_date: str,
723+
# bq_ingest_processing: bool | None = True,
724+
# client: bigquery.Client | None = None,
725+
# ):
726+
# if client is None:
727+
# client = bigquery.Client()
728+
# fully_qualified_table_id = str(processing_history_table)
729+
# query = f"""
730+
# UPDATE {fully_qualified_table_id}
731+
# SET bq_ingest_processing = {bq_ingest_processing}
732+
# WHERE file_type = '{ClinVarIngestFileFormat.VCV}'
733+
# AND pipeline_version = '{pipeline_version}'
734+
# AND xml_release_date = '{xml_release_date}'
735+
# """ # TODO prepared statement
736+
# query_job = client.query(query)
737+
# return query_job.result()
698738

699739
# TODO - Insert a BQ record type entry in this method above vs updating the flag
700740

clinvar_ingest/cloud/bigquery/stored_procedures.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
_logger = logging.getLogger("clinvar_ingest")
2929

3030
stored_procedures = [
31-
"CALL `clinvar_ingest.dataset_preparation_v2`({0});",
31+
"CALL `clinvar_ingest.dataset_preparation`({0});",
32+
"CALL `clinvar_ingest.temporal_data_collection`({0});",
33+
"CALL `clinvar_ingest.temporal_data_summation`();",
34+
"CALL `clinvar_ingest.variation_tracker`();",
3235
]
3336

3437

3538
def execute_each(
3639
client: bigquery.Client, project_id: str, release_date: str | None
37-
) -> RowIterator:
40+
) -> list[RowIterator]:
3841
"""Execute each procedure in the list of stored procedures individualy,
3942
substituting the release_date date if provided.
4043

0 commit comments

Comments
 (0)