Commit c0047d5

Add Great Expectations data quality testing for NEC Migration data loads. (#2395)
* Add Great Expectations data quality testing for NEC Migration data load outputs. Includes the test suite and Terraform.
* Change the DQ inputs file import to the correct one.
1 parent 9b6490f commit c0047d5

4 files changed: +235 −0 lines

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
sql_config = {
    "properties_1a": {
        "sql": """ SELECT *
        FROM "housing_nec_migration"."properties_1a" """,
        "id_field": "LPRO_PROPREF",
    },
}


table_list = ['properties_1a']

partition_keys = ['import_date']
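
To bring another data load under test, it appears only this inputs file needs to change: the job script iterates over table_list and pulls each table's SQL and ID field from sql_config. A minimal sketch of registering a second table; the table name "properties_1b" and its ID column are hypothetical, not part of this commit:

# Illustration only: a possible second load registration. "properties_1b" is a
# hypothetical table name and LPRO_PROPREF is an assumed ID column.
sql_config["properties_1b"] = {
    "sql": """ SELECT *
    FROM "housing_nec_migration"."properties_1b" """,
    "id_field": "LPRO_PROPREF",
}
table_list.append("properties_1b")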
Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
# flake8: noqa: F821

import awswrangler as wr
from datetime import datetime, date
import json
import logging
import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import pandas as pd
from pyathena import connect
from scripts.helpers.housing_nec_migration_gx_dq_inputs import sql_config, table_list, partition_keys
import scripts.jobs.housing.housing_nec_migration_properties_data_load_gx_suite

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database',
            'target_table']
args = getResolvedOptions(sys.argv, arg_keys)
locals().update(args)


def json_serial(obj):
    """JSON serializer for objects not serializable by default."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


def main():
    # add GX context
    context = gx.get_context(mode="file", project_root_dir=s3_target_location)

    table_results_df_list = []

    for table in table_list:
        logger.info(f'{table} loading...')

        sql_query = sql_config.get(table).get('sql')

        conn = connect(s3_staging_dir=s3_staging_location,
                       region_name=region_name)

        df = pd.read_sql_query(sql_query, conn)

        # set up batch
        data_source = context.data_sources.add_pandas("pandas")
        data_asset = data_source.add_dataframe_asset(name=f'{table}_df_asset')
        batch_definition = data_asset.add_batch_definition_whole_dataframe("Athena batch definition")
        batch_parameters = {"dataframe": df}

        # get expectation suite for dataset
        suite = context.suites.get(name='properties_data_load_suite')

        validation_definition = gx.ValidationDefinition(
            data=batch_definition,
            suite=suite,
            name=f'validation_definition_{table}')
        validation_definition = context.validation_definitions.add(validation_definition)

        # create and start checking data with checkpoints
        checkpoint = context.checkpoints.add(
            gx.checkpoint.checkpoint.Checkpoint(
                name=f'{table}_checkpoint',
                validation_definitions=[validation_definition],
                result_format={"result_format": "COMPLETE",
                               "return_unexpected_index_query": False,
                               "partial_unexpected_count": 0}
            )
        )

        checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
        results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict()
        table_results_df = pd.json_normalize(results_dict['results'])
        cols_to_drop = [c for c in table_results_df.columns if c.startswith('exception_info')]
        cols_to_drop.append('result.unexpected_list')
        table_results_df = table_results_df.drop(columns=cols_to_drop)
        table_results_df_list.append(table_results_df)

        # generate id lists for each unexpected result set
        query_df = table_results_df.loc[(~table_results_df['result.unexpected_index_list'].isna()) & (
            table_results_df['result.unexpected_index_list'].values != '[]')]

        table_results_df['unexpected_id_list'] = pd.Series(dtype='object')
        for i, row in query_df.iterrows():
            table_results_df.loc[i, 'unexpected_id_list'] = str(
                list(df[sql_config.get(table).get('id_field')].iloc[row['result.unexpected_index_list']]))

    results_df = pd.concat(table_results_df_list)

    # map DQ dimension type
    results_df['dq_dimension_type'] = results_df['expectation_config.type'].map(dq_dimensions_map)

    # add clean dataset name
    results_df['dataset_name'] = results_df['expectation_config.kwargs.batch_id'].map(
        lambda x: x.removeprefix('pandas-').removesuffix('_df_asset'))

    # add composite key for each specific test (so can be tracked over time)
    results_df.insert(loc=0, column='expectation_key',
                      value=results_df.set_index(['expectation_config.type', 'dataset_name']).index.factorize()[0] + 1)
    results_df['expectation_id'] = results_df['expectation_config.type'] + "_" + results_df['dataset_name']
    results_df['import_date'] = datetime.today().strftime('%Y%m%d')

    # set dtypes for Athena
    dtype_dict = {'expectation_config.type': 'string',
                  'expectation_config.kwargs.batch_id': 'string',
                  'expectation_config.kwargs.column': 'string',
                  'expectation_config.kwargs.min_value': 'string',
                  'expectation_config.kwargs.max_value': 'string',
                  'result.element_count': 'bigint',
                  'result.unexpected_count': 'bigint',
                  'result.missing_count': 'bigint',
                  'result.partial_unexpected_list': 'array<string>',
                  'result.unexpected_index_list': 'array<bigint>',
                  'result.unexpected_index_query': 'string',
                  'expectation_config.kwargs.regex': 'string',
                  'expectation_config.kwargs.value_set': 'string',
                  'expectation_config.kwargs.column_list': 'string',
                  'import_year': 'string',
                  'import_month': 'string',
                  'import_day': 'string',
                  'import_date': 'string'}

    # write to s3
    wr.s3.to_parquet(
        df=results_df,
        path=s3_target_location,
        dataset=True,
        database=target_database,
        table=target_table,
        mode="overwrite_partitions",
        partition_cols=partition_keys,
        dtype=dtype_dict
    )

    logger.info(f'Data Quality test results for NEC data loads written to {s3_target_location}')


if __name__ == '__main__':
    main()
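
One caveat worth flagging: dq_dimensions_map, used above to populate dq_dimension_type, is not defined or imported in any of the four files in this commit, so it presumably lives elsewhere in the repository. Purely as an assumption about its shape, it would map GX expectation type strings to data quality dimensions, along these lines:

# Hypothetical sketch only; the real mapping is not part of this diff, and the
# exact expectation type strings depend on how GX serialises the suite's
# subclassed expectations.
dq_dimensions_map = {
    "expect_column_values_to_be_unique": "Uniqueness",
    "expect_column_values_to_be_in_set": "Validity",
}

Once the job has run, the results table it registers can be read back through Athena. A minimal sketch with awswrangler, assuming the database and table names configured in the Terraform below and that a partition exists for the chosen import_date:

import awswrangler as wr

# Placeholder import_date value; replace with a real partition.
results = wr.athena.read_sql_query(
    "SELECT * FROM housing_nec_data_loads_dq_tests WHERE import_date = '20250101'",
    database="housing_nec_migration",
)
print(results.head())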
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
# flake8: noqa: F821

import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import great_expectations.expectations as gxe


class ExpectPropRefColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
    column: str = "LPRO_PROPREF"
    description: str = "Expect UPRN (LPRO_PROPREF) values to be unique"


class ExpectPropTypeCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet):
    column: str = "LPRO_HOU_PTV_CODE"
    value_set: list = [
        "BUN",
        "CMC",
        "CMF",
        "COM",
        "CYC",
        "DUP",
        "FLT",
        "GAR",
        "HOU",
        "MAI",
        "PRA",
        "PSP",
        "ROM",
        "STD",
        "TRV",
    ]
    description: str = "Expect property type codes to contain one of the set"


arg_key = ["s3_target_location"]
args = getResolvedOptions(sys.argv, arg_key)
locals().update(args)

# add to GX context
context = gx.get_context(mode="file", project_root_dir=s3_target_location)

suite = gx.ExpectationSuite(name="properties_data_load_suite")

suite.add_expectation(ExpectPropRefColumnValuesToBeUnique())
suite.add_expectation(ExpectPropTypeCodeToBeInSet())
suite = context.suites.add(suite)
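
Because the suite is built from subclasses of the built-in expectation classes, further checks can follow the same pattern. A hypothetical example (not part of this commit), assuming LPRO_PROPREF should never be null; it would sit alongside the two expectations above and be added before the suite is registered with the context:

class ExpectPropRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
    # Illustrative only: assumes the property reference is mandatory in the extract.
    column: str = "LPRO_PROPREF"
    description: str = "Expect every property row to have a property reference"


suite.add_expectation(ExpectPropRefColumnValuesToNotBeNull())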
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
module "housing_nec_migration_apply_gx_dq_tests" {
  source                    = "../modules/aws-glue-job"
  is_production_environment = local.is_production_environment
  is_live_environment       = local.is_live_environment

  count = local.is_live_environment ? 1 : 0

  department                     = module.department_housing_data_source
  job_name                       = "${local.short_identifier_prefix}Housing NEC Migration GX Data Quality Testing"
  glue_scripts_bucket_id         = module.glue_scripts_data_source.bucket_id
  glue_temp_bucket_id            = module.glue_temp_storage_data_source.bucket_id
  glue_job_timeout               = 360
  helper_module_key              = data.aws_s3_object.helpers.key
  pydeequ_zip_key                = data.aws_s3_object.pydeequ.key
  spark_ui_output_storage_id     = module.spark_ui_output_storage_data_source.bucket_id
  trigger_enabled                = local.is_production_environment
  number_of_workers_for_glue_job = 2
  schedule                       = "cron(0 10 ? * MON-FRI *)"
  job_parameters = {
    "--job-bookmark-option"              = "job-bookmark-enable"
    "--enable-glue-datacatalog"          = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--additional-python-modules"        = "great_expectations==1.5.8,PyAthena,numpy==1.26.1,awswrangler==3.10.0"
    "--region_name"                      = data.aws_region.current.name
    "--s3_endpoint"                      = "https://s3.${data.aws_region.current.name}.amazonaws.com"
    "--s3_target_location"               = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
    "--s3_staging_location"              = "s3://${module.athena_storage_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
    "--target_database"                  = "housing_nec_migration"
    "--target_table"                     = "housing_nec_data_loads_dq_tests"
  }

  script_name = "housing_nec_migration_apply_gx_dq_tests"
}
