Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix dbt clone for Iceberg tables by using CREATE ICEBERG TABLE syntax when source is an Iceberg table
time: 2026-03-17T10:04:03.215641-04:00
custom:
Author: jcolvin
Issue: "1767"
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,23 @@
{% endmacro %}

{% macro snowflake__create_or_replace_clone(this_relation, defer_relation) %}
{%- set is_iceberg = snowflake__is_iceberg_table(defer_relation) -%}
create or replace
{{ "transient" if config.get("transient", true) }}
{% if is_iceberg -%}
iceberg
{%- else -%}
{{ "transient" if config.get("transient", true) }}
{%- endif %}
table {{ this_relation }}
clone {{ defer_relation }}
{{ "copy grants" if config.get("copy_grants", false) }}
{% endmacro %}

{% macro snowflake__is_iceberg_table(relation) %}
{%- set source_relation = load_cached_relation(relation) -%}
{%- if source_relation is not none -%}
{{ return(source_relation.is_iceberg_format) }}
{%- endif -%}

{{ return(false) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ def clean_up(self, project):
pass


def copy_state(project_root):
state_path = os.path.join(project_root, "state")
if not os.path.exists(state_path):
os.makedirs(state_path)
shutil.copyfile(
os.path.join(project_root, "target", "manifest.json"),
os.path.join(project_root, "state", "manifest.json"),
)


def run_and_save_state(project_root):
results = run_dbt(["run"])
assert len(results) == 1
copy_state(project_root)


table_model_1_sql = """
{{ config(
materialized='table',
Expand Down Expand Up @@ -55,23 +71,57 @@ def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema
outputs["otherschema"]["schema"] = other_schema
return {"test": {"outputs": outputs, "target": "default"}}

def copy_state(self, project_root):
state_path = os.path.join(project_root, "state")
if not os.path.exists(state_path):
os.makedirs(state_path)
shutil.copyfile(
f"{project_root}/target/manifest.json", f"{project_root}/state/manifest.json"
)
def test_can_clone_transient_table(self, project, other_schema):
project.create_test_schema(other_schema)
run_and_save_state(project.project_root)

def run_and_save_state(self, project_root, with_snapshot=False):
results = run_dbt(["run"])
clone_args = [
"clone",
"--state",
"state",
"--target",
"otherschema",
]

results = run_dbt(clone_args)
assert len(results) == 1

self.copy_state(project_root)

def test_can_clone_transient_table(self, project, other_schema):
ICEBERG_EXTERNAL_VOLUME = os.getenv("SNOWFLAKE_TEST_ICEBERG_EXTERNAL_VOLUME", "s3_iceberg_snow")

iceberg_table_model_sql = f"""
{{{{ config(
materialized='table',
table_format='iceberg',
external_volume='{ICEBERG_EXTERNAL_VOLUME}',
base_location_subpath='clone_test',
) }}}}

select 1 as id
"""


class TestSnowflakeCloneIcebergTable:
@pytest.fixture(scope="class")
def models(self):
return {
"iceberg_model.sql": iceberg_table_model_sql,
}

@pytest.fixture(scope="class")
def other_schema(self, unique_schema):
return unique_schema + "_other"

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema):
outputs = {"default": dbt_profile_target, "otherschema": deepcopy(dbt_profile_target)}
outputs["default"]["schema"] = unique_schema
outputs["otherschema"]["schema"] = other_schema
return {"test": {"outputs": outputs, "target": "default"}}

def test_can_clone_iceberg_table(self, project, other_schema):
project.create_test_schema(other_schema)
self.run_and_save_state(project.project_root)
run_and_save_state(project.project_root)

clone_args = [
"clone",
Expand All @@ -84,6 +134,14 @@ def test_can_clone_transient_table(self, project, other_schema):
results = run_dbt(clone_args)
assert len(results) == 1

# Verify the cloned relation is also an Iceberg table
with project.adapter.connection_named("__test"):
schema_relations = project.adapter.list_relations(
database=project.database, schema=other_schema
)
assert len(schema_relations) == 1
assert schema_relations[0].is_iceberg_format


class TestSnowflakeCloneSameSourceAndTarget(BaseCloneSameSourceAndTarget):
pass
Loading