diff --git a/.gitignore b/.gitignore
index 371a05eb7..9bcd5304c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,6 @@ logs/
 .coverage*
 CLAUDE.md
 .claude/
+.cursor
+uv.lock
+docs/plans/
diff --git a/AGENTS.md b/AGENTS.md
index ec8734069..66bbf1c80 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -261,6 +261,31 @@ DatabricksAdapter (impl.py)
 - Implement Databricks features (liquid clustering, column masks, tags)
 - **Important**: To override a `spark__macro_name` macro, create `databricks__macro_name` (NOT `spark__macro_name`)
 
+**Jinja2 Whitespace Control:**
+- **Prefer using `-` in Jinja tags** (`{%-`, `-%}`) to strip whitespace and avoid blank lines in generated SQL
+- Good: `{%- if condition -%}` - strips whitespace before and after the tag
+- Without `-`: `{% if condition %}` - may leave blank lines in the output
+- This keeps generated SQL clean and readable, especially for conditional column additions
+- Note: overly aggressive stripping can run tokens together or flatten intentional indentation, so use judgment
+- Example:
+  ```jinja
+  select
+    column1,
+    column2
+    {%- if config.get('extra_column') -%}
+    , extra_column
+    {%- endif %}
+  from table
+  ```
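+- For instance, with `extra_column` set in the config, the example above renders to roughly:
+  ```sql
+  select
+    column1,
+    column2, extra_column
+  from table
+  ```
+  When the config is unset, the conditional block and its surrounding whitespace drop out entirely.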
+
 #### Multi-Statement SQL Execution
 
 When a macro needs to execute multiple SQL statements (e.g., DELETE followed by INSERT), use the `execute_multiple_statements` helper:
@@ -390,6 +415,9 @@ Models can be configured with Databricks-specific options:
 - ❌ `if adapter.compare_dbr_version(16, 1) >= 0:`
 - ✅ `if adapter.has_capability(DBRCapability.COMMENT_ON_COLUMN):`
 - ✅ `{% if adapter.has_dbr_capability('comment_on_column') %}`
+8. **Jinja2 Whitespace**: Prefer using `-` in Jinja tags (`{%-`, `-%}`) to strip whitespace and prevent blank lines in generated SQL:
+   - Preferred: `{%- if condition -%}`
+   - Without: `{% if condition %}` (may create blank lines)
 
 ## 🚨 Common Pitfalls for Agents
 
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1bcf47424..3b0264e2c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 ## dbt-databricks 1.11.1 (TBD)
 
+### Features
+
+- Add support for `hard_deletes='new_record'` in snapshot materializations, enabling tracking of deleted source records via dedicated deletion records marked by the `dbt_is_deleted` column (thanks @randypitcherii!) ([#1176](https://github.com/databricks/dbt-databricks/issues/1176), [#1263](https://github.com/databricks/dbt-databricks/pull/1263))
+  - Implements complete support for all three `hard_deletes` modes: `ignore` (default), `invalidate`, and `new_record`
+  - `new_record` mode creates deletion records carrying the actual source column values plus `dbt_is_deleted=true`, giving a full audit trail
+  - `invalidate` mode uses Delta Lake's `WHEN NOT MATCHED BY SOURCE` clause to set `dbt_valid_to` on deleted records
+  - Uses the native Databricks BOOLEAN type for the `dbt_is_deleted` column for improved type safety and performance
+
 ### Fixes
 
 - Fix bug that was applying UniForm tblproperties on managed Iceberg tables causing materializations to fail
diff --git a/dbt/include/databricks/macros/materializations/snapshot_helpers.sql b/dbt/include/databricks/macros/materializations/snapshot_helpers.sql
index 56bfd6e09..71142bdc9 100644
--- a/dbt/include/databricks/macros/materializations/snapshot_helpers.sql
+++ b/dbt/include/databricks/macros/materializations/snapshot_helpers.sql
@@ -1,40 +1,278 @@
 {% macro databricks__snapshot_merge_sql(target, source, insert_cols) -%}
+  {%- set insert_cols_csv = insert_cols | join(', ') -%}
+  {# Get the hard_deletes configuration from config #}
+  {%- set hard_deletes = config.get('hard_deletes', 'ignore') -%}
+  {%- set invalidate_hard_deletes = (hard_deletes == 'invalidate') -%}
+
+  {# Get column names configuration #}
   {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
 
   merge into {{ target }} as DBT_INTERNAL_DEST
-    {% if target.is_iceberg %}
-      {# create view only supports a name (no catalog, or schema) #}
-      using {{ source.identifier }} as DBT_INTERNAL_SOURCE
-    {% else %}
-      using {{ source }} as DBT_INTERNAL_SOURCE
-    {% endif %}
+    {%- if target.is_iceberg %}
+      {# create view only supports a name (no catalog, or schema) #}
+      using {{ source.identifier }} as DBT_INTERNAL_SOURCE
+    {%- else %}
+      using {{ source }} as DBT_INTERNAL_SOURCE
+    {%- endif %}
   on DBT_INTERNAL_SOURCE.{{ adapter.quote(columns.dbt_scd_id) }} = DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_scd_id) }}
   when matched
-    {% if config.get("dbt_valid_to_current") %}
-      and ( DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_valid_to) }} = {{ config.get('dbt_valid_to_current') }} or
-            DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_valid_to) }} is null )
-    {% else %}
-      and DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_valid_to) }} is null
-    {% endif %}
-    and DBT_INTERNAL_SOURCE.{{ adapter.quote('dbt_change_type') }} in ('update', 'delete')
+    {%- if config.get("dbt_valid_to_current") %}
+      and ( DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_valid_to) }} = {{ config.get('dbt_valid_to_current') }} or
+            DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_valid_to) }} is null )
+    {%- else %}
+      and DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_valid_to) }} is null
+    {%- endif %}
+    and DBT_INTERNAL_SOURCE.{{ adapter.quote('dbt_change_type') }} in ('update', 'delete')
     then update set
       {{ adapter.quote(columns.dbt_valid_to) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(columns.dbt_valid_to) }}
   when not matched
-    and DBT_INTERNAL_SOURCE.{{ adapter.quote('dbt_change_type') }} = 'insert'
-    then insert *
+    and DBT_INTERNAL_SOURCE.{{ adapter.quote('dbt_change_type') }} = 'insert'
+    then insert ({{ insert_cols_csv }})
+    values ({{ insert_cols_csv }})
+
+  {# invalidate mode: close out records that have disappeared from the source #}
+  {%- if invalidate_hard_deletes %}
+  when not matched by source
+    and DBT_INTERNAL_DEST.{{ adapter.quote(columns.dbt_valid_to) }} is null
+    then update set
+      {{ adapter.quote(columns.dbt_valid_to) }} = current_timestamp()
+  {%- endif %}
   ;
 {% endmacro %}
 
 {% macro databricks__create_columns(relation, columns) %}
-  {% if columns|length > 0 %}
-    {% call statement() %}
-      alter table {{ relation }} add columns (
-        {% for column in columns %}
-          {{ adapter.quote(column.name) }} {{ column.data_type }} {{- ',' if not loop.last -}}
-        {% endfor %}
-      );
-    {% endcall %}
+  {%- if columns|length > 0 %}
+    {%- call statement() %}
+      alter table {{ relation }} add columns (
+        {%- for column in columns %}
+          {{ adapter.quote(column.name) }} {{ column.data_type }} {{- ',' if not loop.last -}}
+        {%- endfor %}
+      );
+    {%- endcall %}
+  {%- endif %}
+{% endmacro %}
+
+
+{% macro databricks__build_snapshot_table(strategy, sql) %}
+  {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
+  {%- set hard_deletes = strategy.hard_deletes -%}
+
+  select *,
+    {{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
+    {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
+    {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
+    {{ get_dbt_valid_to_current(strategy, columns) }}
+    {%- if hard_deletes == 'new_record' -%}
+    , false as {{ columns.dbt_is_deleted }}
+    {%- endif %}
+  from (
+    {{ sql }}
+  ) sbq
+{% endmacro %}
+
+{% macro databricks__snapshot_staging_table(strategy, source_sql, target_relation) -%}
+  {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
+  {% if strategy.hard_deletes == 'new_record' %}
+    {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
   {% endif %}
-{% endmacro %}
\ No newline at end of file
+  with snapshot_query as (
+
+    {{ source_sql }}
+
+  ),
+
+  snapshotted_data as (
+
+    select *, {{ unique_key_fields(strategy.unique_key) }}
+    from {{ target_relation }}
+    where
+      {% if config.get('dbt_valid_to_current') %}
+        {% set source_unique_key = columns.dbt_valid_to | trim %}
+        {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
+
+        {# The exact equals semantics between NULL values depend on the current behavior flag set.
Also, update records if the source field is null #} + ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null ) + {% else %} + {{ columns.dbt_valid_to }} is null + {% endif %} + + ), + + insertions_source_data as ( + + select *, {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} + + from snapshot_query + ), + + updates_source_data as ( + + select *, {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} + + from snapshot_query + ), + + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + + deletes_source_data as ( + + select *, {{ unique_key_fields(strategy.unique_key) }} + from snapshot_query + ), + {% endif %} + + insertions as ( + + select + 'insert' as dbt_change_type, + source_data.* + {%- if strategy.hard_deletes == 'new_record' -%} + , false as {{ columns.dbt_is_deleted }} + {%- endif %} + + from insertions_source_data as source_data + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ( + {{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = true {% endif %} + ) + + ) + + ), + + updates as ( + + select + 'update' as dbt_change_type, + source_data.*, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + + from updates_source_data as source_data + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where ( + {{ strategy.row_changed }} {%- if strategy.hard_deletes == 'new_record' -%} or snapshotted_data.{{ columns.dbt_is_deleted }} = true {% endif %} + ) + ) + + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + , + deletes as ( + + select + 'delete' as dbt_change_type, + source_data.*, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + + {%- if strategy.hard_deletes == 'new_record' %} + and not ( + -- avoid updating the record's valid_to if the latest entry is marked as deleted + snapshotted_data.{{ columns.dbt_is_deleted }} = true + and + {% if config.get('dbt_valid_to_current') -%} + snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} + {%- else -%} + snapshotted_data.{{ columns.dbt_valid_to }} is null + {%- endif %} + ) + {%- endif %} + ) + {%- endif %} + + {%- if strategy.hard_deletes == 
'new_record' %} + {# Databricks-specific: Extract column names from agate.Row tuples #} + {% set target_columns_raw = get_columns_in_relation(target_relation) %} + {% set snapshotted_cols = [] %} + {% for row in target_columns_raw %} + {# agate.Row is a tuple: (col_name, data_type, comment) #} + {# Filter out Databricks metadata rows (starting with # or empty) #} + {% set col_name = row[0] %} + {% if col_name and not col_name.startswith('#') %} + {% do snapshotted_cols.append(col_name) %} + {% endif %} + {% endfor %} + {% set source_sql_cols = get_column_schema_from_query(source_sql) %} + , + deletion_records as ( + + select + 'insert' as dbt_change_type, + {# + If a column has been added to the source it won't yet exist in the + snapshotted table so we insert a null value as a placeholder for the column. + #} + {%- for col in source_sql_cols -%} + {%- if col.name in snapshotted_cols -%} + snapshotted_data.{{ adapter.quote(col.column) }}, + {%- else -%} + NULL as {{ adapter.quote(col.column) }}, + {%- endif -%} + {% endfor -%} + {%- if strategy.unique_key | is_list -%} + {%- for key in strategy.unique_key -%} + snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor -%} + {%- else -%} + snapshotted_data.dbt_unique_key as dbt_unique_key, + {% endif -%} + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, + {{ new_scd_id }} as {{ columns.dbt_scd_id }}, + true as {{ columns.dbt_is_deleted }} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + and not ( + -- avoid inserting a new record if the latest one is marked as deleted + snapshotted_data.{{ columns.dbt_is_deleted }} = true + and + {% if config.get('dbt_valid_to_current') -%} + snapshotted_data.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} + {%- else -%} + snapshotted_data.{{ columns.dbt_valid_to }} is null + {%- endif %} + ) + + ) + {%- endif %} + + select * from insertions + union all + select * from updates + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + union all + select * from deletes + {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + union all + select * from deletion_records + {%- endif %} + + +{%- endmacro %} diff --git a/pyproject.toml b/pyproject.toml index 65dd08d7f..97859cdf3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,7 +77,6 @@ dependencies = [ "debugpy", "pytest-cov", ] -path = ".hatch" python = "3.10" [tool.hatch.envs.default.scripts] diff --git a/tests/functional/adapter/simple_snapshot/fixtures.py b/tests/functional/adapter/simple_snapshot/fixtures.py index e8b7ec885..8e9565406 100644 --- a/tests/functional/adapter/simple_snapshot/fixtures.py +++ b/tests/functional/adapter/simple_snapshot/fixtures.py @@ -26,3 +26,52 @@ select 1 as id {% endsnapshot %} """ + +# Hard deletes test fixtures +snapshot_hard_delete_ignore_sql = """ +{% snapshot snapshot_hard_delete_ignore %} + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['name', 'city'], + hard_deletes='ignore', + ) + }} + select * from {{ schema }}.seed_hard_delete +{% endsnapshot %} +""" + +snapshot_hard_delete_invalidate_sql = """ +{% snapshot 
snapshot_hard_delete_invalidate %} + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['name', 'city'], + hard_deletes='invalidate', + ) + }} + select * from {{ schema }}.seed_hard_delete +{% endsnapshot %} +""" + +snapshot_hard_delete_new_record_sql = """ +{% snapshot snapshot_hard_delete_new_record %} + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['name', 'city'], + hard_deletes='new_record', + ) + }} + select * from {{ schema }}.seed_hard_delete +{% endsnapshot %} +""" diff --git a/tests/functional/adapter/simple_snapshot/test_hard_deletes.py b/tests/functional/adapter/simple_snapshot/test_hard_deletes.py new file mode 100644 index 000000000..8c426b25a --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/test_hard_deletes.py @@ -0,0 +1,250 @@ +""" +Tests for hard_deletes configuration in snapshots. + +Tests all three hard_deletes modes: +- ignore: deleted records are not tracked in snapshot +- invalidate: deleted records have dbt_valid_to set +- new_record: deleted records get new row with dbt_is_deleted=True +""" + +import pytest +from dbt.tests.util import run_dbt + +from tests.functional.adapter.simple_snapshot import fixtures + + +class BaseHardDeleteTest: + """Base class for hard delete tests""" + + def setup_initial_data(self, project): + """Create initial seed data with 5 records""" + create_seed_sql = f""" + create table {project.test_schema}.seed_hard_delete ( + id integer, + name string, + city string, + updated_at timestamp + ) + """ + project.run_sql(create_seed_sql) + + insert_seed_sql = f""" + insert into {project.test_schema}.seed_hard_delete (id, name, city, updated_at) values + (1, 'Alice', 'London', current_timestamp()), + (2, 'Bob', 'Paris', current_timestamp()), + (3, 'Charlie', 'Berlin', current_timestamp()), + (4, 'Diana', 'Madrid', current_timestamp()), + (5, 'Eve', 'Rome', current_timestamp()) + """ + project.run_sql(insert_seed_sql) + + def delete_records(self, project, ids_to_delete): + """Delete specific records from seed table""" + ids_str = ",".join(str(id) for id in ids_to_delete) + delete_sql = f""" + delete from {project.test_schema}.seed_hard_delete where id in ({ids_str}) + """ + project.run_sql(delete_sql) + + def get_snapshot_records(self, project, snapshot_name): + """Get all records from snapshot table""" + query = f"select * from {project.test_schema}.{snapshot_name} order by id, dbt_valid_from" + return project.run_sql(query, fetch="all") + + def count_records_by_id(self, project, snapshot_name, record_id): + """Count how many snapshot records exist for a given id""" + query = f""" + select count(*) from {project.test_schema}.{snapshot_name} + where id = {record_id} + """ + result = project.run_sql(query, fetch="one") + return result[0] + + +class TestHardDeleteIgnore(BaseHardDeleteTest): + """Test hard_deletes='ignore' mode""" + + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot_hard_delete_ignore.sql": fixtures.snapshot_hard_delete_ignore_sql} + + def test_hard_delete_ignore(self, project): + """ + Test that with hard_deletes='ignore', deleted records remain unchanged in snapshot. 
+ + Expected behavior: + - After deletion, snapshot should still contain all original records + - No new records should be added + - dbt_valid_to should remain NULL for deleted records + """ + # Setup initial data + self.setup_initial_data(project) + + # Run initial snapshot + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Verify initial snapshot has 5 records + initial_records = self.get_snapshot_records(project, "snapshot_hard_delete_ignore") + assert len(initial_records) == 5 + + # Delete records 3 and 4 from source + self.delete_records(project, [3, 4]) + + # Run snapshot again + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # With 'ignore', snapshot should still have 5 records + # (no change - deleted records remain in snapshot) + final_records = self.get_snapshot_records(project, "snapshot_hard_delete_ignore") + assert len(final_records) == 5, ( + f"Expected 5 records with hard_deletes='ignore', got {len(final_records)}. " + "Deleted records should remain unchanged in snapshot." + ) + + # Verify deleted records (ids 3 and 4) still have NULL dbt_valid_to + # Snapshot columns: id, name, city, updated_at, dbt_scd_id, + # dbt_updated_at, dbt_valid_from, dbt_valid_to + # dbt_valid_to is the last column (index -1) + deleted_ids_found = [] + for record in final_records: + if record[0] in [3, 4]: # id is first column + deleted_ids_found.append(record[0]) + dbt_valid_to = record[-1] # last column + assert dbt_valid_to is None, ( + f"Record id={record[0]} should have NULL dbt_valid_to " + f"with hard_deletes='ignore', but got: {dbt_valid_to}" + ) + + assert len(deleted_ids_found) == 2, ( + f"Should find both deleted records (3 and 4) in snapshot with hard_deletes='ignore', " + f"but found: {deleted_ids_found}" + ) + + +class TestHardDeleteInvalidate(BaseHardDeleteTest): + """Test hard_deletes='invalidate' mode""" + + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot_hard_delete_invalidate.sql": fixtures.snapshot_hard_delete_invalidate_sql} + + def test_hard_delete_invalidate(self, project): + """ + Test that with hard_deletes='invalidate', deleted records have dbt_valid_to set. 
+ + Expected behavior: + - Deleted records should have dbt_valid_to set to a timestamp + - No new records should be added + - Total record count remains the same + """ + # Setup initial data + self.setup_initial_data(project) + + # Run initial snapshot + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # Verify initial snapshot has 5 records + initial_records = self.get_snapshot_records(project, "snapshot_hard_delete_invalidate") + assert len(initial_records) == 5 + + # Delete records 3 and 4 + self.delete_records(project, [3, 4]) + + # Run snapshot again + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # With 'invalidate', snapshot should still have 5 records + final_records = self.get_snapshot_records(project, "snapshot_hard_delete_invalidate") + assert len(final_records) == 5, ( + f"Expected 5 records with hard_deletes='invalidate', got {len(final_records)}" + ) + + # Verify deleted records (3, 4) have dbt_valid_to set (not NULL) + # Snapshot columns: id, name, city, updated_at, dbt_scd_id, + # dbt_updated_at, dbt_valid_from, dbt_valid_to + # dbt_valid_to is the last column (index -1) + invalidated_count = 0 + for record in final_records: + if record[0] in [3, 4]: # id column + # dbt_valid_to should NOT be NULL + dbt_valid_to = record[-1] # last column + assert dbt_valid_to is not None, ( + f"Record id={record[0]} should have dbt_valid_to set " + f"with hard_deletes='invalidate', but got {dbt_valid_to}" + ) + invalidated_count += 1 + + assert invalidated_count == 2, f"Expected 2 invalidated records, found {invalidated_count}" + + +class TestHardDeleteNewRecord(BaseHardDeleteTest): + """Test hard_deletes='new_record' mode""" + + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot_hard_delete_new_record.sql": fixtures.snapshot_hard_delete_new_record_sql} + + def test_hard_delete_new_record(self, project): + """ + Test that with hard_deletes='new_record', deleted records get new rows + with dbt_is_deleted=True. 
+
+        Expected behavior:
+        - Original records should have dbt_valid_to set
+        - New records should be inserted with dbt_is_deleted=True
+        - Total record count increases by the number of deleted records
+        """
+        # Setup initial data
+        self.setup_initial_data(project)
+
+        # Run initial snapshot
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # Verify initial snapshot has 5 records
+        initial_records = self.get_snapshot_records(project, "snapshot_hard_delete_new_record")
+        assert len(initial_records) == 5
+
+        # Delete records 3 and 4
+        self.delete_records(project, [3, 4])
+
+        # Run snapshot again
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # With 'new_record', snapshot should have 7 records (5 original + 2 new deletion records)
+        final_records = self.get_snapshot_records(project, "snapshot_hard_delete_new_record")
+        assert len(final_records) == 7, (
+            f"Expected 7 records with hard_deletes='new_record' (5 original + 2 deletion records), "
+            f"got {len(final_records)}"
+        )
+
+        # Verify we have 2 records for each deleted id (3 and 4)
+        count_id_3 = self.count_records_by_id(project, "snapshot_hard_delete_new_record", 3)
+        count_id_4 = self.count_records_by_id(project, "snapshot_hard_delete_new_record", 4)
+
+        assert count_id_3 == 2, f"Expected 2 records for id=3, got {count_id_3}"
+        assert count_id_4 == 2, f"Expected 2 records for id=4, got {count_id_4}"
+
+        # Check dbt_is_deleted values for the deletion records. Querying the column
+        # also verifies that it exists.
+        check_deleted_sql = f"""
+        select count(*) from {project.test_schema}.snapshot_hard_delete_new_record
+        where dbt_is_deleted = true and id in (3, 4)
+        """
+
+        try:
+            deleted_records = project.run_sql(check_deleted_sql, fetch="one")
+        except Exception as e:
+            # If the dbt_is_deleted column doesn't exist, the query itself fails
+            pytest.fail(
+                f"dbt_is_deleted column should exist with hard_deletes='new_record'. Error: {e}"
+            )
+
+        # Keep the assertion outside the try block so an incorrect count is not
+        # misreported as a missing column
+        assert deleted_records[0] == 2, (
+            f"Expected 2 records with dbt_is_deleted=true for ids 3 and 4, "
+            f"got {deleted_records[0]}"
+        )