@@ -44,6 +44,24 @@ def add_snapshot_date_columns(data_frame):
    return data_frame


+def deduplicate_by_id_and_last_updated(df):
+    """
+    Deduplicate rows with the same (id, last_updated) combination by keeping the one with the latest import_date.
+    Added to resolve duplicated rows with the same id and last_updated timestamp spotted in some incremental tables (e.g. documents).
+    """
+    window_spec = Window.partitionBy("id", "last_updated").orderBy(
+        F.col("import_date").desc()
+    )
+
+    deduplicated_df = (
+        df.withColumn("row_num", F.row_number().over(window_spec))
+        .filter(F.col("row_num") == 1)
+        .drop("row_num")
+    )
+
+    return deduplicated_df
+
+
def prepare_increments(increment_df):
    # In case there are several days worth of increments: only keep the latest version of a record
    id_partition = Window.partitionBy("id")
@@ -62,6 +80,15 @@ def prepare_increments(increment_df):
        .where(F.col("last_updated_nonull") == F.col("latest"))
        .drop("latest", "last_updated_nonull")
    )
+
+    # Check for residual duplicates - print and further de-duplicate
+    duplicate_ids = increment_df.groupBy("id").count().filter("count > 1")
+    if duplicate_ids.count() > 0:
+        duplicate_ids.join(increment_df, "id").show(truncate=False)
+        increment_df = deduplicate_by_id_and_last_updated(increment_df)
+    else:
+        logger.info("No duplicated rows after initial deduplication.")
+
    return increment_df


@@ -356,7 +383,7 @@ def purge_today_partition(
        job.commit()
    finally:
        if len(dq_errors) > 0:
-            logger.error(f"DQ Errors: {dq_errors}")
-            raise Exception(f"Data quality check failed: {'; '.join(dq_errors)}")
+            logger.error(f"Errors: {dq_errors}")
+            raise Exception(f"Job Failed: {'; '.join(dq_errors)}")
        spark.sparkContext._gateway.close()
        spark.stop()
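
For reference, here is a minimal local sketch (not part of the patch) of how the new `deduplicate_by_id_and_last_updated` helper behaves on a toy DataFrame. The window logic mirrors the helper added above; the sample rows, the `local[1]` master, and the `__main__` wrapper are illustrative assumptions only.

```python
# Standalone sketch, assuming pyspark is installed and a local SparkSession can be created.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F


def deduplicate_by_id_and_last_updated(df):
    # Same logic as the patched helper: for each (id, last_updated) pair,
    # keep only the row with the most recent import_date.
    window_spec = Window.partitionBy("id", "last_updated").orderBy(
        F.col("import_date").desc()
    )
    return (
        df.withColumn("row_num", F.row_number().over(window_spec))
        .filter(F.col("row_num") == 1)
        .drop("row_num")
    )


if __name__ == "__main__":
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    rows = [
        ("doc-1", "2024-01-01T00:00:00", "2024-01-02"),  # older import
        ("doc-1", "2024-01-01T00:00:00", "2024-01-03"),  # duplicate id/last_updated, newer import -> kept
        ("doc-2", "2024-01-05T00:00:00", "2024-01-06"),
    ]
    df = spark.createDataFrame(rows, ["id", "last_updated", "import_date"])
    deduplicate_by_id_and_last_updated(df).show(truncate=False)  # two rows remain
    spark.stop()
```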