
Commit be84279

Update GX DQ suites so that each expectation is uniquely named; remove transactions loads from testing; add error handling for entire testing loop.
1 parent 1204282 · commit be84279

10 files changed, +191 -289 lines changed
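The headline change is the error handling: the whole per-table body of the testing loop in housing_nec_migration_apply_gx_dq_tests.py (SQL read, batch set-up, suite lookup, checkpoint run and results handling) is now wrapped in a single outer try/except, so one failing table is logged and skipped instead of aborting the run, and the job exits cleanly if nothing succeeded. A minimal, self-contained sketch of that pattern (data_loads, process_table and the simulated failure are illustrative stand-ins, not code from this repo):

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Illustrative stand-ins for the real data loads and the real per-table work.
    data_loads = {"addresses": ["table_a", "table_b"]}

    def process_table(table: str) -> str:
        if table == "table_b":
            raise ValueError("simulated failure")
        return f"results for {table}"

    table_results = []
    for data_load, tables in data_loads.items():
        for table in tables:
            try:
                table_results.append(process_table(table))
            except Exception as e:
                # one bad table no longer stops the whole loop
                logger.error(f"CRITICAL ERROR processing table '{table}': {e}")
                continue

    if not table_results:
        logger.error("No tables were processed successfully. Exiting.")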

scripts/helpers/housing_nec_migration_gx_dq_inputs.py

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@
     "arrears_actions": {"id_field": "LACA_PAY_REF"},
     "revenue_accounts": {"id_field": "LRAC_PAY_REF"},
     "transactions": {"id_field": "LTRN_ALT_REF"},
-    "addresses": {"id_field": "LAUS_LEGACY_REF"}
+    "addresses": {"id_field": "LAUS_LEGACY_REF"},
 }
 
 data_load_list = [
@@ -16,7 +16,7 @@
     "contacts",
     "arrears_actions",
     "revenue_accounts",
-    "transactions",
+    # "transactions",
     "addresses",
 ]
 

scripts/jobs/housing/housing_nec_migration_addresses_data_load_gx_suite.py

Lines changed: 10 additions & 10 deletions
@@ -7,23 +7,23 @@
 import great_expectations.expectations as gxe
 
 
-class ExpectPropRefColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
+class AddressesExpectPropRefColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
     column: str = "LAUS_LEGACY_REF"
     description: str = "Expect LAUS_LEGACY_REF values to be unique"
 
-class ExpectPropRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
+class AddressesExpectPropRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
     column: str = "LAUS_LEGACY_REF"
     description: str = "Expect LAUS_LEGACY_REF values to not be Null"
 
-class ExpectUPRNColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
+class AddressesExpectUPRNColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
     column: str = "LADR_UPRN"
     description: str = "Expect UPRN values to be unique"
 
-class ExpectUPRNColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
+class AddressesExpectUPRNColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
     column: str = "LADR_UPRN"
     description: str = "Expect UPRN values to not be Null"
 
-class ExpectAddressColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchOrderedList):
+class AddressesExpectAddressColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchOrderedList):
     column_list = [
         "LAUS_LEGACY_REF",
         "LAUS_AUT_FAO_CODE",
@@ -65,9 +65,9 @@ class ExpectAddressColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchOrdere
 
 suite = gx.ExpectationSuite(name="addresses_data_load_suite")
 
-suite.add_expectation(ExpectPropRefColumnValuesToBeUnique())
-suite.add_expectation(ExpectUPRNColumnValuesToBeUnique())
-suite.add_expectation(ExpectAddressColumnsToMatchOrderedList())
-suite.add_expectation(ExpectPropRefColumnValuesToNotBeNull())
-suite.add_expectation(ExpectUPRNColumnValuesToNotBeNull())
+suite.add_expectation(AddressesExpectPropRefColumnValuesToBeUnique())
+suite.add_expectation(AddressesExpectUPRNColumnValuesToBeUnique())
+suite.add_expectation(AddressesExpectAddressColumnsToMatchOrderedList())
+suite.add_expectation(AddressesExpectPropRefColumnValuesToNotBeNull())
+suite.add_expectation(AddressesExpectUPRNColumnValuesToNotBeNull())
 suite = context.suites.add(suite)
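A note on the renaming pattern used here and in the other suite modules: GX derives an expectation's type name from its class name, so suite modules that each defined a class with the same name (for example ExpectPropRefColumnValuesToBeUnique) produced indistinguishable expectation types once several suites were loaded into the same context. Prefixing every class with its dataset name (AddressesExpect..., ArrearsActionsExpect..., and so on) is presumably what "each expectation is uniquely named" in the commit message refers to.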

scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py

Lines changed: 116 additions & 104 deletions
@@ -21,7 +21,7 @@
 import scripts.jobs.housing.housing_nec_migration_contacts_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_arrears_actions_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_revenue_accounts_data_load_gx_suite
-import scripts.jobs.housing.housing_nec_migration_transactions_data_load_gx_suite
+# import scripts.jobs.housing.housing_nec_migration_transactions_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_addresses_data_load_gx_suite
 
 logging.basicConfig(level=logging.INFO)
@@ -68,119 +68,133 @@ def main():
         for table in table_list.get(data_load):
             logger.info(f"{table} loading...")
 
-            sql_query, id_field = get_sql_query(
-                sql_config=sql_config, data_load=data_load, table=table
-            )
-
-            conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)
-
             try:
-                df = pd.read_sql_query(sql_query, conn)
-            except Exception as e:
-                logger.info(f"Problem found with {table}: {e}, skipping table.")
-                continue
+                sql_query, id_field = get_sql_query(
+                    sql_config=sql_config, data_load=data_load, table=table
+                )
 
-            # set up batch
-            data_source = context.data_sources.add_pandas(f"{table}_pandas")
-            data_asset = data_source.add_dataframe_asset(name=f"{table}_df_asset")
-            batch_definition = data_asset.add_batch_definition_whole_dataframe(
-                "Athena batch definition"
-            )
-            batch_parameters = {"dataframe": df}
+                conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)
 
-            # get expectation suite for dataset
-            try:
-                suite = context.suites.get(name=f"{data_load}_data_load_suite")
-            except Exception as e:
-                logger.info(f"Problem found with {data_load}: GX suite {e}, skipping suite.")
-                continue
-            else:
-                expectations = suite.expectations
-
-                validation_definition = gx.ValidationDefinition(
-                    data=batch_definition,
-                    suite=suite,
-                    name=f"validation_definition_{table}",
-                )
-                validation_definition = context.validation_definitions.add(
-                    validation_definition
-                )
-
-                # create and start checking data with checkpoints
-                checkpoint = context.checkpoints.add(
-                    gx.checkpoint.checkpoint.Checkpoint(
-                        name=f"{table}_checkpoint",
-                        validation_definitions=[validation_definition],
-                        result_format={
-                            "result_format": "COMPLETE",
-                            "return_unexpected_index_query": False,
-                            "partial_unexpected_count": 0,
-                        },
+                try:
+                    df = pd.read_sql_query(sql_query, conn)
+                except Exception as e:
+                    logger.error(f"SQL Read Problem found with {table}: {e}, skipping table.")
+                    continue
+
+                # set up batch
+                data_source = context.data_sources.add_pandas(f"{table}_pandas")
+                data_asset = data_source.add_dataframe_asset(name=f"{table}_df_asset")
+                batch_definition = data_asset.add_batch_definition_whole_dataframe(
+                    "Athena batch definition"
                 )
-                )
-
-                checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
-                results_dict = list(checkpoint_result.run_results.values())[
-                    0
-                ].to_json_dict()
-                table_results_df = pd.json_normalize(results_dict["results"])
-                cols_not_needed = ["result.unexpected_list", "result.observed_value"]
-                cols_to_drop = [
-                    c
-                    for c in table_results_df.columns
-                    if c.startswith("exception_info") or c in cols_not_needed
-                ]
-
-                table_results_df = table_results_df.drop(columns=cols_to_drop)
-                table_results_df_list.append(table_results_df)
-
-                # generate id lists for each unexpected result set
-                query_df = table_results_df.loc[
-                    (~table_results_df["result.unexpected_index_list"].isna())
-                    & (table_results_df["result.unexpected_index_list"].values != "[]")
-                ]
-
-                table_results_df["unexpected_id_list"] = pd.Series(dtype="object")
-                for i, row in query_df.iterrows():
+                batch_parameters = {"dataframe": df}
+
+                # get expectation suite for dataset
                 try:
-                    list(df[id_field].iloc[row["result.unexpected_index_list"]])
+                    suite = context.suites.get(name=f"{data_load}_data_load_suite")
                 except Exception as e:
-                    logger.info(
-                        f"Problem found with {table}: {e}, skipping making unexpected_id_list."
-                    )
+                    logger.error(f"GX Suite Problem found with {data_load}: {e}, skipping suite.")
                     continue
                 else:
-                    table_results_df.loc[i, "unexpected_id_list"] = str(
-                        list(df[id_field].iloc[row["result.unexpected_index_list"]])
+                    expectations = suite.expectations
+
+                    validation_definition = gx.ValidationDefinition(
+                        data=batch_definition,
+                        suite=suite,
+                        name=f"validation_definition_{table}",
+                    )
+
+                    validation_definition = context.validation_definitions.add_or_update(
+                        validation_definition
+                    )
+
+                    # create and start checking data with checkpoints
+                    checkpoint = context.checkpoints.add_or_update(
+                        gx.checkpoint.checkpoint.Checkpoint(
+                            name=f"{table}_checkpoint",
+                            validation_definitions=[validation_definition],
+                            result_format={
+                                "result_format": "COMPLETE",
+                                "return_unexpected_index_query": False,
+                                "partial_unexpected_count": 0,
+                            },
                         )
+                    )
+
+                    checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
+
+                    # Logic to handle results
+                    results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict()
+                    table_results_df = pd.json_normalize(results_dict["results"])
+
+                    cols_not_needed = ["result.unexpected_list", "result.observed_value"]
+                    cols_to_drop = [
+                        c
+                        for c in table_results_df.columns
+                        if c.startswith("exception_info") or c in cols_not_needed
+                    ]
+
+                    table_results_df = table_results_df.drop(columns=cols_to_drop)
+                    table_results_df_list.append(table_results_df)
 
-                # drop columns not needed in metatdata
-                cols_to_drop_meta = [
-                    "notes",
-                    "result_format",
-                    "catch_exceptions",
-                    "rendered_content",
-                    "windows",
-                ]
-
-                suite_df = pd.DataFrame()
-                for i in expectations:
-                    temp_i = i
-                    temp_df = pd.json_normalize(dict(temp_i))
-                    temp_df["expectation_type"] = temp_i.expectation_type
-                    temp_df["dataset_name"] = table
-                    temp_df = temp_df.drop(columns=cols_to_drop_meta)
-                    suite_df = pd.concat([suite_df, temp_df])
-
-                df_all_suite_list.append(suite_df)
+                    # generate id lists for each unexpected result set
+                    query_df = table_results_df.loc[
+                        (~table_results_df["result.unexpected_index_list"].isna())
+                        & (table_results_df["result.unexpected_index_list"].values != "[]")
+                    ]
+
+                    table_results_df["unexpected_id_list"] = pd.Series(dtype="object")
+                    for i, row in query_df.iterrows():
+                        try:
+                            # check this
+                            list(df[id_field].iloc[row["result.unexpected_index_list"]])
+                        except Exception as e:
+                            logger.warning(
+                                f"Problem mapping IDs for {table}: {e}. Proceeding without ID list."
+                            )
+                            continue
+                        else:
+                            table_results_df.loc[i, "unexpected_id_list"] = str(
+                                list(df[id_field].iloc[row["result.unexpected_index_list"]])
+                            )
+
+                    # drop columns not needed in metadata
+                    cols_to_drop_meta = [
+                        "notes",
+                        "result_format",
+                        "catch_exceptions",
+                        "rendered_content",
+                        "windows",
+                    ]
+
+                    suite_df = pd.DataFrame()
+                    for i in expectations:
+                        temp_i = i
+                        temp_df = pd.json_normalize(dict(temp_i))
+                        temp_df["expectation_type"] = temp_i.expectation_type
+                        temp_df["dataset_name"] = table
+                        temp_df["expectation_id_full"] = temp_i.expectation_type + '_' + table
+                        temp_df = temp_df.drop(columns=cols_to_drop_meta, errors='ignore') # errors='ignore' is safer
+                        suite_df = pd.concat([suite_df, temp_df])
+
+                    df_all_suite_list.append(suite_df)
+
+            except Exception as e:
+                logger.error(f"CRITICAL ERROR processing table '{table}': {str(e)}")
+                logger.error("Skipping this table and moving to the next.")
+                continue
+
+    if not table_results_df_list:
+        logger.error("No tables were processed successfully. Exiting.")
+        return
 
     results_df = pd.concat(table_results_df_list)
     metadata_df = pd.concat(df_all_suite_list)
 
     # add expectation_id
-    metadata_df["expectation_id"] = (
-        metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"]
-    )
+    # metadata_df["expectation_id"] = (
+    # metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"]
+    # )
     metadata_df["import_date"] = datetime.today().strftime("%Y%m%d")
 
     # set dtypes for Athena with default of string
@@ -199,10 +213,10 @@ def main():
         value=results_df.set_index(
             ["expectation_config.type", "dataset_name"]
         ).index.factorize()[0]
-        + 1,
+        + 1,
     )
     results_df["expectation_id"] = (
-        results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
+        results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
     )
     results_df["import_date"] = datetime.today().strftime("%Y%m%d")
 
@@ -216,6 +230,7 @@ def main():
         "result.element_count": "bigint",
         "result.unexpected_count": "bigint",
         "result.missing_count": "bigint",
+        "result.details_mismatched": 'string',
         "result.partial_unexpected_list": "array<string>",
         "result.unexpected_index_list": "array<bigint>",
         "result.unexpected_index_query": "string",
@@ -225,9 +240,6 @@ def main():
         "import_date": "string",
     }
 
-    # TODO for df_vars in [[results_df, dtype_dict_results, target_table], [metadata_df, dtype_dict_metadata, target_metadata_table]]:
-    # will loop the writing of these tables
-
     wr.s3.to_parquet(
         df=results_df,
         path=s3_target_location_results,
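One detail worth flagging from the hunks above: the metadata clean-up now calls temp_df.drop(columns=cols_to_drop_meta, errors='ignore'), which the in-line comment describes as safer. With errors='ignore', pandas skips any listed label that is not actually present instead of raising a KeyError (useful if not every expectation carries every one of those metadata fields). A small standalone illustration with toy data, unrelated to the migration tables:

    import pandas as pd

    df = pd.DataFrame({"notes": ["x"], "column": ["LADR_UPRN"]})

    # Default behaviour: dropping a label that does not exist raises KeyError.
    try:
        df.drop(columns=["notes", "windows"])
    except KeyError as err:
        print("raises:", err)

    # With errors="ignore", missing labels are skipped and existing ones are dropped.
    print(df.drop(columns=["notes", "windows"], errors="ignore"))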

scripts/jobs/housing/housing_nec_migration_arrears_actions_data_load_gx_suite.py

Lines changed: 9 additions & 9 deletions
@@ -7,19 +7,19 @@
 import great_expectations.expectations as gxe
 
 
-class ExpectPayRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
+class ArrearsActionsExpectPayRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
     column: str = "LACA_PAY_REF"
     description: str = (
         "Expect LACA_PAY_REF (pay ref) values to not be Null in contacts load"
     )
 
 
-class ExpectValueColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
+class ArrearsActionsExpectCreatedDateToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
     column: str = "LACA_CREATED_DATE"
-    description: str = "Expect LCDE_CONTACT_VALUE (contact value) to not be Null"
+    description: str = "Expect LACA_CREATED_DATE to not be Null"
 
 
-class ExpectArrearCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet):
+class ArrearsActionsExpectArrearCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet):
     column: str = "LACA_ARA_CODE"
     value_set: list = [
         "BAGF",
@@ -104,7 +104,7 @@ class ExpectArrearCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet):
     description: str = "Expect arrear code to be one of the set"
 
 
-class ExpectArrearsActionsColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchOrderedList):
+class ArrearsActionsExpectArrearsActionsColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchOrderedList):
     column_list = [
         "LACA_BALANCE",
         "LACA_PAY_REF",
@@ -136,8 +136,8 @@ class ExpectArrearsActionsColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatc
 
 suite = gx.ExpectationSuite(name="arrears_actions_data_load_suite")
 
-suite.add_expectation(ExpectArrearsActionsColumnsToMatchOrderedList())
-suite.add_expectation(ExpectArrearCodeToBeInSet())
-suite.add_expectation(ExpectPayRefColumnValuesToNotBeNull())
-suite.add_expectation(ExpectValueColumnValuesToNotBeNull())
+suite.add_expectation(ArrearsActionsExpectArrearsActionsColumnsToMatchOrderedList())
+suite.add_expectation(ArrearsActionsExpectArrearCodeToBeInSet())
+suite.add_expectation(ArrearsActionsExpectPayRefColumnValuesToNotBeNull())
+suite.add_expectation(ArrearsActionsExpectCreatedDateToNotBeNull())
 suite = context.suites.add(suite)
