 import great_expectations as gx
 import pandas as pd
 from pyathena import connect
-from scripts.helpers.housing_nec_migration_gx_dq_inputs import sql_config, table_list, partition_keys
+from scripts.helpers.housing_nec_migration_gx_dq_inputs import (
+    sql_config,
+    table_list,
+    partition_keys,
+)
 import scripts.jobs.housing.housing_nec_migration_properties_data_load_gx_suite

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)

-arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database',
-            'target_table']
+arg_keys = [
+    "region_name",
+    "s3_endpoint",
+    "s3_target_location",
+    "s3_staging_location",
+    "target_database",
+    "target_table",
+]
 args = getResolvedOptions(sys.argv, arg_keys)
 locals().update(args)

@@ -36,92 +46,110 @@ def main():
     table_results_df_list = []

     for table in table_list:
-        logger.info(f'{table} loading...')
+        logger.info(f"{table} loading...")

-        sql_query = sql_config.get(table).get('sql')
+        sql_query = sql_config.get(table).get("sql")

-        conn = connect(s3_staging_dir=s3_staging_location,
-                       region_name=region_name)
+        conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)

         df = pd.read_sql_query(sql_query, conn)

         # set up batch
         data_source = context.data_sources.add_pandas("pandas")
-        data_asset = data_source.add_dataframe_asset(name=f'{table}_df_asset')
-        batch_definition = data_asset.add_batch_definition_whole_dataframe("Athena batch definition")
+        data_asset = data_source.add_dataframe_asset(name=f"{table}_df_asset")
+        batch_definition = data_asset.add_batch_definition_whole_dataframe(
+            "Athena batch definition"
+        )
         batch_parameters = {"dataframe": df}

         # get expectation suite for dataset
-        suite = context.suites.get(name='properties_data_load_suite')
+        suite = context.suites.get(name="properties_data_load_suite")

         validation_definition = gx.ValidationDefinition(
-            data=batch_definition,
-            suite=suite,
-            name=f'validation_definition_{table}')
-        validation_definition = context.validation_definitions.add(validation_definition)
+            data=batch_definition, suite=suite, name=f"validation_definition_{table}"
+        )
+        validation_definition = context.validation_definitions.add(
+            validation_definition
+        )

         # create and start checking data with checkpoints
         checkpoint = context.checkpoints.add(
             gx.checkpoint.checkpoint.Checkpoint(
-                name=f'{table}_checkpoint',
+                name=f"{table}_checkpoint",
                 validation_definitions=[validation_definition],
-                result_format={"result_format": "COMPLETE",
-                               "return_unexpected_index_query": False,
-                               "partial_unexpected_count": 0}
+                result_format={
+                    "result_format": "COMPLETE",
+                    "return_unexpected_index_query": False,
+                    "partial_unexpected_count": 0,
+                },
             )
         )

         checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
         results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict()
-        table_results_df = pd.json_normalize(results_dict['results'])
-        cols_to_drop = [c for c in table_results_df.columns if c.startswith('exception_info')]
-        cols_to_drop.append('result.unexpected_list')
+        table_results_df = pd.json_normalize(results_dict["results"])
+        cols_to_drop = [
+            c for c in table_results_df.columns if c.startswith("exception_info")
+        ]
+        cols_to_drop.append("result.unexpected_list")
         table_results_df = table_results_df.drop(columns=cols_to_drop)
         table_results_df_list.append(table_results_df)

         # generate id lists for each unexpected result set
-        query_df = table_results_df.loc[(~table_results_df['result.unexpected_index_list'].isna()) & (
-            table_results_df['result.unexpected_index_list'].values != '[]')]
+        query_df = table_results_df.loc[
+            (~table_results_df["result.unexpected_index_list"].isna())
+            & (table_results_df["result.unexpected_index_list"].values != "[]")
+        ]

-        table_results_df['unexpected_id_list'] = pd.Series(dtype='object')
+        table_results_df["unexpected_id_list"] = pd.Series(dtype="object")
         for i, row in query_df.iterrows():
-            table_results_df.loc[i, 'unexpected_id_list'] = str(
-                list(df[sql_config.get(table).get('id_field')].iloc[row['result.unexpected_index_list']]))
+            table_results_df.loc[i, "unexpected_id_list"] = str(
+                list(
+                    df[sql_config.get(table).get("id_field")].iloc[
+                        row["result.unexpected_index_list"]
+                    ]
+                )
+            )

     results_df = pd.concat(table_results_df_list)

-    # map DQ dimension type
-    results_df['dq_dimension_type'] = results_df['expectation_config.type'].map(dq_dimensions_map)
-
     # add clean dataset name
-    results_df['dataset_name'] = results_df['expectation_config.kwargs.batch_id'].map(
-        lambda x: x.removeprefix('pandas-').removesuffix('_df_asset'))
+    results_df["dataset_name"] = results_df["expectation_config.kwargs.batch_id"].map(
+        lambda x: x.removeprefix("pandas-").removesuffix("_df_asset")
+    )

     # add composite key for each specific test (so can be tracked over time)
-    results_df.insert(loc=0, column='expectation_key',
-                      value=results_df.set_index(['expectation_config.type', 'dataset_name']).index.factorize()[0] + 1)
-    results_df['expectation_id'] = results_df['expectation_config.type'] + "_" + results_df['dataset_name']
-    results_df['import_date'] = datetime.today().strftime('%Y%m%d')
+    results_df.insert(
+        loc=0,
+        column="expectation_key",
+        value=results_df.set_index(
+            ["expectation_config.type", "dataset_name"]
+        ).index.factorize()[0]
+        + 1,
+    )
+    results_df["expectation_id"] = (
+        results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
+    )
+    results_df["import_date"] = datetime.today().strftime("%Y%m%d")

     # set dtypes for Athena
-    dtype_dict = {'expectation_config.type': 'string',
-                  'expectation_config.kwargs.batch_id': 'string',
-                  'expectation_config.kwargs.column': 'string',
-                  'expectation_config.kwargs.min_value': 'string',
-                  'expectation_config.kwargs.max_value': 'string',
-                  'result.element_count': 'bigint',
-                  'result.unexpected_count': 'bigint',
-                  'result.missing_count': 'bigint',
-                  'result.partial_unexpected_list': 'array<string>',
-                  'result.unexpected_index_list': 'array<bigint>',
-                  'result.unexpected_index_query': 'string',
-                  'expectation_config.kwargs.regex': 'string',
-                  'expectation_config.kwargs.value_set': 'string',
-                  'expectation_config.kwargs.column_list': 'string',
-                  'import_year': 'string',
-                  'import_month': 'string',
-                  'import_day': 'string',
-                  'import_date': 'string'}
+    dtype_dict = {
+        "expectation_config.type": "string",
+        "expectation_config.kwargs.batch_id": "string",
+        "expectation_config.kwargs.column": "string",
+        "expectation_config.kwargs.min_value": "string",
+        "expectation_config.kwargs.max_value": "string",
+        "result.element_count": "bigint",
+        "result.unexpected_count": "bigint",
+        "result.missing_count": "bigint",
+        "result.partial_unexpected_list": "array<string>",
+        "result.unexpected_index_list": "array<bigint>",
+        "result.unexpected_index_query": "string",
+        "expectation_config.kwargs.regex": "string",
+        "expectation_config.kwargs.value_set": "string",
+        "expectation_config.kwargs.column_list": "string",
+        "import_date": "string",
+    }

     # write to s3
     wr.s3.to_parquet(
@@ -132,11 +160,13 @@ def main():
         table=target_table,
         mode="overwrite_partitions",
         partition_cols=partition_keys,
-        dtype=dtype_dict
+        dtype=dtype_dict,
     )

-    logger.info(f'Data Quality test results for NEC data loads written to {s3_target_location}')
+    logger.info(
+        f"Data Quality test results for NEC data loads written to {s3_target_location}"
+    )


-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
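For context when reviewing the import change at the top of the diff: the helper module `scripts/helpers/housing_nec_migration_gx_dq_inputs.py` is not part of this change, so its contents are not shown. A minimal sketch of the shape this job appears to expect, with placeholder (hypothetical) table, query, and column names:

```python
# Hypothetical illustration only -- the real values live in
# scripts/helpers/housing_nec_migration_gx_dq_inputs.py, which this diff does not touch.

# Tables to validate; each name is also used to build asset and checkpoint names.
table_list = ["example_table"]  # placeholder table name

# Per-table config: the Athena SQL used to load the data, plus the id column
# used when reporting unexpected row identifiers.
sql_config = {
    "example_table": {
        "sql": "SELECT * FROM example_database.example_table",  # placeholder query
        "id_field": "example_id",  # placeholder id column
    },
}

# Partition columns passed to wr.s3.to_parquet; assumed to include import_date,
# given the dtype_dict in the diff above.
partition_keys = ["import_date"]  # assumption
```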
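The `expectation_key` derivation reformatted in this diff leans on `Index.factorize()`, which is easy to misread. A small standalone sketch (dummy rows, not from the real job) showing that rows sharing the same (expectation type, dataset) pair receive the same key within a run:

```python
import pandas as pd

# Dummy rows standing in for the flattened checkpoint results.
demo = pd.DataFrame(
    {
        "expectation_config.type": [
            "expect_column_values_to_not_be_null",
            "expect_column_values_to_not_be_null",
            "expect_column_values_to_be_unique",
        ],
        "dataset_name": ["properties", "tenancies", "properties"],  # hypothetical names
    }
)

# Same pattern as the job: factorize the (type, dataset) MultiIndex so each
# distinct pair maps to one integer (in order of appearance), offset to start at 1.
demo.insert(
    loc=0,
    column="expectation_key",
    value=demo.set_index(
        ["expectation_config.type", "dataset_name"]
    ).index.factorize()[0]
    + 1,
)
demo["expectation_id"] = (
    demo["expectation_config.type"] + "_" + demo["dataset_name"]
)
print(demo[["expectation_key", "expectation_id"]])
# All three pairs are distinct here, so the keys are 1, 2, 3; a repeated
# (type, dataset) pair would reuse its key.
```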