From 29756327ffa2e1de813e27418869314041bbaa45 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 8 Oct 2025 17:02:20 -0700 Subject: [PATCH 1/4] Add support for Iceberg REST w Glue databases --- .../snowflake/catalogs/_iceberg_rest.py | 6 + .../macros/relations/table/create.sql | 76 ++++++++++-- .../test_iceberg_rest_catalog_integrations.py | 111 ++++++++++++++++++ 3 files changed, 186 insertions(+), 7 deletions(-) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py index acd98c1d1..65ba3a188 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py @@ -18,6 +18,7 @@ class IcebergRestCatalogRelation: catalog_name: Optional[str] = constants.DEFAULT_ICEBERG_REST_CATALOG.name table_format: Optional[str] = constants.ICEBERG_TABLE_FORMAT catalog_linked_database: Optional[str] = None + catalog_linked_database_type: Optional[str] = None # e.g., 'glue' for AWS Glue external_volume: Optional[str] = None file_format: Optional[str] = None target_file_size: Optional[str] = None @@ -33,6 +34,7 @@ class IcebergRestCatalogIntegration(CatalogIntegration): allows_writes = True auto_refresh = None catalog_linked_database: Optional[str] = None + catalog_linked_database_type: Optional[str] = None max_data_extension_time_in_days: Optional[int] = None target_file_size: Optional[str] = None @@ -43,6 +45,9 @@ def __init__(self, config: CatalogIntegrationConfig) -> None: self.external_volume: Optional[str] = config.external_volume if adapter_properties := config.adapter_properties: self.catalog_linked_database = adapter_properties.get("catalog_linked_database") + self.catalog_linked_database_type = adapter_properties.get( + "catalog_linked_database_type" + ) self.auto_refresh = adapter_properties.get("auto_refresh") self.target_file_size = adapter_properties.get("target_file_size") self.max_data_extension_time_in_days = adapter_properties.get( @@ -79,6 +84,7 @@ def build_relation(self, model: RelationConfig) -> IcebergRestCatalogRelation: catalog_name=self.name, external_volume=None, catalog_linked_database=self.catalog_linked_database, + catalog_linked_database_type=self.catalog_linked_database_type, auto_refresh=parse_model.auto_refresh(model) or self.auto_refresh, target_file_size=parse_model.target_file_size(model) or self.target_file_size, max_data_extension_time_in_days=parse_model.max_data_extension_time_in_days(model) diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql index 7567923aa..7ebab72e8 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql @@ -10,7 +10,12 @@ {%- elif catalog_relation.catalog_type == 'BUILT_IN' -%} {{ snowflake__create_table_built_in_sql(relation, compiled_code) }} {%- elif catalog_relation.catalog_type == 'ICEBERG_REST' -%} - {{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }} + {%- if catalog_relation.catalog_linked_database_type is defined and + catalog_relation.catalog_linked_database_type == 'glue' -%} + {{ snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) }} + {%- else -%} + {{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }} + {%- endif -%} {%- else -%} {% do 
exceptions.raise_compiler_error('Unexpected model config for: ' ~ relation) %} {%- endif -%} @@ -186,7 +191,7 @@ alter iceberg table {{ relation }} resume recluster; {% macro snowflake__create_table_iceberg_rest_sql(relation, compiled_code) -%} {#- - Implements CREATE ICEBERG TABLE ... CATALOG('catalog_name') (external REST catalog): + Implements CREATE ICEBERG TABLE for Iceberg REST catalogs with Catalog Linked Databases. https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-rest Limitations: @@ -194,6 +199,10 @@ alter iceberg table {{ relation }} resume recluster; - Iceberg REST does not support CREATE OR REPLACE - Iceberg catalogs do not support table renaming operations - For existing tables, we must DROP the table first before creating the new one + + Note: Iceberg REST writes only work with Catalog Linked Databases (CLD). + Most CLDs support CTAS. For AWS Glue CLD (which doesn't support CTAS), + a 4-step process is handled at the materialization level (see table.sql). -#} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} @@ -222,16 +231,12 @@ alter iceberg table {{ relation }} resume recluster; {% endif %} -{# Create the table (works for both new and replacement scenarios) #} +{#- All Iceberg REST writes use CLD and support CTAS (except Glue, handled in table.sql) -#} create iceberg table {{ relation }} {%- if contract_config.enforced %} {{ get_table_columns_and_constraints() }} {%- endif %} {{ optional('external_volume', catalog_relation.external_volume, "'") }} - {%- if not catalog_relation|attr('catalog_linked_database') -%} - catalog = '{{ catalog_relation.catalog_name }}' -- external REST catalog name - {{ optional('base_location', catalog_relation.base_location, "'") }} - {%- endif %} {{ optional('target_file_size', catalog_relation.target_file_size, "'") }} {{ optional('auto_refresh', catalog_relation.auto_refresh) }} {{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}} @@ -245,6 +250,63 @@ as ( {%- endmacro %} +{% macro snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) -%} +{#- + Creates an Iceberg table for Catalog Linked Databases (e.g., AWS Glue) with explicit column definitions. + This is used when CTAS is not supported. + + This macro is specifically for CLD where we need to create the table with an explicit schema + because CTAS is not available. 
+-#} + +{# Step 1: Get the schema from the compiled query #} +{% set sql_columns = get_column_schema_from_query(compiled_code) %} + +{# Step 2: Create the iceberg table in the CLD with explicit column definitions #} + +{%- set copy_grants = config.get('copy_grants', default=false) -%} +{%- set row_access_policy = config.get('row_access_policy', default=none) -%} +{%- set table_tag = config.get('table_tag', default=none) -%} + +{%- set sql_header = config.get('sql_header', none) -%} +{{ sql_header if sql_header is not none }} + +{# Check if relation exists and drop if necessary (CLD doesn't support CREATE OR REPLACE) #} +{% set existing_relation = adapter.get_relation(database=relation.database, schema=relation.schema, identifier=relation.identifier) %} +{% if existing_relation %} + drop table if exists {{ existing_relation }}; +{% endif %} + +{# Create the table with explicit column definitions #} +create iceberg table {{ relation }} ( + {%- for column in sql_columns -%} + {% if column.data_type == "FIXED" %} + {%- set data_type = "INT" -%} + {% elif "character varying" in column.data_type %} + {%- set data_type = "STRING" -%} + {% else %} + {%- set data_type = column.data_type -%} + {% endif %} + {{ adapter.quote(column.name.lower()) }} {{ data_type }} + {%- if not loop.last %}, {% endif -%} + {% endfor -%} +) +{{ optional('external_volume', catalog_relation.external_volume, "'") }} +{{ optional('target_file_size', catalog_relation.target_file_size, "'") }} +{{ optional('auto_refresh', catalog_relation.auto_refresh) }} +{{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}} +{% if row_access_policy -%} with row access policy {{ row_access_policy }} {%- endif %} +{% if table_tag -%} with tag ({{ table_tag }}) {%- endif %} +{% if copy_grants -%} copy grants {%- endif %} +; + +{# Step 3: Insert data from the view (in regular DB) into the table (in CLD) #} +insert into {{ relation }} + {{ compiled_code }}; + +{%- endmacro %} + + {% macro py_write_table(compiled_code, target_relation) %} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} diff --git a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py index a057f077f..30c91dd29 100644 --- a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py +++ b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py @@ -51,6 +51,7 @@ class TestSnowflakeIcebergRestCatalogIntegration(BaseCatalogIntegrationValidation): + """Test for non-Glue Catalog Linked Databases (e.g., Polaris) that support CTAS""" @pytest.fixture(scope="class") def catalogs(self): @@ -68,6 +69,7 @@ def catalogs(self): "catalog_linked_database": os.getenv( "SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE" ), + # No catalog_linked_database_type means standard CTAS is used "max_data_extension_time_in_days": 1, "target_file_size": "AUTO", "auto_refresh": "true", @@ -103,3 +105,112 @@ def test_basic_iceberg_rest_catalog_integration(self, project): result = run_dbt(["run"]) assert len(result) == 4 run_dbt(["run"]) + + +class TestSnowflakeIcebergRestGlueCatalogIntegration(BaseCatalogIntegrationValidation): + """Test for AWS Glue Catalog Linked Database that doesn't support CTAS + + Note: AWS Glue CLDs in Snowflake have several limitations: + 1. 
Require lowercase unquoted identifiers OR double-quoted identifiers + 2. May not support SHOW SCHEMAS and other metadata operations + 3. Require specific IAM permissions and AWS configuration + + This test is skipped by default. To test Glue CLD functionality: + 1. Set up a properly configured AWS Glue CLD in Snowflake + 2. Ensure proper IAM roles and permissions + 3. Manually create a schema in lowercase + 4. Remove the @pytest.mark.skip decorator and run the test + """ + + @pytest.fixture(scope="class") + def catalogs(self): + return { + "catalogs": [ + { + "name": "glue_iceberg_rest_catalog", + "active_write_integration": "glue_iceberg_rest_catalog_integration", + "write_integrations": [ + { + "name": "glue_iceberg_rest_catalog_integration", + "catalog_type": "iceberg_rest", + "table_format": "iceberg", + "adapter_properties": { + "catalog_linked_database": os.getenv( + "SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE" + ), + "catalog_linked_database_type": "glue", # Glue requires 4-step process + "max_data_extension_time_in_days": 1, + "target_file_size": "AUTO", + "auto_refresh": "true", + }, + } + ], + }, + ] + } + + @pytest.fixture(scope="class") + def project_config_update(self): + # Force quoting for Glue CLD compatibility + return { + "quoting": { + "database": False, + "schema": True, + "identifier": True, + } + } + + @pytest.fixture(scope="class", autouse=True) + def setup_glue_schema(self, project): + """Pre-create schema with quoted lowercase identifier for Glue CLD""" + adapter = project.adapter + glue_database = os.getenv("SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE") + schema_name = project.test_schema.lower() + + # Create schema with quoted identifier to preserve lowercase + create_schema_sql = f'CREATE SCHEMA IF NOT EXISTS {glue_database}."{schema_name}"' + adapter.execute(create_schema_sql, fetch=False) + + yield + + # Cleanup: drop schema after test + drop_schema_sql = f'DROP SCHEMA IF EXISTS {glue_database}."{schema_name}"' + try: + adapter.execute(drop_schema_sql, fetch=False) + except: + pass # Ignore cleanup errors + + # AWS Glue requires lowercase identifiers and alphanumeric characters only + @pytest.fixture(scope="class") + def unique_schema(self, request, prefix) -> str: + test_file = request.module.__name__ + # We only want the last part of the name + test_file = test_file.split(".")[-1] + unique_schema = f"{prefix}_{test_file}_glue" + # Remove underscores and convert to lowercase for Glue compatibility + return unique_schema.replace("_", "").lower() + + @pytest.fixture(scope="class") + def models(self): + # Use different catalog name for Glue models + return { + "models": { + "glue_basic_iceberg_table.sql": """ + {{ config(materialized='table', + catalog_name='glue_iceberg_rest_catalog') }} + select 1 as id, 'test' as name, 1.0 as price, '2021-01-01' as test_date + """, + "glue_iceberg_table_with_catalog_config.sql": """ + {{ config(materialized='table', catalog_name='glue_iceberg_rest_catalog', + target_file_size='16MB', max_data_extension_time_in_days=1, auto_refresh='true') }} + select 1 as id + """, + } + } + + def test_glue_iceberg_rest_catalog_integration(self, project): + """Test Glue CLD with 4-step table creation process""" + result = run_dbt(["run"]) + assert len(result) == 2 + # Run again to test update path + run_dbt(["run"]) From ce3ca62d318941fa3d758e0addfb63ef588b8041 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 9 Oct 2025 12:05:48 -0700 Subject: [PATCH 2/4] add snapshot tests --- .../adapter/test_snapshot_hard_deletes.py | 180 
++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py diff --git a/dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py b/dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py new file mode 100644 index 000000000..209a9711f --- /dev/null +++ b/dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py @@ -0,0 +1,180 @@ +import pytest + +from dbt.tests.adapter.simple_snapshot.new_record_check_mode import ( + BaseSnapshotNewRecordCheckMode, +) +from dbt.tests.adapter.simple_snapshot.new_record_dbt_valid_to_current import ( + BaseSnapshotNewRecordDbtValidToCurrent, +) +from dbt.tests.adapter.simple_snapshot.new_record_timestamp_mode import ( + BaseSnapshotNewRecordTimestampMode, +) +from dbt.tests.adapter.simple_snapshot.test_ephemeral_snapshot_hard_deletes import ( + BaseSnapshotEphemeralHardDeletes, +) + + +# Spark-compatible seed statements for timestamp mode tests +_spark_seed_new_record_mode_statements = [ + """create table {database}.{schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP);""", + """create table {database}.{schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP, + dbt_valid_from TIMESTAMP, + dbt_valid_to TIMESTAMP, + dbt_scd_id STRING, + dbt_updated_at TIMESTAMP, + dbt_is_deleted STRING + );""", + # seed inserts + """insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 
04:34:27'), +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');""", + # populate snapshot table + """insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + CAST(null AS TIMESTAMP) as dbt_valid_to, + updated_at as dbt_updated_at, + md5(CAST(id AS STRING) || '-' || first_name || '|' || CAST(updated_at AS STRING)) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed;""", +] + +_spark_invalidate_sql_statements = [ + """-- Update records 11 - 21. Change email and updated_at field. +update {schema}.seed set + updated_at = updated_at + INTERVAL 1 HOURS, + email = case when id = 20 then 'pfoxj@creativecommons.org' else concat('new_', email) end +where id >= 10 and id <= 20;""", + """-- Update the expected snapshot data to reflect the changes we expect to the snapshot on the next run +update {schema}.snapshot_expected set + dbt_valid_to = updated_at + INTERVAL 1 HOURS +where id >= 10 and id <= 20; +""", +] + +_spark_update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + CAST(null AS TIMESTAMP) as dbt_valid_to, + updated_at as dbt_updated_at, + md5(CAST(id AS STRING) || '-' || first_name || '|' || CAST(updated_at AS STRING)) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed +where id >= 10 and id <= 20; +""" + + +class TestSnapshotNewRecordTimestampMode(BaseSnapshotNewRecordTimestampMode): + @pytest.fixture(scope="class") + def seed_new_record_mode_statements(self): + return _spark_seed_new_record_mode_statements + + @pytest.fixture(scope="class") + def invalidate_sql_statements(self): + return _spark_invalidate_sql_statements + + @pytest.fixture(scope="class") + def update_sql(self): + return _spark_update_sql + + +class TestSnapshotNewRecordCheckMode(BaseSnapshotNewRecordCheckMode): + @pytest.fixture(scope="class") + def seed_new_record_mode_statements(self): + return _spark_seed_new_record_mode_statements + + @pytest.fixture(scope="class") + def invalidate_sql_statements(self): + return _spark_invalidate_sql_statements + + @pytest.fixture(scope="class") + def update_sql(self): + return _spark_update_sql + + +class TestSnapshotNewRecordDbtValidToCurrent(BaseSnapshotNewRecordDbtValidToCurrent): + pass + + +class TestSnapshotEphemeralHardDeletes(BaseSnapshotEphemeralHardDeletes): + pass From 46273bdf1ddedf42b08b1dde43b086c43d2e5364 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 9 Oct 2025 12:07:56 -0700 Subject: [PATCH 3/4] Revert "Add support for Iceberg REST w Glue databases" This reverts commit 29756327ffa2e1de813e27418869314041bbaa45. 
--- .../snowflake/catalogs/_iceberg_rest.py | 6 - .../macros/relations/table/create.sql | 76 ++---------- .../test_iceberg_rest_catalog_integrations.py | 111 ------------------ 3 files changed, 7 insertions(+), 186 deletions(-) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py index 65ba3a188..acd98c1d1 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py @@ -18,7 +18,6 @@ class IcebergRestCatalogRelation: catalog_name: Optional[str] = constants.DEFAULT_ICEBERG_REST_CATALOG.name table_format: Optional[str] = constants.ICEBERG_TABLE_FORMAT catalog_linked_database: Optional[str] = None - catalog_linked_database_type: Optional[str] = None # e.g., 'glue' for AWS Glue external_volume: Optional[str] = None file_format: Optional[str] = None target_file_size: Optional[str] = None @@ -34,7 +33,6 @@ class IcebergRestCatalogIntegration(CatalogIntegration): allows_writes = True auto_refresh = None catalog_linked_database: Optional[str] = None - catalog_linked_database_type: Optional[str] = None max_data_extension_time_in_days: Optional[int] = None target_file_size: Optional[str] = None @@ -45,9 +43,6 @@ def __init__(self, config: CatalogIntegrationConfig) -> None: self.external_volume: Optional[str] = config.external_volume if adapter_properties := config.adapter_properties: self.catalog_linked_database = adapter_properties.get("catalog_linked_database") - self.catalog_linked_database_type = adapter_properties.get( - "catalog_linked_database_type" - ) self.auto_refresh = adapter_properties.get("auto_refresh") self.target_file_size = adapter_properties.get("target_file_size") self.max_data_extension_time_in_days = adapter_properties.get( @@ -84,7 +79,6 @@ def build_relation(self, model: RelationConfig) -> IcebergRestCatalogRelation: catalog_name=self.name, external_volume=None, catalog_linked_database=self.catalog_linked_database, - catalog_linked_database_type=self.catalog_linked_database_type, auto_refresh=parse_model.auto_refresh(model) or self.auto_refresh, target_file_size=parse_model.target_file_size(model) or self.target_file_size, max_data_extension_time_in_days=parse_model.max_data_extension_time_in_days(model) diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql index 7ebab72e8..7567923aa 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql @@ -10,12 +10,7 @@ {%- elif catalog_relation.catalog_type == 'BUILT_IN' -%} {{ snowflake__create_table_built_in_sql(relation, compiled_code) }} {%- elif catalog_relation.catalog_type == 'ICEBERG_REST' -%} - {%- if catalog_relation.catalog_linked_database_type is defined and - catalog_relation.catalog_linked_database_type == 'glue' -%} - {{ snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) }} - {%- else -%} - {{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }} - {%- endif -%} + {{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }} {%- else -%} {% do exceptions.raise_compiler_error('Unexpected model config for: ' ~ relation) %} {%- endif -%} @@ -191,7 +186,7 @@ alter iceberg table {{ relation }} resume recluster; {% macro 
snowflake__create_table_iceberg_rest_sql(relation, compiled_code) -%} {#- - Implements CREATE ICEBERG TABLE for Iceberg REST catalogs with Catalog Linked Databases. + Implements CREATE ICEBERG TABLE ... CATALOG('catalog_name') (external REST catalog): https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-rest Limitations: @@ -199,10 +194,6 @@ alter iceberg table {{ relation }} resume recluster; - Iceberg REST does not support CREATE OR REPLACE - Iceberg catalogs do not support table renaming operations - For existing tables, we must DROP the table first before creating the new one - - Note: Iceberg REST writes only work with Catalog Linked Databases (CLD). - Most CLDs support CTAS. For AWS Glue CLD (which doesn't support CTAS), - a 4-step process is handled at the materialization level (see table.sql). -#} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} @@ -231,12 +222,16 @@ alter iceberg table {{ relation }} resume recluster; {% endif %} -{#- All Iceberg REST writes use CLD and support CTAS (except Glue, handled in table.sql) -#} +{# Create the table (works for both new and replacement scenarios) #} create iceberg table {{ relation }} {%- if contract_config.enforced %} {{ get_table_columns_and_constraints() }} {%- endif %} {{ optional('external_volume', catalog_relation.external_volume, "'") }} + {%- if not catalog_relation|attr('catalog_linked_database') -%} + catalog = '{{ catalog_relation.catalog_name }}' -- external REST catalog name + {{ optional('base_location', catalog_relation.base_location, "'") }} + {%- endif %} {{ optional('target_file_size', catalog_relation.target_file_size, "'") }} {{ optional('auto_refresh', catalog_relation.auto_refresh) }} {{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}} @@ -250,63 +245,6 @@ as ( {%- endmacro %} -{% macro snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) -%} -{#- - Creates an Iceberg table for Catalog Linked Databases (e.g., AWS Glue) with explicit column definitions. - This is used when CTAS is not supported. - - This macro is specifically for CLD where we need to create the table with an explicit schema - because CTAS is not available. 
--#} - -{# Step 1: Get the schema from the compiled query #} -{% set sql_columns = get_column_schema_from_query(compiled_code) %} - -{# Step 2: Create the iceberg table in the CLD with explicit column definitions #} - -{%- set copy_grants = config.get('copy_grants', default=false) -%} -{%- set row_access_policy = config.get('row_access_policy', default=none) -%} -{%- set table_tag = config.get('table_tag', default=none) -%} - -{%- set sql_header = config.get('sql_header', none) -%} -{{ sql_header if sql_header is not none }} - -{# Check if relation exists and drop if necessary (CLD doesn't support CREATE OR REPLACE) #} -{% set existing_relation = adapter.get_relation(database=relation.database, schema=relation.schema, identifier=relation.identifier) %} -{% if existing_relation %} - drop table if exists {{ existing_relation }}; -{% endif %} - -{# Create the table with explicit column definitions #} -create iceberg table {{ relation }} ( - {%- for column in sql_columns -%} - {% if column.data_type == "FIXED" %} - {%- set data_type = "INT" -%} - {% elif "character varying" in column.data_type %} - {%- set data_type = "STRING" -%} - {% else %} - {%- set data_type = column.data_type -%} - {% endif %} - {{ adapter.quote(column.name.lower()) }} {{ data_type }} - {%- if not loop.last %}, {% endif -%} - {% endfor -%} -) -{{ optional('external_volume', catalog_relation.external_volume, "'") }} -{{ optional('target_file_size', catalog_relation.target_file_size, "'") }} -{{ optional('auto_refresh', catalog_relation.auto_refresh) }} -{{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}} -{% if row_access_policy -%} with row access policy {{ row_access_policy }} {%- endif %} -{% if table_tag -%} with tag ({{ table_tag }}) {%- endif %} -{% if copy_grants -%} copy grants {%- endif %} -; - -{# Step 3: Insert data from the view (in regular DB) into the table (in CLD) #} -insert into {{ relation }} - {{ compiled_code }}; - -{%- endmacro %} - - {% macro py_write_table(compiled_code, target_relation) %} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} diff --git a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py index 30c91dd29..a057f077f 100644 --- a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py +++ b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py @@ -51,7 +51,6 @@ class TestSnowflakeIcebergRestCatalogIntegration(BaseCatalogIntegrationValidation): - """Test for non-Glue Catalog Linked Databases (e.g., Polaris) that support CTAS""" @pytest.fixture(scope="class") def catalogs(self): @@ -69,7 +68,6 @@ def catalogs(self): "catalog_linked_database": os.getenv( "SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE" ), - # No catalog_linked_database_type means standard CTAS is used "max_data_extension_time_in_days": 1, "target_file_size": "AUTO", "auto_refresh": "true", @@ -105,112 +103,3 @@ def test_basic_iceberg_rest_catalog_integration(self, project): result = run_dbt(["run"]) assert len(result) == 4 run_dbt(["run"]) - - -class TestSnowflakeIcebergRestGlueCatalogIntegration(BaseCatalogIntegrationValidation): - """Test for AWS Glue Catalog Linked Database that doesn't support CTAS - - Note: AWS Glue CLDs in Snowflake have several limitations: - 1. 
Require lowercase unquoted identifiers OR double-quoted identifiers - 2. May not support SHOW SCHEMAS and other metadata operations - 3. Require specific IAM permissions and AWS configuration - - This test is skipped by default. To test Glue CLD functionality: - 1. Set up a properly configured AWS Glue CLD in Snowflake - 2. Ensure proper IAM roles and permissions - 3. Manually create a schema in lowercase - 4. Remove the @pytest.mark.skip decorator and run the test - """ - - @pytest.fixture(scope="class") - def catalogs(self): - return { - "catalogs": [ - { - "name": "glue_iceberg_rest_catalog", - "active_write_integration": "glue_iceberg_rest_catalog_integration", - "write_integrations": [ - { - "name": "glue_iceberg_rest_catalog_integration", - "catalog_type": "iceberg_rest", - "table_format": "iceberg", - "adapter_properties": { - "catalog_linked_database": os.getenv( - "SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE" - ), - "catalog_linked_database_type": "glue", # Glue requires 4-step process - "max_data_extension_time_in_days": 1, - "target_file_size": "AUTO", - "auto_refresh": "true", - }, - } - ], - }, - ] - } - - @pytest.fixture(scope="class") - def project_config_update(self): - # Force quoting for Glue CLD compatibility - return { - "quoting": { - "database": False, - "schema": True, - "identifier": True, - } - } - - @pytest.fixture(scope="class", autouse=True) - def setup_glue_schema(self, project): - """Pre-create schema with quoted lowercase identifier for Glue CLD""" - adapter = project.adapter - glue_database = os.getenv("SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE") - schema_name = project.test_schema.lower() - - # Create schema with quoted identifier to preserve lowercase - create_schema_sql = f'CREATE SCHEMA IF NOT EXISTS {glue_database}."{schema_name}"' - adapter.execute(create_schema_sql, fetch=False) - - yield - - # Cleanup: drop schema after test - drop_schema_sql = f'DROP SCHEMA IF EXISTS {glue_database}."{schema_name}"' - try: - adapter.execute(drop_schema_sql, fetch=False) - except: - pass # Ignore cleanup errors - - # AWS Glue requires lowercase identifiers and alphanumeric characters only - @pytest.fixture(scope="class") - def unique_schema(self, request, prefix) -> str: - test_file = request.module.__name__ - # We only want the last part of the name - test_file = test_file.split(".")[-1] - unique_schema = f"{prefix}_{test_file}_glue" - # Remove underscores and convert to lowercase for Glue compatibility - return unique_schema.replace("_", "").lower() - - @pytest.fixture(scope="class") - def models(self): - # Use different catalog name for Glue models - return { - "models": { - "glue_basic_iceberg_table.sql": """ - {{ config(materialized='table', - catalog_name='glue_iceberg_rest_catalog') }} - select 1 as id, 'test' as name, 1.0 as price, '2021-01-01' as test_date - """, - "glue_iceberg_table_with_catalog_config.sql": """ - {{ config(materialized='table', catalog_name='glue_iceberg_rest_catalog', - target_file_size='16MB', max_data_extension_time_in_days=1, auto_refresh='true') }} - select 1 as id - """, - } - } - - def test_glue_iceberg_rest_catalog_integration(self, project): - """Test Glue CLD with 4-step table creation process""" - result = run_dbt(["run"]) - assert len(result) == 2 - # Run again to test update path - run_dbt(["run"]) From 54e52c39902d628aae16f6f5bda03c24d4050c4b Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 14 Oct 2025 10:01:51 -0700 Subject: [PATCH 4/4] skip session and spark --- .../adapter/test_snapshot_hard_deletes.py | 253 
++++++++++++++++-- 1 file changed, 224 insertions(+), 29 deletions(-) diff --git a/dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py b/dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py index 209a9711f..fb8409cc6 100644 --- a/dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py +++ b/dbt-spark/tests/functional/adapter/test_snapshot_hard_deletes.py @@ -16,7 +16,7 @@ # Spark-compatible seed statements for timestamp mode tests _spark_seed_new_record_mode_statements = [ - """create table {database}.{schema}.seed ( + """create table {schema}.seed ( id INTEGER, first_name VARCHAR(50), last_name VARCHAR(50), @@ -24,7 +24,7 @@ gender VARCHAR(50), ip_address VARCHAR(20), updated_at TIMESTAMP);""", - """create table {database}.{schema}.snapshot_expected ( + """create table {schema}.snapshot_expected ( id INTEGER, first_name VARCHAR(50), last_name VARCHAR(50), @@ -41,29 +41,29 @@ dbt_is_deleted STRING );""", # seed inserts - """insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values -(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), -(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), -(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), -(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), -(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), -(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), -(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), -(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), -(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), -(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), -(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), -(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), -(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), -(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), -(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), -(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), -(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), -(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), -(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), -(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');""", + """insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', TIMESTAMP('2015-12-24 12:19:28')), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', TIMESTAMP('2015-10-28 16:22:15')), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', TIMESTAMP('2016-04-05 02:05:30')), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', 
'157.83.76.114', TIMESTAMP('2016-08-08 00:06:51')), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', TIMESTAMP('2016-09-01 08:25:38')), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', TIMESTAMP('2016-08-30 18:52:11')), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', TIMESTAMP('2016-07-17 02:09:46')), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', TIMESTAMP('2015-12-29 22:03:56')), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', TIMESTAMP('2016-03-24 21:18:16')), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', TIMESTAMP('2016-08-20 15:44:49')), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', TIMESTAMP('2016-02-27 01:41:48')), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', TIMESTAMP('2016-06-11 03:07:09')), +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', TIMESTAMP('2016-06-18 16:27:19')), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', TIMESTAMP('2016-10-06 01:55:44')), +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', TIMESTAMP('2016-10-31 11:41:21')), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', TIMESTAMP('2016-10-03 08:16:38')), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', TIMESTAMP('2016-08-29 19:35:20')), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', TIMESTAMP('2015-12-11 04:34:27')), +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', TIMESTAMP('2016-09-26 00:49:06')), +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', TIMESTAMP('2016-08-21 10:35:19'));""", # populate snapshot table - """insert into {database}.{schema}.snapshot_expected ( + """insert into {schema}.snapshot_expected ( id, first_name, last_name, @@ -91,7 +91,7 @@ updated_at as dbt_updated_at, md5(CAST(id AS STRING) || '-' || first_name || '|' || CAST(updated_at AS STRING)) as dbt_scd_id, 'False' as dbt_is_deleted -from {database}.{schema}.seed;""", +from {schema}.seed;""", ] _spark_invalidate_sql_statements = [ @@ -110,7 +110,7 @@ _spark_update_sql = """ -- insert v2 of the 11 - 21 records -insert into {database}.{schema}.snapshot_expected ( +insert into {schema}.snapshot_expected ( id, first_name, last_name, @@ -139,12 +139,152 @@ updated_at as dbt_updated_at, md5(CAST(id AS STRING) || '-' || first_name || '|' || CAST(updated_at AS STRING)) as dbt_scd_id, 'False' as dbt_is_deleted -from {database}.{schema}.seed +from {schema}.seed where id >= 10 and id <= 20; """ +_spark_reinsert_sql = """ +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', TIMESTAMP('2200-01-01 12:00:00')); +""" + +# Spark-specific snapshots.yml configurations +_spark_snapshots_yml_timestamp = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + hard_deletes: new_record + file_format: delta +""" + +_spark_snapshots_yml_check = """ +snapshots: + - name: snapshot_actual + config: + strategy: check + check_cols: all + hard_deletes: new_record + file_format: delta +""" + +_spark_snapshots_yml_dbt_valid_to_current = """ +snapshots: + - name: snapshot_actual + config: + unique_key: id + strategy: check + check_cols: all + hard_deletes: 
new_record + dbt_valid_to_current: "date('9999-12-31')" + file_format: delta +""" + +_ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + +# Spark-specific configurations for ephemeral hard deletes test +_spark_ephemeral_source_create_sql = """ +create table {schema}.src_customers ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + updated_at TIMESTAMP +); +""" + +_spark_ephemeral_source_insert_sql = """ +insert into {schema}.src_customers (id, first_name, last_name, email, updated_at) values +(1, 'John', 'Doe', 'john.doe@example.com', TIMESTAMP('2023-01-01 10:00:00')), +(2, 'Jane', 'Smith', 'jane.smith@example.com', TIMESTAMP('2023-01-02 11:00:00')), +(3, 'Bob', 'Johnson', 'bob.johnson@example.com', TIMESTAMP('2023-01-03 12:00:00')); +""" + +_spark_ephemeral_source_alter_sql = """ +alter table {schema}.src_customers add column dummy_column VARCHAR(50) default 'dummy_value'; +""" + +_spark_ephemeral_snapshots_yml = """ +snapshots: + - name: snapshot_customers + relation: ref('ephemeral_customers') + config: + unique_key: id + strategy: check + check_cols: all + hard_deletes: new_record + file_format: delta +""" + +_ephemeral_customers_sql = """ +{{ config(materialized='ephemeral') }} + +select * from {{ source('test_source', 'src_customers') }} +""" + +_sources_yml = """ +version: 2 + +sources: + - name: test_source + schema: "{{ target.schema }}" + tables: + - name: src_customers +""" + +_ref_snapshot_customers_sql = """ +select * from {{ ref('snapshot_customers') }} +""" + +# Spark-compatible snapshot SQL (no database reference) +_spark_snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='id || ' ~ "'-'" ~ ' || first_name', + ) + }} + + select * from {{target.schema}}.seed + +{% endsnapshot %} +""" +# Spark-compatible snapshot SQL for dbt_valid_to_current test +_spark_snapshot_actual_simple_sql = """ +{% snapshot snapshot_actual %} + select * from {{target.schema}}.seed +{% endsnapshot %} +""" + +# Spark-compatible seed statements for dbt_valid_to_current test +_spark_seed_dbt_valid_to_current_statements = [ + "create table {schema}.seed (id INTEGER, first_name VARCHAR(50));", + "insert into {schema}.seed (id, first_name) values (1, 'Judith'), (2, 'Arthur');", +] + +_spark_delete_sql = """ +delete from {schema}.seed where id = 1 +""" + + +@pytest.mark.skip_profile("apache_spark", "spark_session") class TestSnapshotNewRecordTimestampMode(BaseSnapshotNewRecordTimestampMode): + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": _spark_snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": _spark_snapshots_yml_timestamp, + "ref_snapshot.sql": _ref_snapshot_sql, + } + @pytest.fixture(scope="class") def seed_new_record_mode_statements(self): return _spark_seed_new_record_mode_statements @@ -157,8 +297,24 @@ def invalidate_sql_statements(self): def update_sql(self): return _spark_update_sql + @pytest.fixture(scope="class") + def reinsert_sql(self): + return _spark_reinsert_sql + +@pytest.mark.skip_profile("apache_spark", "spark_session") class TestSnapshotNewRecordCheckMode(BaseSnapshotNewRecordCheckMode): + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": _spark_snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": _spark_snapshots_yml_check, + "ref_snapshot.sql": _ref_snapshot_sql, + } + @pytest.fixture(scope="class") def 
seed_new_record_mode_statements(self): return _spark_seed_new_record_mode_statements @@ -171,10 +327,49 @@ def invalidate_sql_statements(self): def update_sql(self): return _spark_update_sql + @pytest.fixture(scope="class") + def reinsert_sql(self): + return _spark_reinsert_sql + +@pytest.mark.skip_profile("apache_spark", "spark_session") class TestSnapshotNewRecordDbtValidToCurrent(BaseSnapshotNewRecordDbtValidToCurrent): - pass + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": _spark_snapshot_actual_simple_sql} + + @pytest.fixture(scope="class") + def models(self): + return {"snapshots.yml": _spark_snapshots_yml_dbt_valid_to_current} + @pytest.fixture(scope="class") + def seed_new_record_mode_statements(self): + return _spark_seed_dbt_valid_to_current_statements + + @pytest.fixture(scope="class") + def delete_sql(self): + return _spark_delete_sql + +@pytest.mark.skip_profile("apache_spark", "spark_session") class TestSnapshotEphemeralHardDeletes(BaseSnapshotEphemeralHardDeletes): - pass + @pytest.fixture(scope="class") + def models(self): + return { + "_sources.yml": _sources_yml, + "ephemeral_customers.sql": _ephemeral_customers_sql, + "snapshots.yml": _spark_ephemeral_snapshots_yml, + "ref_snapshot.sql": _ref_snapshot_customers_sql, + } + + @pytest.fixture(scope="class") + def source_create_sql(self): + return _spark_ephemeral_source_create_sql + + @pytest.fixture(scope="class") + def source_insert_sql(self): + return _spark_ephemeral_source_insert_sql + + @pytest.fixture(scope="class") + def source_alter_sql(self): + return _spark_ephemeral_source_alter_sql
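
---

Note on the first patch (later reverted in PATCH 3/4): the statements below are a rough sketch of what `snowflake__create_table_iceberg_rest_with_glue` would render for the `glue_basic_iceberg_table` test model, restricted to two columns for brevity. The database, schema, and option values are hypothetical, and the type mappings assume Snowflake reports the query's columns as FIXED and "character varying" via `get_column_schema_from_query`; treat this as an illustration of the drop / create-with-explicit-columns / insert flow the macro implements because Glue CLDs do not support CTAS, not as captured output.

```sql
-- Illustration only: identifiers and option values are assumptions, not real output.
-- Step 1 (drop existing relation; CLDs do not support CREATE OR REPLACE)
drop table if exists glue_cld_db."test_schema".glue_basic_iceberg_table;

-- Step 2 (create with explicit column definitions derived from the compiled query)
create iceberg table glue_cld_db."test_schema".glue_basic_iceberg_table (
    "id" INT,       -- FIXED is mapped to INT by the macro
    "name" STRING   -- "character varying" types are mapped to STRING
)
    target_file_size = 'AUTO'
    auto_refresh = true
    max_data_extension_time_in_days = 1
;

-- Step 3 (populate the table from the compiled model SQL)
insert into glue_cld_db."test_schema".glue_basic_iceberg_table
    select 1 as id, 'test' as name;
```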