@@ -1,7 +1,7 @@
 import sys
-from datetime import datetime, timedelta
-from typing import Optional
+from datetime import datetime
 
+import pydeequ
 import pyspark.sql.functions as F
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
@@ -11,7 +11,7 @@
 from pydeequ.repository import FileSystemMetricsRepository, ResultKey
 from pydeequ.verification import VerificationResult, VerificationSuite
 from pyspark import SparkContext
-from pyspark.sql import DataFrame, SparkSession, Window
+from pyspark.sql import SparkSession, Window
 from pyspark.sql.functions import col, max
 
 from scripts.helpers.data_quality_testing import (
@@ -27,15 +27,9 @@
 )
 
 
-def get_latest_snapshot(dfa: DataFrame) -> DataFrame:
-    if "snapshot_date" not in dfa.columns:
-        logger.warn(
-            "No snapshot_date column found in the dataframe, adding snapshot date columns"
-        )
-        dfa = add_snapshot_date_columns(dfa)
-    max_date = dfa.select(max("snapshot_date")).first()[0]
-    df_latest_snapshot = dfa.where(col("snapshot_date") == max_date)
-    return df_latest_snapshot
+def get_latest_snapshot(dfa):
+    dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0])
+    return dfa
 
 
 def add_snapshot_date_columns(data_frame):
@@ -78,22 +72,11 @@ def apply_increments(snapshot_df, increment_df):
     return snapshot_df
 
 
-def loadIncrementsSinceDate(
-    increment_table_name: str, name_space: str, date: Optional[str] = None
-) -> DataFrame:
-    """
-    Loads increments from the specified catalog table starting from a given date.
-    If the provided date is None, it defaults to 60 days ago.
-
-    Returns:
-        DataFrame: A Spark DataFrame containing the loaded increments.
-    """
-    if date is None:
-        date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d")  # default date
+def loadIncrementsSinceDate(increment_table_name, name_space, date):
     increment_ddf = glueContext.create_dynamic_frame.from_catalog(
         name_space=name_space,
         table_name=increment_table_name,
-        push_down_predicate=f"import_date>='{date}'",
+        push_down_predicate=f"import_date>={date}",
         transformation_ctx=f"datasource_{increment_table_name}",
     )
     increment_df = increment_ddf.toDF()
@@ -149,32 +132,6 @@ def loadIncrementsSinceDate(
     "enforcement_breach_details": {"unique": ["id"]},
 }
 
-
-def purge_today_partition(
-    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
-) -> None:
-    """
-    Purges (deletes) only today's partition under the given target destination.
-
-    Parameters:
-        glueContext: GlueContext instance.
-        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
-        retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
-
-    Returns:
-        partition_path: The S3 partition path that was purged.
-    """
-    now = datetime.now()
-    snapshot_year = str(now.year)
-    snapshot_month = str(now.month).zfill(2)
-    snapshot_day = str(now.day).zfill(2)
-    snapshot_date = snapshot_year + snapshot_month + snapshot_day
-
-    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"
-
-    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
-
-
 if __name__ == "__main__":
     args = getResolvedOptions(sys.argv, ["JOB_NAME"])
@@ -216,11 +173,10 @@ def purge_today_partition(
             f"No snapshot and no increment for {increment_table_name}, going to the next table"
         )
         continue
-
-        # Load increments from default date
         increment_df = loadIncrementsSinceDate(
            increment_table_name=increment_table_name,
            name_space=source_catalog_database,
+           date="20210101",
         )
         if increment_df.rdd.isEmpty():
            logger.info(
@@ -234,18 +190,14 @@ def purge_today_partition(
        # snapshot table in glue catalogue
        else:
            pushDownPredicate = create_pushdown_predicate(
-               partitionDateColumn="snapshot_date", daysBuffer=60
+               partitionDateColumn="snapshot_date", daysBuffer=3
            )
            # load latest snapshot
            snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
                name_space=source_catalog_database,
                table_name=snapshot_table_name,
                push_down_predicate=pushDownPredicate,
            )
-           if snapshot_ddf.count() == 0:
-               logger.error(
-                   f"No data returned for table {snapshot_table_name} using push_down_predicate: {pushDownPredicate}."
-               )
            snapshot_df = snapshot_ddf.toDF()
            snapshot_df = get_latest_snapshot(snapshot_df)
            last_snapshot_date = snapshot_df.select(max("snapshot_date")).first()[0]
@@ -278,7 +230,6 @@ def purge_today_partition(
            # apply COU
            logger.info(f"Applying increment {increment_table_name}")
            snapshot_df = apply_increments(snapshot_df, increment_df)
-
        else:
            logger.info(
                f"Couldn't find table {increment_table_name} in database {source_catalog_database}, saving same snapshot as yesterday"
@@ -347,7 +298,7 @@ def purge_today_partition(
 
        except Exception as verificationError:
            logger.info(
-               "Job cancelled due to data quality test failure, continuing to next table."
+               f"Job cancelled due to data quality test failure, continuing to next table."
            )
            message = verificationError.args
            logger.info(f"{message[0]}")
@@ -358,7 +309,7 @@ def purge_today_partition(
 
        else:
            logger.info(
-               "Data quality tests passed, appending data quality results to JSON and moving on to writing data"
+               f"Data quality tests passed, appending data quality results to JSON and moving on to writing data"
            )
            verificationSuite.saveOrAppendResult(resultKey).run()
@@ -368,9 +319,6 @@ def purge_today_partition(
                snapshot_df, glueContext, "resultDataFrame"
            )
            target_destination = s3_bucket_target + table_name
-
-           # Clean up today's partition before writing
-           purge_today_partition(glueContext, target_destination)
            parquetData = glueContext.write_dynamic_frame.from_options(
                frame=resultDataFrame,
                connection_type="s3",
@@ -380,7 +328,6 @@ def purge_today_partition(
                    "partitionKeys": PARTITION_KEYS,
                },
            )
-
            job.commit()
    finally:
        if len(dq_errors) > 0: