Skip to content

Commit a042dfe

Browse files
yakovlevvsdamian3031
authored andcommitted
Fix handling of composite unique_key in incremental models
1 parent 4b5b59b commit a042dfe

File tree

3 files changed

+82
-2
lines changed

3 files changed

+82
-2
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: Fixes
2+
body: Fix handling of composite unique_key in incremental models
3+
time: 2025-03-12T00:03:46.198336572+03:00
4+
custom:
5+
Author: yakovlevvs
6+
Issue: "465"
7+
PR: "473"

dbt/include/trino/macros/materializations/incremental.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,15 @@
125125
{% if unique_key %}
126126
{% if unique_key is sequence and unique_key is not string %}
127127
delete from {{ target }}
128-
where
128+
where exists (
129+
select 1
130+
from {{ source }}
131+
where
129132
{% for key in unique_key %}
130-
{{ target }}.{{ key }} in (select {{ key }} from {{ source }})
133+
{{ target }}.{{ key }} = {{ source }}.{{ key }}
131134
{{ "and " if not loop.last }}
132135
{% endfor %}
136+
)
133137
{% if incremental_predicates %}
134138
{% for predicate in incremental_predicates %}
135139
and {{ predicate }}

tests/functional/adapter/materialization/test_incremental_delete_insert.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import pytest
2+
from dbt.tests.adapter.incremental.test_incremental_predicates import (
3+
BaseIncrementalPredicates,
4+
models__delete_insert_incremental_predicates_sql,
5+
seeds__expected_delete_insert_incremental_predicates_csv,
6+
)
27
from dbt.tests.adapter.incremental.test_incremental_unique_id import (
38
BaseIncrementalUniqueKey,
49
models__duplicated_unary_unique_key_list_sql,
@@ -124,6 +129,39 @@
124129
125130
"""
126131

132+
models__delete_insert_composite_keys_sql = """
133+
{{
134+
config(
135+
materialized='incremental',
136+
incremental_strategy='delete+insert',
137+
unique_key=['id', 'col']
138+
)
139+
}}
140+
select 1 as id, 1 as col
141+
union all
142+
select 1 as id, 3 as col
143+
union all
144+
select 3 as id, 1 as col
145+
union all
146+
select 3 as id, 3 as col
147+
148+
{% if is_incremental() %}
149+
150+
except
151+
(select 1 as id, 1 as col
152+
union all
153+
select 3 as id, 3 as col)
154+
155+
{% endif %}
156+
"""
157+
158+
seeds__expected_delete_insert_composite_keys_csv = """id,col
159+
1,1
160+
1,3
161+
3,1
162+
3,3
163+
"""
164+
127165

128166
class TrinoIncrementalUniqueKey(BaseIncrementalUniqueKey):
129167
@pytest.fixture(scope="class")
@@ -204,3 +242,34 @@ def test_temporary_table_location(self, project):
204242
f'create table "{project.database}"."{project.test_schema}"."model__dbt_tmp"' in logs
205243
)
206244
assert "location = 's3a://datalake/model__dbt_tmp'" in logs
245+
246+
247+
@pytest.mark.iceberg
248+
class TestIcebergCompositeUniqueKeys(BaseIncrementalPredicates):
249+
@pytest.fixture(scope="class")
250+
def seeds(self):
251+
return {
252+
"expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv,
253+
"expected_delete_insert_composite_keys.csv": seeds__expected_delete_insert_composite_keys_csv,
254+
}
255+
256+
@pytest.fixture(scope="class")
257+
def models(self):
258+
return {
259+
"delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql,
260+
"delete_insert_composite_keys.sql": models__delete_insert_composite_keys_sql,
261+
}
262+
263+
def test__incremental_predicates_composite_keys(self, project):
264+
"""seed should match model after two incremental runs"""
265+
266+
expected_fields = self.get_expected_fields(
267+
relation="expected_delete_insert_composite_keys", seed_rows=4
268+
)
269+
test_case_fields = self.get_test_fields(
270+
project,
271+
seed="expected_delete_insert_composite_keys",
272+
incremental_model="delete_insert_composite_keys",
273+
update_sql_file=None,
274+
)
275+
self.check_scenario_correctness(expected_fields, test_case_fields, project)

0 commit comments

Comments
 (0)