Commit 7f4e60f

clean the partition before writing to it
1 parent 5bb2aed commit 7f4e60f

File tree

1 file changed (+47 −7)


scripts/jobs/copy_tables_landing_to_raw.py

Lines changed: 47 additions & 7 deletions
@@ -1,27 +1,62 @@
-import boto3
-import sys
 import re
-from awsglue.transforms import *
-from awsglue.utils import getResolvedOptions
-from pyspark.context import SparkContext
+import sys
+from datetime import datetime
+
+import boto3
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
 from awsglue.job import Job
+from awsglue.utils import getResolvedOptions
+from pyspark.context import SparkContext
 
 from scripts.helpers.helpers import (
+    PARTITION_KEYS,
     add_timestamp_column,
     get_glue_env_var,
     initialise_job,
-    PARTITION_KEYS,
 )
 
+
+def purge_today_partition(
+    glue_context: GlueContext,
+    target_destination: str,
+    table_name: str,
+    retentionPeriod: int = 0,
+) -> None:
+    """
+    Purges (deletes) only today's partition under the given target destination.
+    Parameters:
+        glue_context: GlueContext instance.
+        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
+        table_name: Name of the table being purged, used for logging.
+        retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
+    Returns:
+        None.
+    """
+    now = datetime.now()
+    import_year = str(now.year)
+    import_month = str(now.month).zfill(2)
+    import_day = str(now.day).zfill(2)
+    import_date = import_year + import_month + import_day
+
+    partition_path = f"{target_destination}/import_year={import_year}/import_month={import_month}/import_day={import_day}/import_date={import_date}"
+
+    logger.info(
+        f"Purging today's partition for table {table_name} at path: {partition_path}"
+    )
+    glue_context.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
+    logger.info(f"Successfully purged partition for table {table_name}")
+
+
 ## @params: [JOB_NAME]
 args = getResolvedOptions(sys.argv, ["JOB_NAME"])
 
 sc = SparkContext()
 glue_context = GlueContext(sc)
 spark = glue_context.spark_session
 job = Job(glue_context)
+
+# Global logger
 logger = glue_context.get_logger()
 
 initialise_job(args, job, logger)
@@ -60,8 +95,13 @@
 
     table_with_timestamp = add_timestamp_column(table_data_frame)
 
+    target_destination = f"s3://{bucket_target}/{prefix}{table_name}"
+
+    # Clean up today's partition before writing
+    purge_today_partition(glue_context, target_destination, table_name)
+
     data_sink = glue_context.getSink(
-        path="s3://" + bucket_target + "/" + prefix + table_name + "/",
+        path=target_destination + "/",
         connection_type="s3",
         updateBehavior="UPDATE_IN_DATABASE",
         partitionKeys=PARTITION_KEYS,

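For context: purge_s3_path is the GlueContext method that deletes files under an S3 prefix, subject to the retentionPeriod option (in hours; the job passes 0 so everything under the path is deleted immediately). The partition path the purge targets can be previewed with plain Python. A minimal sketch, assuming the same Hive-style layout as the job; the bucket and table name below are hypothetical:

from datetime import datetime

def todays_partition_path(target_destination: str) -> str:
    # Mirror the job's partition layout: import_year/import_month/import_day
    # plus a combined import_date key, all derived from the current date.
    now = datetime.now()
    year = str(now.year)
    month = str(now.month).zfill(2)
    day = str(now.day).zfill(2)
    return (
        f"{target_destination}/import_year={year}/import_month={month}"
        f"/import_day={day}/import_date={year}{month}{day}"
    )

# Hypothetical bucket and table name, for illustration only.
print(todays_partition_path("s3://example-bucket/landing/my_table"))
# e.g. s3://example-bucket/landing/my_table/import_year=2025/import_month=06/import_day=14/import_date=20250614

Because the purge runs immediately before getSink writes to the same partition, re-running the job on the same day replaces that day's data instead of appending duplicates.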