 import scripts.jobs.housing.housing_nec_migration_contacts_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_arrears_actions_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_revenue_accounts_data_load_gx_suite
+
 # import scripts.jobs.housing.housing_nec_migration_transactions_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_addresses_data_load_gx_suite
 
@@ -66,49 +67,52 @@ def main():
         logger.info(f"{data_load} loading...")
 
         for table in table_list.get(data_load):
-            logger.info(f"{table} loading ...")
+            logger.info(f"{table} processing ...")
 
             try:
                 sql_query, id_field = get_sql_query(
                     sql_config=sql_config, data_load=data_load, table=table
                 )
+                conn = connect(
+                    s3_staging_dir=s3_staging_location, region_name=region_name
+                )
+                df = pd.read_sql_query(sql_query, conn)
+            except Exception as e:
+                logger.error(f"SQL/Connection error for {table}: {e}. Skipping.")
+                continue  # Skip to next table
 
-                conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)
-
-                try:
-                    df = pd.read_sql_query(sql_query, conn)
-                except Exception as e:
-                    logger.error(f"SQL Read Problem found with {table}: {e}, skipping table.")
-                    continue
-
-                # set up batch
+            # --- STEP 2: GX ASSET SETUP ---
+            try:
                 data_source = context.data_sources.add_pandas(f"{table}_pandas")
                 data_asset = data_source.add_dataframe_asset(name=f"{table}_df_asset")
                 batch_definition = data_asset.add_batch_definition_whole_dataframe(
                     "Athena batch definition"
                 )
                 batch_parameters = {"dataframe": df}
+            except Exception as e:
+                logger.error(f"GX Asset Setup error for {table}: {e}. Skipping.")
+                continue
+            try:
+                suite = context.suites.get(name=f"{data_load}_data_load_suite")
+                expectations = suite.expectations  # Get this now to ensure it exists
+            except Exception as e:
+                logger.error(
+                    f"GX Suite retrieval error for {data_load}: {e}. Skipping."
+                )
+                continue
 
-                # get expectation suite for dataset
-                try:
-                    suite = context.suites.get(name=f"{data_load}_data_load_suite")
-                except Exception as e:
-                    logger.error(f"GX Suite Problem found with {data_load}: {e}, skipping suite.")
-                    continue
-                else:
-                    expectations = suite.expectations
-
+            # VALIDATION & CHECKPOINT
+            try:
                 validation_definition = gx.ValidationDefinition(
                     data=batch_definition,
                     suite=suite,
                     name=f"validation_definition_{table}",
                 )
-
+                # Use add_or_update to avoid duplicates
                 validation_definition = context.validation_definitions.add_or_update(
                     validation_definition
                 )
 
-                # create and start checking data with checkpoints
                 checkpoint = context.checkpoints.add_or_update(
                     gx.checkpoint.checkpoint.Checkpoint(
                         name=f"{table}_checkpoint",
@@ -120,13 +124,25 @@ def main():
                         },
                     )
                 )
-
                 checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
+            except Exception as e:
+                logger.error(f"Checkpoint Run error for {table}: {e}. Skipping.")
+                continue
 
-                # Logic to handle results
-                results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict()
+            # PROCESS RESULTS
+            try:
+                results_dict = list(checkpoint_result.run_results.values())[
+                    0
+                ].to_json_dict()
                 table_results_df = pd.json_normalize(results_dict["results"])
 
+                # Guard clause: If table passed perfectly, results might be empty or structure different
+                if table_results_df.empty:
+                    logger.info(
+                        f"No results to process for {table} (possibly empty batch)."
+                    )
+                    continue
+
                 cols_not_needed = ["result.unexpected_list", "result.observed_value"]
                 cols_to_drop = [
                     c
@@ -137,64 +153,68 @@ def main():
                 table_results_df = table_results_df.drop(columns=cols_to_drop)
                 table_results_df_list.append(table_results_df)
 
-                # generate id lists for each unexpected result set
+                # Filter for rows that actually have unexpected indices
                 query_df = table_results_df.loc[
                     (~table_results_df["result.unexpected_index_list"].isna())
-                    & (table_results_df["result.unexpected_index_list"].values != "[]")
-                ]
+                    & (
+                        table_results_df["result.unexpected_index_list"].astype(str)
+                        != "[]"
+                    )
+                ]
 
                 table_results_df["unexpected_id_list"] = pd.Series(dtype="object")
+
                 for i, row in query_df.iterrows():
                     try:
-                        # check this
-                        list(df[id_field].iloc[row["result.unexpected_index_list"]])
-                    except Exception as e:
-                        logger.warning(
-                            f"Problem mapping IDs for {table}: {e}. Proceeding without ID list."
-                        )
-                        continue
-                    else:
-                        table_results_df.loc[i, "unexpected_id_list"] = str(
-                            list(df[id_field].iloc[row["result.unexpected_index_list"]])
+                        indices = row["result.unexpected_index_list"]
+                        # Safety check: Ensure indices are integers
+                        if isinstance(indices, list):
+                            mapped_ids = df[id_field].iloc[indices].tolist()
+                            table_results_df.at[i, "unexpected_id_list"] = str(
+                                mapped_ids
+                            )
+                    except KeyError:
+                        logger.error(
+                            f"ID Field '{id_field}' not found in DataFrame for {table}."
                         )
+                    except IndexError:
+                        logger.error(f"Indices out of bounds for {table}.")
+                    except Exception as e:
+                        logger.warning(f"Mapping error for {table} row {i}: {e}")
 
-                # drop columns not needed in metadata
+                # METADATA PROCESSING
                 cols_to_drop_meta = [
                     "notes",
                     "result_format",
                     "catch_exceptions",
                     "rendered_content",
                     "windows",
                 ]
-
                 suite_df = pd.DataFrame()
-                for i in expectations:
-                    temp_i = i
-                    temp_df = pd.json_normalize(dict(temp_i))
-                    temp_df["expectation_type"] = temp_i.expectation_type
+
+                # 'expectations' variable is guaranteed to exist from Step 3
+                for exp in expectations:
+                    temp_df = pd.json_normalize(dict(exp))
+                    temp_df["expectation_type"] = exp.expectation_type
                     temp_df["dataset_name"] = table
-                    temp_df["expectation_id_full"] = temp_i.expectation_type + '_' + table
-                    temp_df = temp_df.drop(columns=cols_to_drop_meta, errors='ignore')  # errors='ignore' is safer
+                    temp_df["expectation_id_full"] = f"{exp.expectation_type}_{table}"
+                    temp_df = temp_df.drop(columns=cols_to_drop_meta, errors="ignore")
                     suite_df = pd.concat([suite_df, temp_df])
 
                 df_all_suite_list.append(suite_df)
 
             except Exception as e:
-                logger.error(f"CRITICAL ERROR processing table '{table}': {str(e)}")
-                logger.error("Skipping this table and moving to the next.")
+                logger.error(f"Result Processing Error for {table}: {e}")
                 continue
 
+    # END LOOPS
     if not table_results_df_list:
         logger.error("No tables were processed successfully. Exiting.")
         return
 
     results_df = pd.concat(table_results_df_list)
     metadata_df = pd.concat(df_all_suite_list)
 
-    # add expectation_id
-    # metadata_df["expectation_id"] = (
-    #     metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"]
-    # )
     metadata_df["import_date"] = datetime.today().strftime("%Y%m%d")
 
     # set dtypes for Athena with default of string
@@ -213,10 +233,10 @@ def main():
         value=results_df.set_index(
             ["expectation_config.type", "dataset_name"]
         ).index.factorize()[0]
-        + 1,
+        + 1,
     )
     results_df["expectation_id"] = (
-        results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
+        results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
     )
     results_df["import_date"] = datetime.today().strftime("%Y%m%d")
 
@@ -230,7 +250,7 @@ def main():
         "result.element_count": "bigint",
         "result.unexpected_count": "bigint",
         "result.missing_count": "bigint",
-        "result.details_mismatched": 'string',
+        "result.details_mismatched": "string",
         "result.partial_unexpected_list": "array<string>",
         "result.unexpected_index_list": "array<bigint>",
         "result.unexpected_index_query": "string",