Commit 9259b77
feat: Complete hard_deletes='new_record' implementation for snapshots
Fixes #1176

Implements full support for the hard_deletes configuration in snapshot materializations, enabling users to track deleted source records with dedicated deletion records marked by dbt_is_deleted=True.

## Root Cause

The dbt-core snapshot_staging_table macro generates a deletion_records CTE that relies on get_column_schema_from_query() for source columns, which returns proper column schema objects with .name attributes. However, when building the list of snapshotted_cols from the target table, it used get_columns_in_relation(), which on Databricks returns agate.Row tuples like ('col_name', 'data_type', 'comment'). The deletion_records CTE tried to iterate these tuples via the .name attribute (through get_list_of_column_names()), which doesn't exist on tuples. The column-matching logic therefore failed silently, preventing deletion records from being constructed with the correct columns from the snapshotted table.

As a result, deletion records were inserted with NULL values for all source columns (id, name, etc.) instead of the actual values from the deleted records, producing the malformed output reported in issue #1176.

## Solution

Created a databricks__snapshot_staging_table override that extracts column names from agate.Row tuples by accessing index [0] instead of the .name attribute. This ensures the deletion_records CTE receives correct column lists for both the source and target tables, allowing proper column matching when inserting deletion records.

Additionally, overrode databricks__build_snapshot_table to include the dbt_is_deleted column in the initial snapshot table creation when hard_deletes='new_record', so the column exists from the start and doesn't need to be added later.
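A minimal sketch of the change (macro context elided; the first, commented-out line paraphrases the upstream pattern described above):

{#-- Upstream pattern: expects column objects with a .name attribute, which agate.Row lacks --#}
{#-- {% set snapshotted_cols = get_list_of_column_names(get_columns_in_relation(target_relation)) %} --#}

{#-- Override: agate.Row behaves like a tuple, so take the column name at index [0] --#}
{% set snapshotted_cols = [] %}
{% for row in get_columns_in_relation(target_relation) %}
    {% set col_name = row[0] %}
    {% if col_name and not col_name.startswith('#') %}
        {% do snapshotted_cols.append(col_name) %}
    {% endif %}
{% endfor %}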
## Changes

**New file: dbt/include/databricks/macros/materializations/snapshot_helpers.sql**
- databricks__build_snapshot_table: adds the dbt_is_deleted column for new_record mode
- databricks__snapshot_staging_table: complete override to fix column name extraction
  - Extracts column names from agate.Row tuples using index [0]
  - Filters out Databricks metadata rows (those starting with '#')
  - Generates a correct deletion_records CTE with proper column matching

**New file: dbt/include/databricks/macros/materializations/snapshot_merge.sql**
- databricks__snapshot_merge_sql: implements hard_deletes-aware MERGE logic
- Supports 'invalidate' mode with a WHEN NOT MATCHED BY SOURCE clause
- Uses the insert * pattern so all staging table columns, including dbt_is_deleted, are carried over

**New file: tests/functional/adapter/simple_snapshot/test_hard_deletes.py**
- Comprehensive functional tests for all three hard_deletes modes
- TestHardDeleteIgnore: verifies deleted records remain unchanged (default)
- TestHardDeleteInvalidate: verifies dbt_valid_to is set for deleted records
- TestHardDeleteNewRecord: verifies new deletion records with dbt_is_deleted=True

## All Three Modes Now Working

**hard_deletes='ignore'** (default)
- Deleted records remain unchanged in the snapshot
- dbt_valid_to stays NULL for records no longer in the source
- Maintains backward compatibility

**hard_deletes='invalidate'**
- Deleted records are invalidated by setting the dbt_valid_to timestamp
- Uses Delta Lake's WHEN NOT MATCHED BY SOURCE clause
- Original records are marked as no longer valid once removed from the source

**hard_deletes='new_record'**
- Original records are invalidated (dbt_valid_to set)
- New deletion records are inserted with dbt_is_deleted=True and the actual source column values
- Provides a complete audit trail of deletions
- Resolves the malformed-output issue where deletion records had NULL values

## Testing

- All three functional tests passing (ignore, invalidate, new_record)
- Code quality checks passing (ruff, ruff-format, mypy)
- No regressions in existing snapshot functionality
- Verified with Databricks Delta Lake MERGE operations
- Tested against a Unity Catalog cluster

## Files Modified

- dbt/include/databricks/macros/materializations/snapshot_helpers.sql (new, 221 lines)
- dbt/include/databricks/macros/materializations/snapshot_merge.sql (new, 32 lines)
- tests/functional/adapter/simple_snapshot/test_hard_deletes.py (new, 298 lines)
- .gitignore (added docs/plans/ exclusion)

Signed-off-by: Randy Pitcher <[email protected]>
Co-Authored-By: Claude <[email protected]>
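For reference, a snapshot opting into the new behavior would be configured along these lines (relation and column names here are illustrative, not taken from this commit):

{% snapshot orders_snapshot %}

    {{
        config(
            target_schema='snapshots',
            unique_key='id',
            strategy='timestamp',
            updated_at='updated_at',
            hard_deletes='new_record'
        )
    }}

    select * from {{ source('app', 'orders') }}

{% endsnapshot %}

When a row later disappears from the source, the snapshot keeps the invalidated original row and appends a deletion record with dbt_is_deleted = 'True'. The functional tests above can be run directly, e.g. pytest tests/functional/adapter/simple_snapshot/test_hard_deletes.py.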
1 parent d695920 commit 9259b77

File tree

4 files changed
+552 −0 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions

@@ -24,3 +24,4 @@ logs/
 CLAUDE.md
 .claude/
 .cursor
+docs/plans/
dbt/include/databricks/macros/materializations/snapshot_helpers.sql
Lines changed: 221 additions & 0 deletions

@@ -0,0 +1,221 @@
{% macro databricks__build_snapshot_table(strategy, sql) %}
    {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
    {%- set hard_deletes = strategy.hard_deletes -%}

    select *,
        {{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
        {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
        {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
        {{ get_dbt_valid_to_current(strategy, columns) }}
        {%- if hard_deletes == 'new_record' -%}
        , 'False' as {{ columns.dbt_is_deleted }}
        {%- endif %}
    from (
        {{ sql }}
    ) sbq
{% endmacro %}
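With the default meta column names and no dbt_valid_to_current configured, this macro renders roughly the following (the inner query and the scd_id hash expression depend on the snapshot definition; shown for a hypothetical timestamp strategy on updated_at):

select *,
    md5(coalesce(cast(id as string), '') || '|' || coalesce(cast(updated_at as string), '')) as dbt_scd_id,  -- exact expression comes from the strategy
    updated_at as dbt_updated_at,
    updated_at as dbt_valid_from,
    null as dbt_valid_to,           -- get_dbt_valid_to_current() falls back to NULL here
    'False' as dbt_is_deleted       -- emitted only when hard_deletes = 'new_record'
from (
    select id, name, updated_at from app.orders  -- the snapshot's select
) sbq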

{% macro databricks__snapshot_staging_table(strategy, source_sql, target_relation) -%}
    {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
    {% if strategy.hard_deletes == 'new_record' %}
        {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
    {% endif %}

    with snapshot_query as (
        {{ source_sql }}
    ),

    snapshotted_data as (
        select *, {{ unique_key_fields(strategy.unique_key) }}
        from {{ target_relation }}
        where
            {% if config.get('dbt_valid_to_current') %}
                {% set source_unique_key = columns.dbt_valid_to | trim %}
                {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
                {# The exact equals semantics between NULL values depends on the current behavior flag set. Also, update records if the source field is null #}
                ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
            {% else %}
                {{ columns.dbt_valid_to }} is null
            {% endif %}
    ),

    insertions_source_data as (
        select *, {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ get_dbt_valid_to_current(strategy, columns) }},
            {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
        from snapshot_query
    ),

    updates_source_data as (
        select *, {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
        from snapshot_query
    ),

    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
    deletes_source_data as (
        select *, {{ unique_key_fields(strategy.unique_key) }}
        from snapshot_query
    ),
    {% endif %}

    insertions as (
        select
            'insert' as dbt_change_type,
            source_data.*
            {%- if strategy.hard_deletes == 'new_record' -%}
            , 'False' as {{ columns.dbt_is_deleted }}
            {%- endif %}
        from insertions_source_data as source_data
        left outer join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
            or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and (
                {{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' {% endif %}
            ))
    ),

    updates as (
        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.{{ columns.dbt_scd_id }}
            {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
            {%- endif %}
        from updates_source_data as source_data
        join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where (
            {{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' {% endif %}
        )
    )

    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
    ,
    deletes as (
        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
            snapshotted_data.{{ columns.dbt_scd_id }}
            {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
            {%- endif %}
        from snapshotted_data
        left join deletes_source_data as source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
        {%- if strategy.hard_deletes == 'new_record' %}
            and not (
                --avoid updating the record's valid_to if the latest entry is marked as deleted
                snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
                and
                {% if config.get('dbt_valid_to_current') -%}
                    snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }}
                {%- else -%}
                    snapshotted_data.{{ columns.dbt_valid_to }} is null
                {%- endif %}
            )
        {%- endif %}
    )
    {%- endif %}

    {%- if strategy.hard_deletes == 'new_record' %}
    {#-- Databricks-specific: Extract column names from agate.Row tuples --#}
    {% set target_columns_raw = get_columns_in_relation(target_relation) %}
    {% set snapshotted_cols = [] %}
    {% for row in target_columns_raw %}
        {#-- agate.Row is a tuple: (col_name, data_type, comment) --#}
        {#-- Filter out Databricks metadata rows (starting with # or empty) --#}
        {% set col_name = row[0] %}
        {% if col_name and not col_name.startswith('#') %}
            {% do snapshotted_cols.append(col_name) %}
        {% endif %}
    {% endfor %}
    {% set source_sql_cols = get_column_schema_from_query(source_sql) %}
    ,
    deletion_records as (
        select
            'insert' as dbt_change_type,
            {#/*
                If a column has been added to the source it won't yet exist in the
                snapshotted table, so we insert a null value as a placeholder for the column.
            */#}
            {%- for col in source_sql_cols -%}
                {%- if col.name in snapshotted_cols -%}
                    snapshotted_data.{{ adapter.quote(col.column) }},
                {%- else -%}
                    NULL as {{ adapter.quote(col.column) }},
                {%- endif -%}
            {% endfor -%}
            {%- if strategy.unique_key | is_list -%}
                {%- for key in strategy.unique_key -%}
                    snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
                {% endfor -%}
            {%- else -%}
                snapshotted_data.dbt_unique_key as dbt_unique_key,
            {% endif -%}
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
            {{ new_scd_id }} as {{ columns.dbt_scd_id }},
            'True' as {{ columns.dbt_is_deleted }}
        from snapshotted_data
        left join deletes_source_data as source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
            and not (
                --avoid inserting a new record if the latest one is marked as deleted
                snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
                and
                {% if config.get('dbt_valid_to_current') -%}
                    snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }}
                {%- else -%}
                    snapshotted_data.{{ columns.dbt_valid_to }} is null
                {%- endif %}
            )
    )
    {%- endif %}

    select * from insertions
    union all
    select * from updates
    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletes
    {%- endif %}
    {%- if strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletion_records
    {%- endif %}

{%- endmacro %}
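To make the new_record flow concrete, a snapshot keyed on id could end up in the following state after id = 2 drops out of the source (table, names, and timestamps are illustrative):

-- select id, name, dbt_valid_from, dbt_valid_to, dbt_is_deleted
-- from snapshots.orders_snapshot order by id, dbt_valid_from;
--
--  id | name | dbt_valid_from | dbt_valid_to | dbt_is_deleted
--   1 | Ann  | 2025-01-01     | NULL         | False    -- still present in source
--   2 | Bob  | 2025-01-01     | 2025-02-01   | False    -- invalidated via the deletes CTE
--   2 | Bob  | 2025-02-01     | NULL         | True     -- appended via deletion_records

Note the deletion record carries Bob's actual column values rather than NULLs, which is exactly what the agate.Row fix restores.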
dbt/include/databricks/macros/materializations/snapshot_merge.sql
Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
{% macro databricks__snapshot_merge_sql(target, source, insert_cols) -%}
    {%- set insert_cols_csv = insert_cols | join(', ') -%}
    {#-- Get the hard_deletes configuration from config --#}
    {%- set hard_deletes = config.get('hard_deletes', 'ignore') -%}
    {%- set invalidate_hard_deletes = (hard_deletes == 'invalidate') -%}

    {#-- Get column names configuration --#}
    {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}

    when matched
        and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
        and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
        then update set
            {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}

    when not matched
        and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
        then insert *

    {%- if invalidate_hard_deletes %}
    when not matched by source
        and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
        then update set
            {{ columns.dbt_valid_to }} = current_timestamp()
    {%- endif %}

{%- endmacro %}
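With hard_deletes='invalidate' and default column names, this macro renders a MERGE along these lines (target and staging relation names are illustrative):

merge into snapshots.orders_snapshot as DBT_INTERNAL_DEST
using orders_snapshot__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id

when matched
    and DBT_INTERNAL_DEST.dbt_valid_to is null
    and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
    then update set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when not matched
    and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
    then insert *

when not matched by source
    and DBT_INTERNAL_DEST.dbt_valid_to is null
    then update set dbt_valid_to = current_timestamp()

The insert * keeps the statement schema-agnostic, so the conditionally present dbt_is_deleted column flows through without mode-specific column lists, and WHEN NOT MATCHED BY SOURCE is available here because Databricks snapshot targets are Delta tables.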

0 commit comments

Comments
 (0)