
Commit 223bdaf

Update GX DQ script and inputs to include more tables and data loads (#2436)
* Update GX DQ script and inputs to include more tables and data loads
* Fix formatting
1 parent a80fbbd commit 223bdaf

File tree: 2 files changed (+132 -88 lines changed)

scripts/helpers/housing_nec_migration_gx_dq_inputs.py

Lines changed: 13 additions & 18 deletions
@@ -1,22 +1,17 @@
-sql_config = {
-    "properties_1a": {
-        "sql": """ SELECT *
-            FROM "housing_nec_migration"."properties_1a" """,
-        "id_field": "LPRO_PROPREF",
-    },
-    "properties_1b": {
-        "sql": """ SELECT *
-            FROM "housing_nec_migration"."properties_1b" """,
-        "id_field": "LPRO_PROPREF",
-    },
-    "properties_1c": {
-        "sql": """ SELECT *
-            FROM "housing_nec_migration"."properties_1c" """,
-        "id_field": "LPRO_PROPREF",
-    },
-}
+sql_config = {"properties": {"id_field": "LPRO_PROPREF"}}
 
+data_load_list = ["properties"]
 
-table_list = ["properties_1a", "properties_1b", "properties_1c"]
+table_list = {
+    "properties": [
+        "properties_1a",
+        "properties_1b",
+        "properties_1c",
+        "properties_1d",
+        "properties_1e",
+        "properties_4a",
+        "properties_4c",
+    ]
+}
 
 partition_keys = ["import_date"]
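
An aside, not part of the commit: a minimal sketch of how the reshaped inputs fit together. Each entry in data_load_list keys into both sql_config (for the id field) and table_list (for the tables to validate), which is how the updated job script below iterates them. The sketch assumes the three objects defined in this file are in scope.

# Sketch only: mirrors how the updated job consumes the new input shapes.
for data_load in data_load_list:
    id_field = sql_config[data_load]["id_field"]  # "LPRO_PROPREF" for "properties"
    for table in table_list[data_load]:           # "properties_1a" ... "properties_4c"
        query = f"SELECT * FROM housing_nec_migration.{table}"
        print(data_load, table, id_field, query)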

scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py

Lines changed: 119 additions & 70 deletions
@@ -10,18 +10,34 @@
 import great_expectations as gx
 import pandas as pd
 from pyathena import connect
-from scripts.helpers.housing_nec_migration_gx_dq_inputs import sql_config, table_list
+from scripts.helpers.housing_nec_migration_gx_dq_inputs import (
+    sql_config,
+    data_load_list,
+    table_list,
+)
 import scripts.jobs.housing.housing_nec_migration_properties_data_load_gx_suite
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database',
-            'target_table']
+arg_keys = [
+    "region_name",
+    "s3_endpoint",
+    "s3_target_location",
+    "s3_staging_location",
+    "target_database",
+    "target_table",
+]
 args = getResolvedOptions(sys.argv, arg_keys)
 locals().update(args)
 
 
+def get_sql_query(sql_config, data_load, table):
+    query = f"SELECT * FROM housing_nec_migration.{table}"
+    id_field = sql_config.get(data_load).get("id_field")
+    return query, id_field
+
+
 def json_serial(obj):
     """JSON serializer for objects not serializable by default."""
     if isinstance(obj, (datetime, date)):
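
An aside, not part of the diff: what the new get_sql_query helper returns, using values taken directly from the updated inputs file above.

# Sketch only: exercising the helper added in this hunk.
sql_query, id_field = get_sql_query(
    sql_config={"properties": {"id_field": "LPRO_PROPREF"}},
    data_load="properties",
    table="properties_1a",
)
# sql_query == "SELECT * FROM housing_nec_migration.properties_1a"
# id_field == "LPRO_PROPREF"
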
@@ -35,88 +51,119 @@ def main():
 
     table_results_df_list = []
 
-    for table in table_list:
-        logger.info(f'{table} loading...')
-
-        sql_query = sql_config.get(table).get('sql')
+    for data_load in data_load_list:
+        logger.info(f"{data_load} loading...")
 
-        conn = connect(s3_staging_dir=s3_staging_location,
-                       region_name=region_name)
+        for table in table_list.get(data_load):
+            logger.info(f"{table} loading...")
 
-        df = pd.read_sql_query(sql_query, conn)
-
-        # set up batch
-        data_source = context.data_sources.add_pandas(f'{table}_pandas')
-        data_asset = data_source.add_dataframe_asset(name=f'{table}_df_asset')
-        batch_definition = data_asset.add_batch_definition_whole_dataframe("Athena batch definition")
-        batch_parameters = {"dataframe": df}
+            sql_query, id_field = get_sql_query(
+                sql_config=sql_config, data_load=data_load, table=table
+            )
 
-        # get expectation suite for dataset
-        suite = context.suites.get(name='properties_data_load_suite')
+            conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)
 
-        validation_definition = gx.ValidationDefinition(
-            data=batch_definition,
-            suite=suite,
-            name=f'validation_definition_{table}')
-        validation_definition = context.validation_definitions.add(validation_definition)
+            df = pd.read_sql_query(sql_query, conn)
 
-        # create and start checking data with checkpoints
-        checkpoint = context.checkpoints.add(
-            gx.checkpoint.checkpoint.Checkpoint(
-                name=f'{table}_checkpoint',
-                validation_definitions=[validation_definition],
-                result_format={"result_format": "COMPLETE",
-                               "return_unexpected_index_query": False,
-                               "partial_unexpected_count": 0}
+            # set up batch
+            data_source = context.data_sources.add_pandas(f"{table}_pandas")
+            data_asset = data_source.add_dataframe_asset(name=f"{table}_df_asset")
+            batch_definition = data_asset.add_batch_definition_whole_dataframe(
+                "Athena batch definition"
             )
-        )
+            batch_parameters = {"dataframe": df}
 
-        checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
-        results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict()
-        table_results_df = pd.json_normalize(results_dict['results'])
-        cols_not_needed = ['result.unexpected_list', 'result.observed_value']
-        cols_to_drop = [c for c in table_results_df.columns if c.startswith('exception_info') or c in cols_not_needed]
+            # get expectation suite for dataset
+            suite = context.suites.get(name=f"{data_load}_data_load_suite")
 
-        table_results_df = table_results_df.drop(columns=cols_to_drop)
-        table_results_df_list.append(table_results_df)
+            validation_definition = gx.ValidationDefinition(
+                data=batch_definition,
+                suite=suite,
+                name=f"validation_definition_{table}",
+            )
+            validation_definition = context.validation_definitions.add(
+                validation_definition
+            )
 
-        # generate id lists for each unexpected result set
-        query_df = table_results_df.loc[(~table_results_df['result.unexpected_index_list'].isna()) & (
-            table_results_df['result.unexpected_index_list'].values != '[]')]
+            # create and start checking data with checkpoints
+            checkpoint = context.checkpoints.add(
+                gx.checkpoint.checkpoint.Checkpoint(
+                    name=f"{table}_checkpoint",
+                    validation_definitions=[validation_definition],
+                    result_format={
+                        "result_format": "COMPLETE",
+                        "return_unexpected_index_query": False,
+                        "partial_unexpected_count": 0,
+                    },
+                )
+            )
 
-        table_results_df['unexpected_id_list'] = pd.Series(dtype='object')
-        for i, row in query_df.iterrows():
-            table_results_df.loc[i, 'unexpected_id_list'] = str(
-                list(df[sql_config.get(table).get('id_field')].iloc[row['result.unexpected_index_list']]))
+            checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
+            results_dict = list(checkpoint_result.run_results.values())[
+                0
+            ].to_json_dict()
+            table_results_df = pd.json_normalize(results_dict["results"])
+            cols_not_needed = ["result.unexpected_list", "result.observed_value"]
+            cols_to_drop = [
+                c
+                for c in table_results_df.columns
+                if c.startswith("exception_info") or c in cols_not_needed
+            ]
+
+            table_results_df = table_results_df.drop(columns=cols_to_drop)
+            table_results_df_list.append(table_results_df)
+
+            # generate id lists for each unexpected result set
+            query_df = table_results_df.loc[
+                (~table_results_df["result.unexpected_index_list"].isna())
+                & (table_results_df["result.unexpected_index_list"].values != "[]")
+            ]
+
+            table_results_df["unexpected_id_list"] = pd.Series(dtype="object")
+            for i, row in query_df.iterrows():
+                table_results_df.loc[i, "unexpected_id_list"] = str(
+                    list(df[id_field].iloc[row["result.unexpected_index_list"]])
+                )
 
     results_df = pd.concat(table_results_df_list)
 
     # add clean dataset name
-    results_df['dataset_name'] = results_df['expectation_config.kwargs.batch_id'].map(
-        lambda x: x.removeprefix('pandas-').removesuffix('_df_asset'))
+    results_df["dataset_name"] = results_df["expectation_config.kwargs.batch_id"].map(
+        lambda x: x.removeprefix("pandas-").removesuffix("_df_asset")
+    )
 
     # add composite key for each specific test (so can be tracked over time)
-    results_df.insert(loc=0, column='expectation_key',
-                      value=results_df.set_index(['expectation_config.type', 'dataset_name']).index.factorize()[0] + 1)
-    results_df['expectation_id'] = results_df['expectation_config.type'] + "_" + results_df['dataset_name']
-    results_df['import_date'] = datetime.today().strftime('%Y%m%d')
+    results_df.insert(
+        loc=0,
+        column="expectation_key",
+        value=results_df.set_index(
+            ["expectation_config.type", "dataset_name"]
+        ).index.factorize()[0]
+        + 1,
+    )
+    results_df["expectation_id"] = (
+        results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
+    )
+    results_df["import_date"] = datetime.today().strftime("%Y%m%d")
 
     # set dtypes for Athena
-    dtype_dict = {'expectation_config.type': 'string',
-                  'expectation_config.kwargs.batch_id': 'string',
-                  'expectation_config.kwargs.column': 'string',
-                  'expectation_config.kwargs.min_value': 'string',
-                  'expectation_config.kwargs.max_value': 'string',
-                  'result.element_count': 'bigint',
-                  'result.unexpected_count': 'bigint',
-                  'result.missing_count': 'bigint',
-                  'result.partial_unexpected_list': 'array<string>',
-                  'result.unexpected_index_list': 'array<bigint>',
-                  'result.unexpected_index_query': 'string',
-                  'expectation_config.kwargs.regex': 'string',
-                  'expectation_config.kwargs.value_set': 'string',
-                  'expectation_config.kwargs.column_list': 'string',
-                  'import_date': 'string'}
+    dtype_dict = {
+        "expectation_config.type": "string",
+        "expectation_config.kwargs.batch_id": "string",
+        "expectation_config.kwargs.column": "string",
+        "expectation_config.kwargs.min_value": "string",
+        "expectation_config.kwargs.max_value": "string",
+        "result.element_count": "bigint",
+        "result.unexpected_count": "bigint",
+        "result.missing_count": "bigint",
+        "result.partial_unexpected_list": "array<string>",
+        "result.unexpected_index_list": "array<bigint>",
+        "result.unexpected_index_query": "string",
+        "expectation_config.kwargs.regex": "string",
+        "expectation_config.kwargs.value_set": "string",
+        "expectation_config.kwargs.column_list": "string",
+        "import_date": "string",
+    }
 
     # write to s3
     wr.s3.to_parquet(
@@ -127,11 +174,13 @@ def main():
         table=target_table,
         mode="overwrite",
         dtype=dtype_dict,
-        schema_evolution=True
+        schema_evolution=True,
     )
 
-    logger.info(f'Data Quality test results for NEC data loads written to {s3_target_location}')
+    logger.info(
+        f"Data Quality test results for NEC data loads written to {s3_target_location}"
+    )
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
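
One more aside on the reformatted expectation_key logic above: Index.factorize() gives every row sharing the same (expectation_config.type, dataset_name) pair the same integer code, which is what makes the composite key usable for tracking a given test on a given dataset. A standalone sketch with invented sample rows:

import pandas as pd

# Invented sample rows, shaped like the normalized GX results.
results_df = pd.DataFrame(
    {
        "expectation_config.type": [
            "expect_column_values_to_not_be_null",
            "expect_column_values_to_not_be_null",
            "expect_table_row_count_to_be_between",
            "expect_column_values_to_not_be_null",
        ],
        "dataset_name": ["properties_1a", "properties_1b", "properties_1a", "properties_1a"],
    }
)

# Same pattern as the job: factorize the (type, dataset) index and offset by 1,
# so identical (type, dataset) pairs receive identical keys.
results_df.insert(
    loc=0,
    column="expectation_key",
    value=results_df.set_index(
        ["expectation_config.type", "dataset_name"]
    ).index.factorize()[0]
    + 1,
)
print(results_df["expectation_key"].tolist())  # [1, 2, 3, 1]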
