import sys
-from datetime import datetime, timedelta
-from typing import Optional
+from datetime import datetime

import pyspark.sql.functions as F
from awsglue.context import GlueContext
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark import SparkContext
-from pyspark.sql import DataFrame, SparkSession, Window
+from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, max

from scripts.helpers.data_quality_testing import (
)


-def get_latest_snapshot(dfa: DataFrame) -> DataFrame:
-    if "snapshot_date" not in dfa.columns:
-        logger.warn(
-            "No snapshot_date column found in the dataframe, adding snapshot date columns"
-        )
-        dfa = add_snapshot_date_columns(dfa)
-    max_date = dfa.select(max("snapshot_date")).first()[0]
-    df_latest_snapshot = dfa.where(col("snapshot_date") == max_date)
-    return df_latest_snapshot
+def get_latest_snapshot(dfa):
+    dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0])
+    return dfa


def add_snapshot_date_columns(data_frame):
@@ -78,22 +71,11 @@ def apply_increments(snapshot_df, increment_df):
    return snapshot_df


-def loadIncrementsSinceDate(
-    increment_table_name: str, name_space: str, date: Optional[str] = None
-) -> DataFrame:
-    """
-    Loads increments from the specified catalog table starting from a given date.
-    If the provided date is None, it defaults to 60 days ago.
-
-    Returns:
-        DataFrame: A Spark DataFrame containing the loaded increments.
-    """
-    if date is None:
-        date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d")  # default date
+def loadIncrementsSinceDate(increment_table_name, name_space, date):
    increment_ddf = glueContext.create_dynamic_frame.from_catalog(
        name_space=name_space,
        table_name=increment_table_name,
-        push_down_predicate=f"import_date>='{date}'",
+        push_down_predicate=f"import_date>={date}",
        transformation_ctx=f"datasource_{increment_table_name}",
    )
    increment_df = increment_ddf.toDF()
@@ -149,32 +131,6 @@ def loadIncrementsSinceDate(
    "enforcement_breach_details": {"unique": ["id"]},
}

-
-def purge_today_partition(
-    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
-) -> None:
-    """
-    Purges (delete) only today's partition under the given target destination.
-
-    Parameters:
-        glueContext: GlueContext instance.
-        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
-        retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
-
-    Returns:
-        partition_path: The S3 partition path that was purged.
-    """
-    now = datetime.now()
-    snapshot_year = str(now.year)
-    snapshot_month = str(now.month).zfill(2)
-    snapshot_day = str(now.day).zfill(2)
-    snapshot_date = snapshot_year + snapshot_month + snapshot_day
-
-    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"
-
-    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
-
-
if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])

@@ -216,11 +172,10 @@ def purge_today_partition(
                f"No snapshot and no increment for {increment_table_name}, going to the next table"
            )
            continue
-
-        # Load increments from default date
        increment_df = loadIncrementsSinceDate(
            increment_table_name=increment_table_name,
            name_space=source_catalog_database,
+            date="20210101",
        )
        if increment_df.rdd.isEmpty():
            logger.info(
@@ -234,18 +189,14 @@ def purge_today_partition(
            # snapshot table in glue catalogue
        else:
            pushDownPredicate = create_pushdown_predicate(
-                partitionDateColumn="snapshot_date", daysBuffer=60
+                partitionDateColumn="snapshot_date", daysBuffer=3
            )
            # load latest snapshot
            snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
                name_space=source_catalog_database,
                table_name=snapshot_table_name,
                push_down_predicate=pushDownPredicate,
            )
-            if snapshot_ddf.count() == 0:
-                logger.error(
-                    f"No data returned for table {snapshot_table_name} using push_down_predicate: {pushDownPredicate}."
-                )
            snapshot_df = snapshot_ddf.toDF()
            snapshot_df = get_latest_snapshot(snapshot_df)
            last_snapshot_date = snapshot_df.select(max("snapshot_date")).first()[0]
@@ -278,7 +229,6 @@ def purge_today_partition(
            # apply COU
            logger.info(f"Applying increment {increment_table_name}")
            snapshot_df = apply_increments(snapshot_df, increment_df)
-
        else:
            logger.info(
                f"Couldn't find table {increment_table_name} in database {source_catalog_database}, saving same snapshot as yesterday"
@@ -368,9 +318,6 @@ def purge_today_partition(
            snapshot_df, glueContext, "resultDataFrame"
        )
        target_destination = s3_bucket_target + table_name
-
-        # Clean up today's partition before writing
-        purge_today_partition(glueContext, target_destination)
        parquetData = glueContext.write_dynamic_frame.from_options(
            frame=resultDataFrame,
            connection_type="s3",
@@ -380,7 +327,6 @@ def purge_today_partition(
                "partitionKeys": PARTITION_KEYS,
            },
        )
-
        job.commit()
    finally:
        if len(dq_errors) > 0: