Skip to content

Commit 28c4e07

Browse files
committed
tascomi snapshot - increase the daysBuffer from 3 to 30 and purge today's partition before writing the data
1 parent 0d12464 commit 28c4e07

File tree

1 file changed

+27
-1
lines changed

1 file changed

+27
-1
lines changed

scripts/jobs/planning/tascomi_create_daily_snapshot.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,29 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
8282
return increment_df
8383

8484

85+
def purge_today_partition(
    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
) -> str:
    """
    Purge (delete) only today's partition under the given target destination.

    Parameters:
        glueContext: GlueContext instance.
        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
        retentionPeriod: Retention period in hours (default 0, meaning delete
            all files immediately).

    Returns:
        partition_path: The S3 partition path that was purged.
    """
    # NOTE(review): datetime.now() is naive local time; if the job can run
    # near a date boundary in a non-UTC region, confirm this matches the
    # timezone used when the snapshot partitions were written.
    now = datetime.now()
    snapshot_year = str(now.year)
    snapshot_month = str(now.month).zfill(2)
    snapshot_day = str(now.day).zfill(2)
    snapshot_date = snapshot_year + snapshot_month + snapshot_day

    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"

    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})

    # Return the purged path so callers can log or verify it; the original
    # docstring already documented this return value, but the function was
    # annotated -> None and returned nothing.
    return partition_path
85108
# dict containing parameters for DQ checks
86109
dq_params = {
87110
"appeals": {"unique": ["id"]},
@@ -189,7 +212,7 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
189212
# snapshot table in glue catalogue
190213
else:
191214
pushDownPredicate = create_pushdown_predicate(
192-
partitionDateColumn="snapshot_date", daysBuffer=3
215+
partitionDateColumn="snapshot_date", daysBuffer=30
193216
)
194217
# load latest snapshot
195218
snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
@@ -318,6 +341,9 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
318341
snapshot_df, glueContext, "resultDataFrame"
319342
)
320343
target_destination = s3_bucket_target + table_name
344+
345+
# Clean up today's partition before writing
346+
purge_today_partition(glueContext, target_destination)
321347
parquetData = glueContext.write_dynamic_frame.from_options(
322348
frame=resultDataFrame,
323349
connection_type="s3",

0 commit comments

Comments
 (0)