
Commit 01beda2

Further deduplicate the incremental rows (#2209)
* further deduplicate the incremental rows
* pass the logger to the function
* revert the workers to 12 - it must be less than 16 based on the terraform linter
* use standard logger
* stop the spark quickly in finally
* small tweak on shuffle and small files
1 parent 320561a commit 01beda2

File tree

1 file changed: +37 -4 lines changed


scripts/jobs/planning/tascomi_create_daily_snapshot.py

Lines changed: 37 additions & 4 deletions
@@ -1,3 +1,4 @@
+import logging
 import sys
 from datetime import datetime
 
@@ -25,6 +26,11 @@
     table_exists_in_catalog,
 )
 
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
 
 def get_latest_snapshot(dfa):
     dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0])
@@ -44,6 +50,24 @@ def add_snapshot_date_columns(data_frame):
     return data_frame
 
 
+def deduplicate_by_id_and_last_updated(df):
+    """
+    Deduplicates rows with the same (id, last_updated) combination by keeping the one with the latest import_date.
+    To resolve: spotted duplicated rows with same id and last_updated timestamp in some incremental tables (e.g. documents)
+    """
+    window_spec = Window.partitionBy("id", "last_updated").orderBy(
+        F.col("import_date").desc()
+    )
+
+    deduplicated_df = (
+        df.withColumn("row_num", F.row_number().over(window_spec))
+        .filter(F.col("row_num") == 1)
+        .drop("row_num")
+    )
+
+    return deduplicated_df
+
+
 def prepare_increments(increment_df):
     # In case there are several days worth of increments: only keep the latest version of a record
     id_partition = Window.partitionBy("id")
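
As an aside (not part of the commit), a minimal sketch of how the new helper behaves on toy rows, assuming a local pyspark installation; the ids, timestamps and local session below are made up for illustration:

# Sketch only: replays the (id, last_updated) window dedup on toy data,
# keeping the row with the latest import_date for each pair.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").appName("dedup-sketch").getOrCreate()

toy_df = spark.createDataFrame(
    [
        (1, "2024-01-01 10:00:00", "20240102"),
        (1, "2024-01-01 10:00:00", "20240103"),  # duplicate pair, newer import_date
        (2, "2024-01-05 09:00:00", "20240106"),
    ],
    ["id", "last_updated", "import_date"],
)

window_spec = Window.partitionBy("id", "last_updated").orderBy(F.col("import_date").desc())
deduped = (
    toy_df.withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)
deduped.show()  # id=1 keeps only the 20240103 row; id=2 is untouched

spark.stop()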
@@ -62,6 +86,15 @@ def prepare_increments(increment_df):
         .where(F.col("last_updated_nonull") == F.col("latest"))
         .drop("latest", "last_updated_nonull")
     )
+
+    # Check for residual duplicates - print and further de-duplicate
+    duplicate_ids = increment_df.groupBy("id").count().filter("count > 1")
+    if duplicate_ids.limit(1).count() > 0:
+        duplicate_ids.join(increment_df, "id").show(truncate=False)
+        increment_df = deduplicate_by_id_and_last_updated(increment_df)
+    else:
+        logger.info("No duplicated rows after initial deduplication.")
+
     return increment_df
 
 
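
A side note on the residual-duplicate check above (again, not from the commit): duplicate_ids.limit(1).count() > 0 is a common existence-check idiom, limiting to a single row before counting rather than materialising a full count of every duplicated id. A toy sketch with made-up rows:

# Sketch only: the existence-check pattern used above, on made-up rows.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("dupe-check-sketch").getOrCreate()

df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ["id", "value"])
duplicate_ids = df.groupBy("id").count().filter("count > 1")

# limit(1) means Spark only has to surface one row to prove duplicates exist,
# instead of counting every duplicated id.
if duplicate_ids.limit(1).count() > 0:
    duplicate_ids.join(df, "id").show(truncate=False)  # print the offending rows

spark.stop()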
@@ -165,7 +198,6 @@ def purge_today_partition(
 sc = SparkContext.getOrCreate()
 glueContext = GlueContext(sc)
 spark = SparkSession(sc)
-logger = glueContext.get_logger()
 job = Job(glueContext)
 job.init(args["JOB_NAME"], args)
 
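
Taken together with the earlier hunk, this swaps the Glue context logger for Python's standard logging module, configured once at module level and shared by the helper functions. A bare-bones sketch of that shape (the helper name is invented for illustration):

# Sketch only: module-level standard logger reused inside helpers,
# in place of glueContext.get_logger().
import logging

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def some_helper():  # invented name, stands in for the job's helpers
    logger.info("helper ran")


some_helper()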
@@ -336,7 +368,7 @@ def purge_today_partition(
     verificationSuite.saveOrAppendResult(resultKey).run()
 
     # if data quality tests succeed, write to S3
-
+    snapshot_df = snapshot_df.coalesce(300)
     resultDataFrame = DynamicFrame.fromDF(
         snapshot_df, glueContext, "resultDataFrame"
     )
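
The snapshot_df.coalesce(300) line appears to be the "small tweak on shuffle and small files" from the commit message: coalescing caps the number of partitions, and hence the number of output files written to S3, without triggering a full shuffle the way repartition would. A rough, standalone sketch of the difference:

# Sketch only: coalesce vs repartition on a throwaway DataFrame.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("coalesce-sketch").getOrCreate()

df = spark.range(100_000).repartition(2000)        # simulate many small partitions
print(df.rdd.getNumPartitions())                   # 2000
print(df.coalesce(300).rdd.getNumPartitions())     # 300, merged without a full shuffle
print(df.repartition(300).rdd.getNumPartitions())  # also 300, but via a shuffle

spark.stop()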
@@ -356,7 +388,8 @@ def purge_today_partition(
     job.commit()
 finally:
     if len(dq_errors) > 0:
-        logger.error(f"DQ Errors: {dq_errors}")
-        raise Exception(f"Data quality check failed: {'; '.join(dq_errors)}")
+        logger.error(f"Errors: {dq_errors}")
+        spark.stop()
+        raise SystemExit(f"Failed: {'; '.join(dq_errors)}")
     spark.sparkContext._gateway.close()
     spark.stop()
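
This last hunk matches the "stop the spark quickly in finally" bullet: on data quality errors the session is now stopped before raising, and SystemExit replaces the bare Exception. A stripped-down sketch of that shutdown shape (dq_errors and the local session are placeholders, not the job's real setup):

# Sketch only: stop Spark before raising so the driver exits promptly on failure.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("shutdown-sketch").getOrCreate()
dq_errors = []  # placeholder for the data quality errors collected by the job

try:
    pass  # transforms, data quality checks and writes would run here
finally:
    if len(dq_errors) > 0:
        spark.stop()
        raise SystemExit(f"Failed: {'; '.join(dq_errors)}")
    spark.stop()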

0 commit comments
