Skip to content

Commit bab73a4

Browse files
brunoddBruno De Deken
andauthored
bugfix - PR #180 - *Snowflake > Spark* Ensure matched inserts sync to snowflake
Ensure a new INSERT is synchronized to Snowflake, even when a match is already found. ## Motivation and Context When a delta table is VACUUMed before a sync to Snowflake was triggered, a updated record will be considered as an "INSERT" record. However the target table might already contain this record. This PR ensures that this record is still updated as would be desired. ## How Has This Been Tested? This was a bug in our code. This code currently lives in our repo and fixed the issue. --------- Co-authored-by: Bruno De Deken <bruno.dedeken@nike.com>
1 parent b5168e1 commit bab73a4

File tree

2 files changed

+3
-3
lines changed

2 files changed

+3
-3
lines changed

src/koheesio/integrations/spark/snowflake.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ def _build_sf_merge_query(
970970
f"""
971971
MERGE INTO {target_table} target
972972
USING {stage_table} temp ON {key_join_string}
973-
WHEN MATCHED AND temp._change_type = 'update_postimage'
973+
WHEN MATCHED AND (temp._change_type = 'update_postimage' OR temp._change_type = 'insert')
974974
THEN UPDATE SET {assignment_string}
975975
WHEN NOT MATCHED AND temp._change_type != 'delete'
976976
THEN INSERT ({columns_string})

tests/spark/integrations/snowflake/test_sync_task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ def test_merge_query_no_delete(self):
474474
"""
475475
MERGE INTO target_table target
476476
USING tmp_table temp ON target.Country = temp.Country
477-
WHEN MATCHED AND temp._change_type = 'update_postimage'
477+
WHEN MATCHED AND (temp._change_type = 'update_postimage' OR temp._change_type = 'insert')
478478
THEN UPDATE SET NumVaccinated = temp.NumVaccinated, AvailableDoses = temp.AvailableDoses
479479
WHEN NOT MATCHED AND temp._change_type != 'delete'
480480
THEN INSERT (Country, NumVaccinated, AvailableDoses)
@@ -495,7 +495,7 @@ def test_merge_query_with_delete(self):
495495
"""
496496
MERGE INTO target_table target
497497
USING tmp_table temp ON target.Country = temp.Country
498-
WHEN MATCHED AND temp._change_type = 'update_postimage'
498+
WHEN MATCHED AND (temp._change_type = 'update_postimage' OR temp._change_type = 'insert')
499499
THEN UPDATE SET NumVaccinated = temp.NumVaccinated, AvailableDoses = temp.AvailableDoses
500500
WHEN NOT MATCHED AND temp._change_type != 'delete'
501501
THEN INSERT (Country, NumVaccinated, AvailableDoses)

0 commit comments

Comments
 (0)