Commit 539f8cd

Add new script and job for creating a Data Quality test metadata table that provides additional information to accompany the Housing GX data quality test results.
1 parent d297fec commit 539f8cd

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
# flake8: noqa: F821

import awswrangler as wr
from datetime import datetime
import logging
import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import pandas as pd
from scripts.helpers.housing_gx_dq_inputs import table_list, partition_keys
import scripts.jobs.housing.housing_person_reshape_gx_suite
import scripts.jobs.housing.housing_tenure_reshape_gx_suite
import scripts.jobs.housing.housing_contacts_reshape_gx_suite
import scripts.jobs.housing.housing_assets_reshape_gx_suite
import scripts.jobs.housing.housing_homeowner_record_sheet_gx_suite
import scripts.jobs.housing.housing_dwellings_list_gx_suite

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database',
            'target_table']
args = getResolvedOptions(sys.argv, arg_keys)
locals().update(args)


def main():
    # add GX context
    context = gx.get_context(mode="file", project_root_dir=s3_target_location)

    df_all_suite_list = []

    for table in table_list:

        # get expectation suite for dataset
        suite = context.suites.get(name=f'{table}_suite')
        expectations = suite.expectations

        # drop columns not needed
        cols_to_drop = ['notes', 'result_format', 'catch_exceptions',
                        'rendered_content', 'windows', 'batch_id']

        suite_df = pd.DataFrame()
        for i in expectations:
            temp_i = i
            temp_df = pd.json_normalize(dict(temp_i))
            temp_df['expectation_type'] = temp_i.expectation_type
            temp_df['dataset_name'] = table
            temp_df = temp_df.drop(columns=cols_to_drop)
            suite_df = pd.concat([suite_df, temp_df])

        df_all_suite_list.append(suite_df)

    df = pd.concat(df_all_suite_list)

    # add expectation_id
    df['expectation_id'] = df['expectation_type'] + "_" + df['dataset_name']

    df['import_year'] = datetime.today().year
    df['import_month'] = datetime.today().month
    df['import_day'] = datetime.today().day
    df['import_date'] = datetime.today().strftime('%Y%m%d')

    # set dtypes for Athena with default of string
    dict_values = ['string' for _ in range(len(df.columns))]
    dtype_dict = dict(zip(df.columns, dict_values))

    # write to s3
    wr.s3.to_parquet(
        df=df,
        path=s3_target_location,
        dataset=True,
        database=target_database,
        table=target_table,
        mode="overwrite",
        partition_cols=partition_keys,
        dtype=dtype_dict
    )

    logger.info(f'GX Data Quality test metadata written to {s3_target_location}')


if __name__ == '__main__':
    main()
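
For context on how the job parameters reach the script: getResolvedOptions reads "--key value" pairs from sys.argv when Glue launches the job, and locals().update(args) then exposes each value as a module-level name, which is why the file starts with a flake8 F821 suppression. A minimal sketch of that argument handling, assuming the awsglue library is available (as it is inside a Glue job) and using placeholder bucket, database and table names that do not come from this repository:

import sys
from awsglue.utils import getResolvedOptions

# Placeholder values only; a real run receives these from the Glue job configuration.
sys.argv += [
    "--region_name", "eu-west-2",
    "--s3_endpoint", "https://s3.eu-west-2.amazonaws.com",
    "--s3_target_location", "s3://example-bucket/housing/gx-dq-metadata/",
    "--s3_staging_location", "s3://example-bucket/athena-staging/",
    "--target_database", "example_database",
    "--target_table", "housing_gx_dq_test_metadata",
]

args = getResolvedOptions(sys.argv, ['region_name', 's3_endpoint', 's3_target_location',
                                     's3_staging_location', 'target_database', 'target_table'])
print(args['target_table'])  # -> housing_gx_dq_test_metadata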
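
The inner loop flattens each expectation with pandas.json_normalize, so nested fields become dotted column names and every expectation ends up as one row of the metadata table. A minimal sketch of that flattening, using a hand-written dict in place of a real Great Expectations expectation object (the keys shown are illustrative, not the exact fields GX emits):

import pandas as pd

# Illustrative stand-in for dict(expectation); real expectations carry more fields.
example_expectation = {
    "column": "person_id",
    "kwargs": {"mostly": 1.0},
    "meta": {"team": "housing"},
}

flat = pd.json_normalize(example_expectation)  # nested dicts become 'kwargs.mostly', 'meta.team'
flat["expectation_type"] = "expect_column_values_to_not_be_null"
flat["dataset_name"] = "person_reshape"
flat["expectation_id"] = flat["expectation_type"] + "_" + flat["dataset_name"]
print(flat.columns.tolist())
# e.g. ['column', 'kwargs.mostly', 'meta.team', 'expectation_type', 'dataset_name', 'expectation_id']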
