@@ -83,13 +83,13 @@ def loadIncrementsSinceDate(
 ) -> DataFrame:
     """
     Loads increments from the specified catalog table starting from a given date.
-    If the provided date is None, it defaults to 30 days ago.
+    If the provided date is None, it defaults to 60 days ago.
 
     Returns:
         DataFrame: A Spark DataFrame containing the loaded increments.
     """
     if date is None:
-        date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d")  # default date
+        date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d")  # default date
     increment_ddf = glueContext.create_dynamic_frame.from_catalog(
         name_space=name_space,
         table_name=increment_table_name,
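For reference, a minimal standalone sketch of the default-date fallback above (the dates in the comment are illustrative):

from datetime import datetime, timedelta

# 60 days before "now", rendered in the compact yyyymmdd form the
# catalog partitions use; e.g. on 2024-03-15 this yields "20240115".
default_date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d")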
@@ -149,6 +149,32 @@ def loadIncrementsSinceDate(
     "enforcement_breach_details": {"unique": ["id"]},
 }
 
+
+def purge_today_partition(
+    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
+) -> None:
+    """
+    Purges (deletes) only today's partition under the given target destination.
+
+    Parameters:
+        glueContext: GlueContext instance.
+        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
+        retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
+
+    Returns:
+        None.
+    """
+    now = datetime.now()
+    snapshot_year = str(now.year)
+    snapshot_month = str(now.month).zfill(2)
+    snapshot_day = str(now.day).zfill(2)
+    snapshot_date = snapshot_year + snapshot_month + snapshot_day
+
+    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"
+
+    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
+
+
 if __name__ == "__main__":
     args = getResolvedOptions(sys.argv, ["JOB_NAME"])
 
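A minimal usage sketch for the new helper, assuming the standard Glue bootstrap; the bucket path is illustrative, not taken from this job:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)

# Deletes s3://your-bucket/path/snapshot_year=YYYY/snapshot_month=MM/snapshot_day=DD/snapshot_date=YYYYMMDD
# for the current date, with no retention window (retentionPeriod=0).
purge_today_partition(glueContext, "s3://your-bucket/path")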
@@ -208,14 +234,18 @@ def loadIncrementsSinceDate(
         # snapshot table in glue catalogue
         else:
             pushDownPredicate = create_pushdown_predicate(
-                partitionDateColumn="snapshot_date", daysBuffer=3
+                partitionDateColumn="snapshot_date", daysBuffer=60
             )
             # load latest snapshot
             snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
                 name_space=source_catalog_database,
                 table_name=snapshot_table_name,
-                # push_down_predicate=pushDownPredicate
+                push_down_predicate=pushDownPredicate,
             )
+            if snapshot_ddf.count() == 0:
+                logger.error(
+                    f"No data returned for table {snapshot_table_name} using push_down_predicate: {pushDownPredicate}."
+                )
             snapshot_df = snapshot_ddf.toDF()
             snapshot_df = get_latest_snapshot(snapshot_df)
             last_snapshot_date = snapshot_df.select(max("snapshot_date")).first()[0]
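create_pushdown_predicate is a helper defined elsewhere in this repo and not shown in this diff; assuming it builds a Spark SQL predicate over the snapshot_date partition column, a 60-day buffer would produce a string along these lines (a sketch of the assumed shape, not the helper's actual implementation):

from datetime import datetime, timedelta

# Hypothetical predicate: keep only partitions from the last 60 days.
cutoff = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d")
pushDownPredicate = f"snapshot_date >= '{cutoff}'"  # e.g. "snapshot_date >= '20240115'"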
@@ -248,6 +278,7 @@ def loadIncrementsSinceDate(
             # apply COU
             logger.info(f"Applying increment {increment_table_name}")
             snapshot_df = apply_increments(snapshot_df, increment_df)
+
         else:
             logger.info(
                 f"Couldn't find table {increment_table_name} in database {source_catalog_database}, saving same snapshot as yesterday"
@@ -337,6 +368,9 @@ def loadIncrementsSinceDate(
             snapshot_df, glueContext, "resultDataFrame"
         )
         target_destination = s3_bucket_target + table_name
+
+        # Clean up today's partition before writing (keeps same-day re-runs idempotent)
+        purge_today_partition(glueContext, target_destination)
         parquetData = glueContext.write_dynamic_frame.from_options(
             frame=resultDataFrame,
             connection_type="s3",
@@ -346,6 +380,7 @@ def loadIncrementsSinceDate(
                 "partitionKeys": PARTITION_KEYS,
             },
         )
+
         job.commit()
     finally:
         if len(dq_errors) > 0: