Commit baf1581

Add metadata table to GX DQ test Glue job

1 parent c946b3c

File tree

3 files changed: +58 -10 lines changed


scripts/helpers/housing_nec_migration_gx_dq_inputs.py

Lines changed: 1 addition & 0 deletions

@@ -9,6 +9,7 @@
     "properties_1c",
     "properties_1d",
     "properties_1e",
+    "properties_2a",
     "properties_4a",
     "properties_4c",
 ]
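
The job script (diffed below) loops over this list and resolves one Great Expectations suite per dataset name, so the new "properties_2a" entry simply brings another suite into scope. A minimal sketch of that relationship, assuming the helper exposes the list as data_load_list (the loop variable name used in the job; the helper's actual variable name is not shown in this hunk):

# Illustration only: how an entry in the inputs list is consumed by the job.
# The list name `data_load_list` is an assumption taken from the job's loop.
data_load_list = [
    "properties_1c",
    "properties_1d",
    "properties_1e",
    "properties_2a",  # dataset added in this commit
    "properties_4a",
    "properties_4c",
]

for data_load in data_load_list:
    # the job looks up one expectation suite per dataset name
    print(f"{data_load}_data_load_suite")  # e.g. "properties_2a_data_load_suite"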

scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py

Lines changed: 52 additions & 9 deletions

@@ -27,6 +27,9 @@
     "s3_staging_location",
     "target_database",
     "target_table",
+    "target_table_metadata",
+    "s3_target_location_metadata",
+    "s3_target_location_results",
 ]
 args = getResolvedOptions(sys.argv, arg_keys)
 locals().update(args)

@@ -50,6 +53,7 @@ def main():
     context = gx.get_context(mode="file", project_root_dir=s3_target_location)

     table_results_df_list = []
+    df_all_suite_list = []

     for data_load in data_load_list:
         logger.info(f"{data_load} loading...")

@@ -75,6 +79,7 @@ def main():

         # get expectation suite for dataset
         suite = context.suites.get(name=f"{data_load}_data_load_suite")
+        expectations = suite.expectations

         validation_definition = gx.ValidationDefinition(
             data=batch_definition,

@@ -125,14 +130,45 @@ def main():
             list(df[id_field].iloc[row["result.unexpected_index_list"]])
         )

+        # drop columns not needed in metadata
+        cols_to_drop_meta = [
+            "notes",
+            "result_format",
+            "catch_exceptions",
+            "rendered_content",
+            "windows",
+        ]
+
+        suite_df = pd.DataFrame()
+        for i in expectations:
+            temp_i = i
+            temp_df = pd.json_normalize(dict(temp_i))
+            temp_df["expectation_type"] = temp_i.expectation_type
+            temp_df["dataset_name"] = table
+            temp_df = temp_df.drop(columns=cols_to_drop_meta)
+            suite_df = pd.concat([suite_df, temp_df])
+
+        df_all_suite_list.append(suite_df)
+
     results_df = pd.concat(table_results_df_list)
+    metadata_df = pd.concat(df_all_suite_list)
+
+    # add expectation_id
+    metadata_df["expectation_id"] = (
+        metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"]
+    )
+    metadata_df["import_date"] = datetime.today().strftime("%Y%m%d")
+
+    # set dtypes for Athena with default of string
+    dict_values = ["string" for _ in range(len(metadata_df.columns))]
+    dtype_dict_metadata = dict(zip(metadata_df.columns, dict_values))

     # add clean dataset name
     results_df["dataset_name"] = results_df["expectation_config.kwargs.batch_id"].map(
         lambda x: x.removeprefix("pandas-").removesuffix("_df_asset")
     )

-    # add composite key for each specific test (so can be tracked over time)
+    # add composite key for each test (so it can be tracked over time)
     results_df.insert(
         loc=0,
         column="expectation_key",

@@ -146,8 +182,8 @@ def main():
     )
     results_df["import_date"] = datetime.today().strftime("%Y%m%d")

-    # set dtypes for Athena
-    dtype_dict = {
+    # set dtypes for Athena
+    dtype_dict_results = {
         "expectation_config.type": "string",
         "expectation_config.kwargs.batch_id": "string",
         "expectation_config.kwargs.column": "string",

@@ -165,20 +201,27 @@ def main():
         "import_date": "string",
     }

-    # write to s3
+    # TODO: loop the writing of these tables, e.g.
+    # for df_vars in [[results_df, dtype_dict_results, target_table], [metadata_df, dtype_dict_metadata, target_table_metadata]]:
+
     wr.s3.to_parquet(
         df=results_df,
-        path=s3_target_location,
+        path=s3_target_location_results,
         dataset=True,
         database=target_database,
         table=target_table,
         mode="overwrite",
-        dtype=dtype_dict,
-        schema_evolution=True,
+        dtype=dtype_dict_results,
     )

-    logger.info(
-        f"Data Quality test results for NEC data loads written to {s3_target_location}"
+    wr.s3.to_parquet(
+        df=metadata_df,
+        path=s3_target_location_metadata,
+        dataset=True,
+        database=target_database,
+        table=target_table_metadata,
+        mode="overwrite",
+        dtype=dtype_dict_metadata,
     )
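
The TODO added above notes that the two wr.s3.to_parquet calls could be driven by one loop. A possible shape for that refactor, sketched with the variable names the job already defines (results_df, metadata_df, their dtype dicts, and the resolved target locations and table names):

# Sketch of the TODO at the end of main(): write both tables from one loop.
# All names below (results_df, metadata_df, dtype_dict_*, target_*, s3_*) are
# assumed to exist exactly as built earlier in the job.
tables_to_write = [
    (results_df, dtype_dict_results, target_table, s3_target_location_results),
    (metadata_df, dtype_dict_metadata, target_table_metadata, s3_target_location_metadata),
]

for df, dtype_dict, table_name, s3_path in tables_to_write:
    wr.s3.to_parquet(
        df=df,
        path=s3_path,
        dataset=True,
        database=target_database,
        table=table_name,
        mode="overwrite",
        dtype=dtype_dict,
    )
    logger.info(f"{table_name} written to {s3_path}")

As a side note, the default-of-string dtype dict for the metadata table could also be built with a dict comprehension, dtype_dict_metadata = {col: "string" for col in metadata_df.columns}, which avoids the intermediate dict_values list.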

terraform/etl/54-aws-glue-housing-nec-migration-apply-gx-dq-tests.tf

Lines changed: 5 additions & 1 deletion

@@ -16,17 +16,21 @@ module "housing_nec_migration_apply_gx_dq_tests" {
   trigger_enabled                = local.is_production_environment
   number_of_workers_for_glue_job = 2
   schedule                       = "cron(0 10 ? * MON-FRI *)"
-  job_parameters = {
+  job_parameters                 = {
     "--job-bookmark-option"                = "job-bookmark-enable"
     "--enable-glue-datacatalog"            = "true"
     "--enable-continuous-cloudwatch-log"   = "true"
     "--additional-python-modules"          = "great_expectations==1.5.8,PyAthena,numpy==1.26.1,awswrangler==3.10.0"
     "--region_name"                        = data.aws_region.current.name
     "--s3_endpoint"                        = "https://s3.${data.aws_region.current.name}.amazonaws.com"
     "--s3_target_location"                 = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
+    "--s3_target_location_metadata"        = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/metadata/"
+    "--s3_target_location_results"         = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/results/"
     "--s3_staging_location"                = "s3://${module.athena_storage_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
     "--target_database"                    = "housing_nec_migration"
     "--target_table"                       = "housing_nec_data_loads_dq_tests"
+    "--target_table_metadata"              = "housing_nec_data_loads_dq_metadata"
+
   }

   script_name = "housing_nec_migration_apply_gx_dq_tests"
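
For reference, the three new job parameters line up with the entries added to arg_keys in the Glue script. A minimal sketch of how they surface as Python values inside the job, using awsglue's getResolvedOptions as the script already does:

import sys

from awsglue.utils import getResolvedOptions

# Subset of the job's arg_keys, including the parameters added in this commit.
arg_keys = [
    "target_database",
    "target_table",
    "target_table_metadata",
    "s3_target_location_metadata",
    "s3_target_location_results",
]

# Glue passes the Terraform job_parameters as --key value pairs on sys.argv.
args = getResolvedOptions(sys.argv, arg_keys)

# e.g. args["target_table_metadata"] == "housing_nec_data_loads_dq_metadata"
# The job then promotes these to module-level names via locals().update(args).
print(args["s3_target_location_metadata"], args["target_table_metadata"])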
