1010import great_expectations as gx
1111import pandas as pd
1212from pyathena import connect
13- from scripts .helpers .housing_nec_migration_gx_dq_inputs import sql_config , data_load_list , table_list
13+ from scripts .helpers .housing_nec_migration_gx_dq_inputs import (
14+ sql_config ,
15+ data_load_list ,
16+ table_list ,
17+ )
1418import scripts .jobs .housing .housing_nec_migration_properties_data_load_gx_suite
1519
1620logging .basicConfig (level = logging .INFO )
@@ -48,12 +52,14 @@ def main():
4852 table_results_df_list = []
4953
5054 for data_load in data_load_list :
51- logger .info (f' { data_load } loading...' )
55+ logger .info (f" { data_load } loading..." )
5256
5357 for table in table_list .get (data_load ):
5458 logger .info (f"{ table } loading..." )
5559
56- sql_query , id_field = get_sql_query (sql_config = sql_config , data_load = data_load , table = table )
60+ sql_query , id_field = get_sql_query (
61+ sql_config = sql_config , data_load = data_load , table = table
62+ )
5763
5864 conn = connect (s3_staging_dir = s3_staging_location , region_name = region_name )
5965
@@ -71,7 +77,9 @@ def main():
7177 suite = context .suites .get (name = f"{ data_load } _data_load_suite" )
7278
7379 validation_definition = gx .ValidationDefinition (
74- data = batch_definition , suite = suite , name = f"validation_definition_{ table } "
80+ data = batch_definition ,
81+ suite = suite ,
82+ name = f"validation_definition_{ table } " ,
7583 )
7684 validation_definition = context .validation_definitions .add (
7785 validation_definition
@@ -91,7 +99,9 @@ def main():
9199 )
92100
93101 checkpoint_result = checkpoint .run (batch_parameters = batch_parameters )
94- results_dict = list (checkpoint_result .run_results .values ())[0 ].to_json_dict ()
102+ results_dict = list (checkpoint_result .run_results .values ())[
103+ 0
104+ ].to_json_dict ()
95105 table_results_df = pd .json_normalize (results_dict ["results" ])
96106 cols_not_needed = ["result.unexpected_list" , "result.observed_value" ]
97107 cols_to_drop = [
@@ -112,11 +122,7 @@ def main():
112122 table_results_df ["unexpected_id_list" ] = pd .Series (dtype = "object" )
113123 for i , row in query_df .iterrows ():
114124 table_results_df .loc [i , "unexpected_id_list" ] = str (
115- list (
116- df [id_field ].iloc [
117- row ["result.unexpected_index_list" ]
118- ]
119- )
125+ list (df [id_field ].iloc [row ["result.unexpected_index_list" ]])
120126 )
121127
122128 results_df = pd .concat (table_results_df_list )
0 commit comments