diff --git a/.gitignore b/.gitignore index 371a05eb7..9bcd5304c 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,6 @@ logs/ .coverage* CLAUDE.md .claude/ +.cursor +uv.lock +docs/plans/ diff --git a/dbt/include/databricks/macros/materializations/snapshot_helpers.sql b/dbt/include/databricks/macros/materializations/snapshot_helpers.sql index 56bfd6e09..a73fdbac7 100644 --- a/dbt/include/databricks/macros/materializations/snapshot_helpers.sql +++ b/dbt/include/databricks/macros/materializations/snapshot_helpers.sql @@ -1,4 +1,10 @@ {% macro databricks__snapshot_merge_sql(target, source, insert_cols) -%} + {%- set insert_cols_csv = insert_cols | join(', ') -%} + {#-- Get the hard_deletes configuration from config --#} + {%- set hard_deletes = config.get('hard_deletes', 'ignore') -%} + {%- set invalidate_hard_deletes = (hard_deletes == 'invalidate') -%} + + {#-- Get column names configuration --#} {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} merge into {{ target }} as DBT_INTERNAL_DEST @@ -22,7 +28,15 @@ when not matched and DBT_INTERNAL_SOURCE.{{ adapter.quote('dbt_change_type') }} = 'insert' - then insert * + then insert ({{ insert_cols_csv }}) + values ({{ insert_cols_csv }}) + + {%- if invalidate_hard_deletes %} + when not matched by source + and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null + then update set + {{ columns.dbt_valid_to }} = current_timestamp() + {%- endif %} ; {% endmacro %} @@ -37,4 +51,227 @@ ); {% endcall %} {% endif %} -{% endmacro %} \ No newline at end of file +{% endmacro %} + + +{% macro databricks__build_snapshot_table(strategy, sql) %} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} + {%- set hard_deletes = strategy.hard_deletes -%} + + select *, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }} + {%- if hard_deletes == 'new_record' -%} + , 'False' as {{ columns.dbt_is_deleted }} + {%- endif %} + from ( + {{ sql }} + ) sbq +{% endmacro %} + +{% macro databricks__snapshot_staging_table(strategy, source_sql, target_relation) -%} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} + {% if strategy.hard_deletes == 'new_record' %} + {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %} + {% endif %} + with snapshot_query as ( + + {{ source_sql }} + + ), + + snapshotted_data as ( + + select *, {{ unique_key_fields(strategy.unique_key) }} + from {{ target_relation }} + where + {% if config.get('dbt_valid_to_current') %} + {% set source_unique_key = columns.dbt_valid_to | trim %} + {% set target_unique_key = config.get('dbt_valid_to_current') | trim %} + + {# The exact equals semantics between NULL values depends on the current behavior flag set. 
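            For example, with dbt_valid_to_current set to "to_date('9999-12-31')",
            a snapshotted row counts as current when its dbt_valid_to equals that
            sentinel value.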
Also, update records if the source field is null #} + ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null ) + {% else %} + {{ columns.dbt_valid_to }} is null + {% endif %} + + ), + + insertions_source_data as ( + + select *, {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} + + from snapshot_query + ), + + updates_source_data as ( + + select *, {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} + + from snapshot_query + ), + + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + + deletes_source_data as ( + + select *, {{ unique_key_fields(strategy.unique_key) }} + from snapshot_query + ), + {% endif %} + + insertions as ( + + select + 'insert' as dbt_change_type, + source_data.* + {%- if strategy.hard_deletes == 'new_record' -%} + ,'False' as {{ columns.dbt_is_deleted }} + {%- endif %} + + from insertions_source_data as source_data + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ( + {{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' {% endif %} + ) + + ) + + ), + + updates as ( + + select + 'update' as dbt_change_type, + source_data.*, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + + from updates_source_data as source_data + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where ( + {{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' {% endif %} + ) + ) + + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + , + deletes as ( + + select + 'delete' as dbt_change_type, + source_data.*, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + + {%- if strategy.hard_deletes == 'new_record' %} + and not ( + --avoid updating the record's valid_to if the latest entry is marked as deleted + snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' + and + {% if config.get('dbt_valid_to_current') -%} + snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} + {%- else -%} + snapshotted_data.{{ columns.dbt_valid_to }} is null + {%- endif %} + ) + {%- endif %} + ) + {%- endif %} + + {%- if strategy.hard_deletes 
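== 'new_record' %}
+    {#-- With hard_deletes='new_record', the 'delete' rows from the deletes CTE above
+         close the existing row, and deletion_records below inserts a tombstone row
+         with dbt_is_deleted='True' so the deletion itself is visible in history. --#}
+  {%- endif %}
+
+  {%- if strategy.hard_deletes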
== 'new_record' %} + {#-- Databricks-specific: Extract column names from agate.Row tuples --#} + {% set target_columns_raw = get_columns_in_relation(target_relation) %} + {% set snapshotted_cols = [] %} + {% for row in target_columns_raw %} + {#-- agate.Row is a tuple: (col_name, data_type, comment) --#} + {#-- Filter out Databricks metadata rows (starting with # or empty) --#} + {% set col_name = row[0] %} + {% if col_name and not col_name.startswith('#') %} + {% do snapshotted_cols.append(col_name) %} + {% endif %} + {% endfor %} + {% set source_sql_cols = get_column_schema_from_query(source_sql) %} + , + deletion_records as ( + + select + 'insert' as dbt_change_type, + {#/* + If a column has been added to the source it won't yet exist in the + snapshotted table so we insert a null value as a placeholder for the column. + */#} + {%- for col in source_sql_cols -%} + {%- if col.name in snapshotted_cols -%} + snapshotted_data.{{ adapter.quote(col.column) }}, + {%- else -%} + NULL as {{ adapter.quote(col.column) }}, + {%- endif -%} + {% endfor -%} + {%- if strategy.unique_key | is_list -%} + {%- for key in strategy.unique_key -%} + snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor -%} + {%- else -%} + snapshotted_data.dbt_unique_key as dbt_unique_key, + {% endif -%} + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, + {{ new_scd_id }} as {{ columns.dbt_scd_id }}, + 'True' as {{ columns.dbt_is_deleted }} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + and not ( + --avoid inserting a new record if the latest one is marked as deleted + snapshotted_data.{{ columns.dbt_is_deleted }} = 'True' + and + {% if config.get('dbt_valid_to_current') -%} + snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} + {%- else -%} + snapshotted_data.{{ columns.dbt_valid_to }} is null + {%- endif %} + ) + + ) + {%- endif %} + + select * from insertions + union all + select * from updates + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + union all + select * from deletes + {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + union all + select * from deletion_records + {%- endif %} + + +{%- endmacro %} diff --git a/tests/functional/adapter/simple_snapshot/test_hard_deletes.py b/tests/functional/adapter/simple_snapshot/test_hard_deletes.py new file mode 100644 index 000000000..c55c8c9bd --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/test_hard_deletes.py @@ -0,0 +1,298 @@ +""" +Tests for hard_deletes configuration in snapshots. 
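+(The behavior exercised here is implemented by databricks__snapshot_merge_sql and
+databricks__snapshot_staging_table in snapshot_helpers.sql: each test seeds a source
+table, snapshots it, deletes rows from the source, and snapshots again.)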
+ +Tests all three hard_deletes modes: +- ignore: deleted records are not tracked in snapshot +- invalidate: deleted records have dbt_valid_to set +- new_record: deleted records get new row with dbt_is_deleted=True +""" + +import pytest + +from dbt.tests.util import run_dbt + +# Snapshot SQL templates for each hard_deletes mode +snapshot_sql_ignore = """ +{% snapshot snapshot_hard_delete_ignore %} + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['name', 'city'], + hard_deletes='ignore', + ) + }} + select * from {{ schema }}.seed_hard_delete +{% endsnapshot %} +""" + +snapshot_sql_invalidate = """ +{% snapshot snapshot_hard_delete_invalidate %} + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['name', 'city'], + hard_deletes='invalidate', + ) + }} + select * from {{ schema }}.seed_hard_delete +{% endsnapshot %} +""" + +snapshot_sql_new_record = """ +{% snapshot snapshot_hard_delete_new_record %} + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['name', 'city'], + hard_deletes='new_record', + ) + }} + select * from {{ schema }}.seed_hard_delete +{% endsnapshot %} +""" + + +class BaseHardDeleteTest: + """Base class for hard delete tests""" + + def setup_initial_data(self, project): + """Create initial seed data with 5 records""" + create_seed_sql = f""" + create table {project.test_schema}.seed_hard_delete ( + id integer, + name string, + city string, + updated_at timestamp + ) + """ + project.run_sql(create_seed_sql) + + insert_seed_sql = f""" + insert into {project.test_schema}.seed_hard_delete (id, name, city, updated_at) values + (1, 'Alice', 'London', current_timestamp()), + (2, 'Bob', 'Paris', current_timestamp()), + (3, 'Charlie', 'Berlin', current_timestamp()), + (4, 'Diana', 'Madrid', current_timestamp()), + (5, 'Eve', 'Rome', current_timestamp()) + """ + project.run_sql(insert_seed_sql) + + def delete_records(self, project, ids_to_delete): + """Delete specific records from seed table""" + ids_str = ",".join(str(id) for id in ids_to_delete) + delete_sql = f""" + delete from {project.test_schema}.seed_hard_delete where id in ({ids_str}) + """ + project.run_sql(delete_sql) + + def get_snapshot_records(self, project, snapshot_name): + """Get all records from snapshot table""" + query = f"select * from {project.test_schema}.{snapshot_name} order by id, dbt_valid_from" + return project.run_sql(query, fetch="all") + + def count_records_by_id(self, project, snapshot_name, record_id): + """Count how many snapshot records exist for a given id""" + query = f""" + select count(*) from {project.test_schema}.{snapshot_name} + where id = {record_id} + """ + result = project.run_sql(query, fetch="one") + return result[0] + + +class TestHardDeleteIgnore(BaseHardDeleteTest): + """Test hard_deletes='ignore' mode""" + + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot_hard_delete_ignore.sql": snapshot_sql_ignore} + + def test_hard_delete_ignore(self, project): + """ + Test that with hard_deletes='ignore', deleted records remain unchanged in snapshot. 
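+
+        Illustrative history for id=3 after it is deleted from the source
+        (the row simply stays open):
+
+            id=3  dbt_valid_from=<t0>  dbt_valid_to=NULL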
+ + Expected behavior: + - After deletion, snapshot should still contain all original records + - No new records should be added + - dbt_valid_to should remain NULL for deleted records + """ + # Setup initial data + self.setup_initial_data(project) + + # Run initial snapshot + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Verify initial snapshot has 5 records + initial_records = self.get_snapshot_records(project, "snapshot_hard_delete_ignore") + assert len(initial_records) == 5 + + # Delete records 3 and 4 from source + self.delete_records(project, [3, 4]) + + # Run snapshot again + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # With 'ignore', snapshot should still have 5 records + # (no change - deleted records remain in snapshot) + final_records = self.get_snapshot_records(project, "snapshot_hard_delete_ignore") + assert len(final_records) == 5, ( + f"Expected 5 records with hard_deletes='ignore', got {len(final_records)}. " + "Deleted records should remain unchanged in snapshot." + ) + + # Verify deleted records (ids 3 and 4) still have NULL dbt_valid_to + # Snapshot columns: id, name, city, updated_at, dbt_scd_id, + # dbt_updated_at, dbt_valid_from, dbt_valid_to + # dbt_valid_to is the last column (index -1) + deleted_ids_found = [] + for record in final_records: + if record[0] in [3, 4]: # id is first column + deleted_ids_found.append(record[0]) + dbt_valid_to = record[-1] # last column + assert dbt_valid_to is None, ( + f"Record id={record[0]} should have NULL dbt_valid_to " + f"with hard_deletes='ignore', but got: {dbt_valid_to}" + ) + + assert len(deleted_ids_found) == 2, ( + f"Should find both deleted records (3 and 4) in snapshot with hard_deletes='ignore', " + f"but found: {deleted_ids_found}" + ) + + +class TestHardDeleteInvalidate(BaseHardDeleteTest): + """Test hard_deletes='invalidate' mode""" + + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot_hard_delete_invalidate.sql": snapshot_sql_invalidate} + + def test_hard_delete_invalidate(self, project): + """ + Test that with hard_deletes='invalidate', deleted records have dbt_valid_to set. 
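+
+        Illustrative history for id=3 after it is deleted from the source
+        (the second snapshot run closes the open row):
+
+            id=3  dbt_valid_from=<t0>  dbt_valid_to=<t1>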
+ + Expected behavior: + - Deleted records should have dbt_valid_to set to a timestamp + - No new records should be added + - Total record count remains the same + """ + # Setup initial data + self.setup_initial_data(project) + + # Run initial snapshot + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Verify initial snapshot has 5 records + initial_records = self.get_snapshot_records(project, "snapshot_hard_delete_invalidate") + assert len(initial_records) == 5 + + # Delete records 3 and 4 + self.delete_records(project, [3, 4]) + + # Run snapshot again + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # With 'invalidate', snapshot should still have 5 records + final_records = self.get_snapshot_records(project, "snapshot_hard_delete_invalidate") + assert ( + len(final_records) == 5 + ), f"Expected 5 records with hard_deletes='invalidate', got {len(final_records)}" + + # Verify deleted records (3, 4) have dbt_valid_to set (not NULL) + # Snapshot columns: id, name, city, updated_at, dbt_scd_id, + # dbt_updated_at, dbt_valid_from, dbt_valid_to + # dbt_valid_to is the last column (index -1) + invalidated_count = 0 + for record in final_records: + if record[0] in [3, 4]: # id column + # dbt_valid_to should NOT be NULL + dbt_valid_to = record[-1] # last column + assert dbt_valid_to is not None, ( + f"Record id={record[0]} should have dbt_valid_to set " + f"with hard_deletes='invalidate', but got {dbt_valid_to}" + ) + invalidated_count += 1 + + assert invalidated_count == 2, f"Expected 2 invalidated records, found {invalidated_count}" + + +class TestHardDeleteNewRecord(BaseHardDeleteTest): + """Test hard_deletes='new_record' mode""" + + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot_hard_delete_new_record.sql": snapshot_sql_new_record} + + def test_hard_delete_new_record(self, project): + """ + Test that with hard_deletes='new_record', deleted records get new rows + with dbt_is_deleted=True. 
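+
+        Illustrative history for id=3 after it is deleted from the source
+        (the closed original plus a tombstone row):
+
+            id=3  dbt_valid_from=<t0>  dbt_valid_to=<t1>  dbt_is_deleted='False'
+            id=3  dbt_valid_from=<t1>  dbt_valid_to=NULL  dbt_is_deleted='True'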
+
+        Expected behavior:
+        - Original records should have dbt_valid_to set
+        - New records should be inserted with dbt_is_deleted=True
+        - Total record count increases by number of deleted records
+        """
+        # Setup initial data
+        self.setup_initial_data(project)
+
+        # Run initial snapshot
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # Verify initial snapshot has 5 records
+        initial_records = self.get_snapshot_records(project, "snapshot_hard_delete_new_record")
+        assert len(initial_records) == 5
+
+        # Delete records 3 and 4
+        self.delete_records(project, [3, 4])
+
+        # Run snapshot again
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # With 'new_record', snapshot should have 7 records (5 original + 2 deletion records)
+        final_records = self.get_snapshot_records(project, "snapshot_hard_delete_new_record")
+        assert len(final_records) == 7, (
+            f"Expected 7 records with hard_deletes='new_record' (5 original + 2 deletion records), "
+            f"got {len(final_records)}"
+        )
+
+        # Verify we have 2 records for each deleted id (3 and 4)
+        count_id_3 = self.count_records_by_id(project, "snapshot_hard_delete_new_record", 3)
+        count_id_4 = self.count_records_by_id(project, "snapshot_hard_delete_new_record", 4)
+
+        assert count_id_3 == 2, f"Expected 2 records for id=3, got {count_id_3}"
+        assert count_id_4 == 2, f"Expected 2 records for id=4, got {count_id_4}"
+
+        # Check the dbt_is_deleted flag. The snapshot macros write it as the
+        # string literal 'True'/'False', so compare against the string.
+        check_deleted_sql = f"""
+            select count(*) from {project.test_schema}.snapshot_hard_delete_new_record
+            where dbt_is_deleted = 'True' and id in (3, 4)
+        """
+
+        # Only the query goes inside the try block: if the dbt_is_deleted column
+        # is missing, the query errors and the test fails with a clear message;
+        # a wrong count is reported by the assert below instead of being
+        # swallowed by the except clause.
+        try:
+            deleted_records = project.run_sql(check_deleted_sql, fetch="one")
+        except Exception as e:
+            pytest.fail(
+                f"dbt_is_deleted column should exist with hard_deletes='new_record'. Error: {e}"
+            )
+        assert deleted_records[0] == 2, (
+            f"Expected 2 records with dbt_is_deleted='True' for ids 3 and 4, "
+            f"got {deleted_records[0]}"
+        )
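
As a quick manual check after the new_record test, a query along these lines (schema
placeholder and table name as used in the tests above; dbt_is_deleted holds the string
literal written by databricks__build_snapshot_table) surfaces each deleted key's closed
row and its tombstone:

    select id, name, dbt_valid_from, dbt_valid_to, dbt_is_deleted
    from <test_schema>.snapshot_hard_delete_new_record
    where id in (3, 4)
    order by id, dbt_valid_from;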