@@ -149,6 +149,32 @@ def loadIncrementsSinceDate(
149149 "enforcement_breach_details" : {"unique" : ["id" ]},
150150}
151151
152+
153+ def purge_today_partition (
154+ glueContext : GlueContext , target_destination : str , retentionPeriod : int = 0
155+ ) -> None :
156+ """
157+ Purges (delete) only today's partition under the given target destination.
158+
159+ Parameters:
160+ glueContext: GlueContext instance.
161+ target_destination: Base S3 path (e.g., "s3://your-bucket/path").
162+ retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
163+
164+ Returns:
165+ partition_path: The S3 partition path that was purged.
166+ """
167+ now = datetime .now ()
168+ snapshot_year = str (now .year )
169+ snapshot_month = str (now .month ).zfill (2 )
170+ snapshot_day = str (now .day ).zfill (2 )
171+ snapshot_date = snapshot_year + snapshot_month + snapshot_day
172+
173+ partition_path = f"{ target_destination } /snapshot_year={ snapshot_year } /snapshot_month={ snapshot_month } /snapshot_day={ snapshot_day } /snapshot_date={ snapshot_date } "
174+
175+ glueContext .purge_s3_path (partition_path , {"retentionPeriod" : retentionPeriod })
176+
177+
152178if __name__ == "__main__" :
153179 args = getResolvedOptions (sys .argv , ["JOB_NAME" ])
154180
@@ -342,6 +368,9 @@ def loadIncrementsSinceDate(
342368 snapshot_df , glueContext , "resultDataFrame"
343369 )
344370 target_destination = s3_bucket_target + table_name
371+
372+ # Clean up today's partition before writing
373+ purge_today_partition (glueContext , target_destination )
345374 parquetData = glueContext .write_dynamic_frame .from_options (
346375 frame = resultDataFrame ,
347376 connection_type = "s3" ,
@@ -351,6 +380,7 @@ def loadIncrementsSinceDate(
351380 "partitionKeys" : PARTITION_KEYS ,
352381 },
353382 )
383+
354384 job .commit ()
355385 finally :
356386 if len (dq_errors ) > 0 :
0 commit comments