Skip to content

Commit f290364

Browse files
authored
Check for dbt_valid_to_current config for deleted records to avoid duplication (#1241)
1 parent b3467f1 commit f290364

File tree

5 files changed

+107
-4
lines changed

5 files changed

+107
-4
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Fix duplicates when hard_deletes new_record is used with custom dbt_valid_to_current
3+
time: 2025-09-03T15:21:02.112323+12:00
4+
custom:
5+
Author: jeremyyeo
6+
Issue: "1240"

dbt-adapters/src/dbt/include/global_project/macros/materializations/snapshots/helpers.sql

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,12 @@
158158
and not (
159159
--avoid updating the record's valid_to if the latest entry is marked as deleted
160160
snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
161-
and snapshotted_data.{{ columns.dbt_valid_to }} is null
161+
and
162+
{% if config.get('dbt_valid_to_current') -%}
163+
snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }}
164+
{%- else -%}
165+
snapshotted_data.{{ columns.dbt_valid_to }} is null
166+
{%- endif %}
162167
)
163168
{%- endif %}
164169
)
@@ -172,10 +177,10 @@
172177

173178
select
174179
'insert' as dbt_change_type,
175-
{#
180+
{#/*
176181
If a column has been added to the source it won't yet exist in the
177182
snapshotted table so we insert a null value as a placeholder for the column.
178-
#}
183+
*/#}
179184
{%- for col in source_sql_cols -%}
180185
{%- if col.name in snapshotted_cols -%}
181186
snapshotted_data.{{ adapter.quote(col.column) }},
@@ -202,7 +207,12 @@
202207
and not (
203208
--avoid inserting a new record if the latest one is marked as deleted
204209
snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
205-
and snapshotted_data.{{ columns.dbt_valid_to }} is null
210+
and
211+
{% if config.get('dbt_valid_to_current') -%}
212+
snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }}
213+
{%- else -%}
214+
snapshotted_data.{{ columns.dbt_valid_to }} is null
215+
{%- endif %}
206216
)
207217
208218
)

dbt-postgres/tests/functional/adapter/test_simple_snapshot.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
from dbt.tests.adapter.simple_snapshot.new_record_timestamp_mode import (
33
BaseSnapshotNewRecordTimestampMode,
44
)
5+
from dbt.tests.adapter.simple_snapshot.new_record_dbt_valid_to_current import (
6+
BaseSnapshotNewRecordDbtValidToCurrent,
7+
)
58
from dbt.tests.adapter.simple_snapshot.test_snapshot import (
69
BaseSimpleSnapshot,
710
BaseSnapshotCheck,
@@ -22,3 +25,7 @@ class TestSnapshotNewRecordTimestampMode(BaseSnapshotNewRecordTimestampMode):
2225

2326
class TestSnapshotNewRecordCheckMode(BaseSnapshotNewRecordCheckMode):
2427
pass
28+
29+
30+
class TestSnapshotNewRecordDbtValidToCurrent(BaseSnapshotNewRecordDbtValidToCurrent):
31+
pass
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Fix duplicates when hard_deletes new_record is used with custom dbt_valid_to_current
3+
time: 2025-09-03T15:29:30.515546+12:00
4+
custom:
5+
Author: jeremyyeo
6+
Issue: "1240"
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import pytest
2+
3+
from dbt.tests.util import run_dbt
4+
5+
_seed_new_record_mode_statements = [
6+
"create table {database}.{schema}.seed (id INTEGER, first_name VARCHAR(50));",
7+
"insert into {database}.{schema}.seed (id, first_name) values (1, 'Judith'), (2, 'Arthur');",
8+
]
9+
10+
_snapshot_actual_sql = """
11+
{% snapshot snapshot_actual %}
12+
select * from {{target.database}}.{{target.schema}}.seed
13+
{% endsnapshot %}
14+
"""
15+
16+
_delete_sql = """
17+
delete from {database}.{schema}.seed where id = 1
18+
"""
19+
20+
# If the deletion worked correctly, this should return one row (and not more) where dbt_is_deleted is True
21+
_delete_check_sql = """
22+
select dbt_scd_id from {schema}.snapshot_actual where id = 1 and dbt_is_deleted = 'True'
23+
"""
24+
25+
_snapshots_yml = """
26+
snapshots:
27+
- name: snapshot_actual
28+
config:
29+
unique_key: id
30+
strategy: check
31+
check_cols: all
32+
hard_deletes: new_record
33+
dbt_valid_to_current: "date('9999-12-31')"
34+
"""
35+
36+
37+
class BaseSnapshotNewRecordDbtValidToCurrent:
38+
@pytest.fixture(scope="class")
39+
def snapshots(self):
40+
return {"snapshot.sql": _snapshot_actual_sql}
41+
42+
@pytest.fixture(scope="class")
43+
def models(self):
44+
return {"snapshots.yml": _snapshots_yml}
45+
46+
@pytest.fixture(scope="class")
47+
def seed_new_record_mode_statements(self):
48+
return _seed_new_record_mode_statements
49+
50+
@pytest.fixture(scope="class")
51+
def delete_sql(self):
52+
return _delete_sql
53+
54+
def test_snapshot_new_record_mode(
55+
self,
56+
project,
57+
seed_new_record_mode_statements,
58+
delete_sql,
59+
):
60+
for stmt in seed_new_record_mode_statements:
61+
project.run_sql(stmt)
62+
63+
# Snapshot once to get the initial snapshot
64+
run_dbt(["snapshot"])
65+
66+
# Remove the record from the source data
67+
project.run_sql(delete_sql)
68+
69+
# Snapshot twice in a row checking that the deleted record is not duplicated in the snapshot
70+
run_dbt(["snapshot"])
71+
run_dbt(["snapshot"])
72+
73+
check_result = project.run_sql(_delete_check_sql, fetch="all")
74+
assert len(check_result) == 1

0 commit comments

Comments
 (0)