88import json
99import logging
1010import sys
11+
1112from google .cloud import bigquery
1213from google .cloud .storage import Client as GCSClient
1314
@@ -72,8 +73,10 @@ def _get_bq_client() -> bigquery.Client:
7273 processing_history_table = processing_history_table ,
7374 client = _get_bq_client (),
7475)
75- processing_history_needing_bq_ingest = processing_history .processed_entries_ready_for_bq_ingest (
76- processing_history_view , client = _get_bq_client ()
76+ processing_history_needing_bq_ingest = (
77+ processing_history .processed_entries_ready_for_bq_ingest (
78+ processing_history_view , client = _get_bq_client ()
79+ )
7780)
7881msg = f"Found { processing_history_needing_bq_ingest .total_rows } VCV/RCV datasets to ingest."
7982_logger .info (msg )
@@ -83,6 +86,13 @@ def _get_bq_client() -> bigquery.Client:
8386
8487send_slack_message (msg )
8588
89+
def json_parse_if_string(value):
    """Return ``json.loads(value)`` when *value* is a str; otherwise pass it through.

    Lets callers handle columns that may arrive either as raw JSON text
    or as an already-deserialized object, without double-parsing.
    """
    return json.loads(value) if isinstance(value, str) else value
94+
95+
8696# update processing_history to set the bq ingest as having started
8797rows_to_ingest = []
8898for row in processing_history_needing_bq_ingest :
@@ -100,7 +110,7 @@ def _get_bq_client() -> bigquery.Client:
100110 client = _get_bq_client (),
101111 bucket_dir = vcv_bucket_dir ,
102112 xml_release_date = str (vcv_xml_release_date ),
103- error_if_exists = True ,
113+ error_if_exists = False ,
104114 )
105115 _logger .info (
106116 f"Created bq_ingest_processing record with version { vcv_pipeline_version } and "
@@ -119,7 +129,7 @@ def _get_bq_client() -> bigquery.Client:
119129 vcv_release_date = row .get ("vcv_release_date" , None )
120130 vcv_xml_release_date = row .get ("vcv_xml_release_date" , None )
121131 vcv_bucket_dir = row .get ("vcv_bucket_dir" , None )
122- vcv_parsed_files = json . loads (row .get ("vcv_parsed_files" , None ))
132+ vcv_parsed_files = json_parse_if_string (row .get ("vcv_parsed_files" , None ))
123133
124134 rcv_file_type = row .get ("rcv_file_type" , None )
125135 rcv_pipeline_version = row .get ("rcv_pipeline_version" , None )
@@ -129,7 +139,7 @@ def _get_bq_client() -> bigquery.Client:
129139 rcv_release_date = row .get ("rcv_release_date" , None )
130140 rcv_xml_release_date = row .get ("rcv_xml_release_date" , None )
131141 rcv_bucket_dir = row .get ("rcv_bucket_dir" , None )
132- rcv_parsed_files = json . loads (row .get ("rcv_parsed_files" , None ))
142+ rcv_parsed_files = json_parse_if_string (row .get ("rcv_parsed_files" , None ))
133143
134144 # We will use the VCV file's release date as the "release date" for both
135145 # And the VCV pipeline version as the "pipeline version"
@@ -173,9 +183,7 @@ def _get_bq_client() -> bigquery.Client:
173183 vcv_create_tables_request
174184 )
175185 vcv_ext_resp_json = json .dumps (
176- walk_and_replace (
177- vcv_create_tables_response , processing_history ._dump_fn
178- )
186+ walk_and_replace (vcv_create_tables_response , processing_history ._dump_fn )
179187 )
180188 _logger .info (f"VCV Create External Tables response: { vcv_ext_resp_json } " )
181189
@@ -191,9 +199,7 @@ def _get_bq_client() -> bigquery.Client:
191199 rcv_create_tables_request
192200 )
193201 rcv_ext_resp_json = json .dumps (
194- walk_and_replace (
195- rcv_create_tables_response , processing_history ._dump_fn
196- )
202+ walk_and_replace (rcv_create_tables_response , processing_history ._dump_fn )
197203 )
198204 _logger .info (f"RCV Create External Tables response: { rcv_ext_resp_json } " )
199205
0 commit comments