Skip to content

Commit ea9f6f9

Browse files
Support new column in source table with hard_deletes = new_record (#1201)
1 parent 1f31e6a commit ea9f6f9

File tree

6 files changed

+149
-0
lines changed

6 files changed

+149
-0
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: Fixes
2+
body: 'Update dbt snapshot staging table creation to handle new source columns when
3+
using hard_deletes == ''new_record'' '
4+
time: 2025-07-15T12:59:33.467839-07:00
5+
custom:
6+
Author: colin-rogers-dbt
7+
Issue: "852"

dbt-adapters/src/dbt/include/global_project/macros/adapters/columns.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@
1616
{{ return(columns) }}
1717
{% endmacro %}
1818

19+
{%- macro get_list_of_column_names(columns) -%}
20+
{% set col_names = [] %}
21+
{% for col in columns %}
22+
{% do col_names.append(col.name) %}
23+
{% endfor %}
24+
{{ return(col_names) }}
25+
{% endmacro %}
1926

2027
{% macro get_empty_subquery_sql(select_sql, select_sql_header=none) -%}
2128
{{ return(adapter.dispatch('get_empty_subquery_sql', 'dbt')(select_sql, select_sql_header)) }}

dbt-adapters/src/dbt/include/global_project/macros/materializations/snapshots/helpers.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,23 @@
165165
{%- endif %}
166166

167167
{%- if strategy.hard_deletes == 'new_record' %}
168+
{% set snapshotted_cols = get_list_of_column_names(get_columns_in_relation(target_relation)) %}
168169
{% set source_sql_cols = get_column_schema_from_query(source_sql) %}
169170
,
170171
deletion_records as (
171172

172173
select
173174
'insert' as dbt_change_type,
175+
{#
176+
If a column has been added to the source it won't yet exist in the
177+
snapshotted table so we insert a null value as a placeholder for the column.
178+
#}
174179
{%- for col in source_sql_cols -%}
180+
{%- if col.name in snapshotted_cols -%}
175181
snapshotted_data.{{ adapter.quote(col.column) }},
182+
{%- else -%}
183+
NULL as {{ adapter.quote(col.column) }},
184+
{%- endif -%}
176185
{% endfor -%}
177186
{%- if strategy.unique_key | is_list -%}
178187
{%- for key in strategy.unique_key -%}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from dbt.tests.adapter.simple_snapshot.test_ephemeral_snapshot_hard_deletes import (
2+
BaseSnapshotEphemeralHardDeletes,
3+
)
4+
5+
6+
class TestSnapshotEphemeralHardDeletes(BaseSnapshotEphemeralHardDeletes):
7+
pass
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: 'Add BaseSnapshotEphemeralHardDeletes test '
3+
time: 2025-07-15T13:10:36.059488-07:00
4+
custom:
5+
Author: colin-rogers-dbt
6+
Issue: "1201"
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import pytest
2+
3+
from dbt.tests.util import run_dbt
4+
5+
6+
# Source table creation statement
7+
_source_create_sql = """
8+
create table {database}.{schema}.src_customers (
9+
id INTEGER,
10+
first_name VARCHAR(50),
11+
last_name VARCHAR(50),
12+
email VARCHAR(50),
13+
updated_at TIMESTAMP
14+
);
15+
"""
16+
17+
# Initial data for source table
18+
_source_insert_sql = """
19+
insert into {database}.{schema}.src_customers (id, first_name, last_name, email, updated_at) values
20+
(1, 'John', 'Doe', '[email protected]', '2023-01-01 10:00:00'),
21+
(2, 'Jane', 'Smith', '[email protected]', '2023-01-02 11:00:00'),
22+
(3, 'Bob', 'Johnson', '[email protected]', '2023-01-03 12:00:00');
23+
"""
24+
25+
# SQL to add a dummy column to source table (simulating schema change)
26+
_source_alter_sql = """
27+
alter table {database}.{schema}.src_customers add column dummy_column VARCHAR(50) default 'dummy_value';
28+
"""
29+
30+
# Sources YAML configuration
31+
_sources_yml = """
32+
version: 2
33+
34+
sources:
35+
- name: test_source
36+
schema: "{{ target.schema }}"
37+
tables:
38+
- name: src_customers
39+
"""
40+
41+
# Ephemeral model that references the source
42+
_ephemeral_customers_sql = """
43+
{{ config(materialized='ephemeral') }}
44+
45+
select * from {{ source('test_source', 'src_customers') }}
46+
"""
47+
48+
# Snapshots YAML configuration with hard_deletes: new_record
49+
_snapshots_yml = """
50+
snapshots:
51+
- name: snapshot_customers
52+
relation: ref('ephemeral_customers')
53+
config:
54+
unique_key: id
55+
strategy: check
56+
check_cols: all
57+
hard_deletes: new_record
58+
"""
59+
60+
# Test model to query the snapshot (for verification)
61+
_ref_snapshot_sql = """
62+
select * from {{ ref('snapshot_customers') }}
63+
"""
64+
65+
66+
class BaseSnapshotEphemeralHardDeletes:
67+
@pytest.fixture(scope="class")
68+
def models(self):
69+
return {
70+
"_sources.yml": _sources_yml,
71+
"ephemeral_customers.sql": _ephemeral_customers_sql,
72+
"snapshots.yml": _snapshots_yml,
73+
"ref_snapshot.sql": _ref_snapshot_sql,
74+
}
75+
76+
@pytest.fixture(scope="class")
77+
def source_create_sql(self):
78+
return _source_create_sql
79+
80+
@pytest.fixture(scope="class")
81+
def source_insert_sql(self):
82+
return _source_insert_sql
83+
84+
@pytest.fixture(scope="class")
85+
def source_alter_sql(self):
86+
return _source_alter_sql
87+
88+
def test_ephemeral_snapshot_hard_deletes(
89+
self, project, source_create_sql, source_insert_sql, source_alter_sql
90+
):
91+
92+
project.run_sql(
93+
source_create_sql.format(database=project.database, schema=project.test_schema)
94+
)
95+
project.run_sql(
96+
source_insert_sql.format(database=project.database, schema=project.test_schema)
97+
)
98+
99+
results = run_dbt(["snapshot"])
100+
assert results is not None
101+
assert len(results) == 1 # type: ignore
102+
103+
snapshot_result = project.run_sql(
104+
"select count(*) as row_count from snapshot_customers", fetch="one"
105+
)
106+
assert snapshot_result[0] == 3 # Should have 3 rows from initial data
107+
108+
project.run_sql(
109+
source_alter_sql.format(database=project.database, schema=project.test_schema)
110+
)
111+
112+
results = run_dbt(["snapshot"])
113+
assert len(results) == 1 # type: ignore

0 commit comments

Comments
 (0)