Skip to content

Commit b0223a8

Browse files
colin-rogers-dbtzqureshitnk-yskVersusFacitsandeepmullangi2
authored
update incremental sql to support iceberg REST w CLD (#1287)
Co-authored-by: Zeeshan Qureshi <[email protected]> Co-authored-by: tnk-ysk <[email protected]> Co-authored-by: Mila Page <[email protected]> Co-authored-by: sandeepmullangi2 <[email protected]> Co-authored-by: Daisuke Taniwaki <[email protected]> Co-authored-by: Doug Beatty <[email protected]>
1 parent 317e809 commit b0223a8

File tree

9 files changed

+141
-22
lines changed

9 files changed

+141
-22
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: Features
2+
body: 'Add support for incremental materializations using iceberg rest catalog / catalog
3+
linked db '
4+
time: 2025-09-02T12:59:17.884349-07:00
5+
custom:
6+
Author: colin-rogers-dbt
7+
Issue: "1123"

dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class IcebergRestCatalogIntegration(CatalogIntegration):
3939
def __init__(self, config: CatalogIntegrationConfig) -> None:
4040
# we overwrite this because the base provides too much config
4141
self.name: str = config.name
42+
self.catalog_name: Optional[str] = config.catalog_name
4243
self.external_volume: Optional[str] = config.external_volume
4344
if adapter_properties := config.adapter_properties:
4445
self.catalog_linked_database = adapter_properties.get("catalog_linked_database")
@@ -75,7 +76,7 @@ def build_relation(self, model: RelationConfig) -> IcebergRestCatalogRelation:
7576
)
7677

7778
return IcebergRestCatalogRelation(
78-
catalog_name=self.catalog_name,
79+
catalog_name=self.name,
7980
external_volume=None,
8081
catalog_linked_database=self.catalog_linked_database,
8182
auto_refresh=parse_model.auto_refresh(model) or self.auto_refresh,

dbt-snowflake/src/dbt/include/snowflake/macros/adapters.sql

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,35 @@
229229

230230

231231

232+
{% macro snowflake__is_catalog_linked_database(relation=none, catalog_relation=none) -%}
233+
{#-- Helper macro to detect if we're in a catalog-linked database context --#}
234+
{%- if catalog_relation is not none -%}
235+
{#-- Direct catalog_relation object provided --#}
236+
{%- if catalog_relation|attr('catalog_linked_database') -%}
237+
{{ return(true) }}
238+
{%- else -%}
239+
{{ return(false) }}
240+
{%- endif -%}
241+
{%- elif relation and relation.config -%}
242+
{%- set catalog_relation = adapter.build_catalog_relation(relation) -%}
243+
{%- if catalog_relation is not none and catalog_relation|attr('catalog_linked_database') -%}
244+
{{ return(true) }}
245+
{%- else -%}
246+
{{ return(false) }}
247+
{%- endif -%}
248+
{%- elif relation and relation.catalog -%}
249+
{#-- Relation with catalog attribute --#}
250+
{%- set catalog_integration = adapter.get_catalog_integration(relation.catalog) -%}
251+
{%- if catalog_integration is not none and catalog_integration|attr('catalog_linked_database') -%}
252+
{{ return(true) }}
253+
{%- else -%}
254+
{{ return(false) }}
255+
{%- endif -%}
256+
{%- else -%}
257+
{{ return(false) }}
258+
{%- endif -%}
259+
{%- endmacro %}
260+
232261
{% macro snowflake_dml_explicit_transaction(dml) %}
233262
{#
234263
Use this macro to wrap all INSERT, MERGE, UPDATE, DELETE, and TRUNCATE
@@ -251,6 +280,10 @@
251280
truncate table {{ relation.render() }}
252281
{% endset %}
253282
{% call statement('truncate_relation') -%}
254-
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
283+
{% if snowflake__is_catalog_linked_database(relation=config.model) %}
284+
{{ truncate_dml }}
285+
{% else %}
286+
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
287+
{% endif %}
255288
{%- endcall %}
256289
{% endmacro %}

dbt-snowflake/src/dbt/include/snowflake/macros/materializations/incremental.sql

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,15 @@
2323
When unique_key is none, the delete+insert and microbatch strategies can use a view beacuse a
2424
single INSERT statement is run with no DELETES as part of the statement.
2525
Otherwise, play it safe by using a temporary table.
26+
27+
Catalog-linked databases (Iceberg tables) does not support using temporary relations.
2628
#} */
2729

30+
{#-- Always use table for catalog-linked databases (Iceberg) --#}
31+
{% if snowflake__is_catalog_linked_database(relation=config.model) %}
32+
{{ return("table") }}
33+
{% endif %}
34+
2835
{% if language == "python" and tmp_relation_type is not none %}
2936
{% do exceptions.raise_compiler_error(
3037
"Python models currently only support 'table' for tmp_relation_type but "
@@ -67,12 +74,15 @@
6774
{%- set identifier = this.name -%}
6875
{%- set catalog_relation = adapter.build_catalog_relation(config.model) -%}
6976

77+
{%- set is_catalog_linked_db = snowflake__is_catalog_linked_database(relation=none, catalog_relation=catalog_relation) -%}
78+
7079
{%- set target_relation = api.Relation.create(
7180
identifier=identifier,
7281
schema=schema,
7382
database=database,
7483
type='table',
75-
table_format=catalog_relation.table_format
84+
table_format=catalog_relation.table_format,
85+
catalog=config.model.catalog,
7686
) -%}
7787

7888
{% set existing_relation = load_relation(this) %}
@@ -81,7 +91,12 @@
8191
{%- set unique_key = config.get('unique_key') -%}
8292
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
8393
{% set tmp_relation_type = dbt_snowflake_get_tmp_relation_type(incremental_strategy, unique_key, language) %}
84-
{% set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) %}
94+
95+
{% if is_catalog_linked_db %}
96+
{% set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type, catalog=catalog_relation.catalog_name, is_table=true) %}
97+
{% else %}
98+
{% set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) %}
99+
{% endif %}
85100

86101
{% set grant_config = config.get('grants') %}
87102

@@ -118,7 +133,11 @@
118133

119134
{% else %}
120135
{#-- Create the temp relation, either as a view or as a temp table --#}
121-
{% if tmp_relation_type == 'view' %}
136+
{% if is_catalog_linked_db %}
137+
{%- call statement('create_tmp_relation', language=language) -%}
138+
{{ create_table_as(False, tmp_relation, compiled_code, language) }}
139+
{%- endcall -%}
140+
{% elif tmp_relation_type == 'view' %}
122141
{%- call statement('create_tmp_relation') -%}
123142
{{ snowflake__create_view_as_with_temp_flag(tmp_relation, compiled_code, True) }}
124143
{%- endcall -%}
@@ -140,13 +159,14 @@
140159
{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
141160
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
142161
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
143-
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
162+
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates, 'catalog_relation': catalog_relation }) %}
144163

145164
{%- call statement('main') -%}
146165
{{ strategy_sql_macro_func(strategy_arg_dict) }}
147166
{%- endcall -%}
148167
{% endif %}
149168

169+
150170
{% do drop_relation_if_exists(tmp_relation) %}
151171

152172
{{ run_hooks(post_hooks) }}

dbt-snowflake/src/dbt/include/snowflake/macros/materializations/incremental/insert_overwrite.sql

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232

3333
{%- endset -%}
3434

35-
{% do return(snowflake_dml_explicit_transaction(dml)) %}
35+
{#-- Skip transaction wrapping for catalog-linked databases --#}
36+
{% if snowflake__is_catalog_linked_database(relation=config.model) %}
37+
{% do return(dml) %}
38+
{% else %}
39+
{% do return(snowflake_dml_explicit_transaction(dml)) %}
40+
{% endif %}
3641

3742
{% endmacro %}

dbt-snowflake/src/dbt/include/snowflake/macros/materializations/incremental/merge.sql

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns, incremental_predicates) -%}
23

34
{#
@@ -27,26 +28,46 @@
2728
{%- endif -%}
2829
{%- endset -%}
2930

30-
{% do return(snowflake_dml_explicit_transaction(dml)) %}
31+
{#-- Skip transaction wrapping for catalog-linked databases --#}
32+
{% if snowflake__is_catalog_linked_database(relation=config.model) %}
33+
{% do return(dml) %}
34+
{% else %}
35+
{% do return(snowflake_dml_explicit_transaction(dml)) %}
36+
{% endif %}
3137

3238
{% endmacro %}
3339

3440

3541
{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
3642
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
37-
{% do return(snowflake_dml_explicit_transaction(dml)) %}
43+
{#-- Skip transaction wrapping for catalog-linked databases --#}
44+
{% if snowflake__is_catalog_linked_database(relation=config.model) %}
45+
{% do return(dml) %}
46+
{% else %}
47+
{% do return(snowflake_dml_explicit_transaction(dml)) %}
48+
{% endif %}
3849
{% endmacro %}
3950

4051

4152
{% macro snowflake__snapshot_merge_sql(target, source, insert_cols) %}
4253
{% set dml = default__snapshot_merge_sql(target, source, insert_cols) %}
43-
{% do return(snowflake_dml_explicit_transaction(dml)) %}
54+
{#-- Skip transaction wrapping for catalog-linked databases --#}
55+
{% if snowflake__is_catalog_linked_database(relation=target) %}
56+
{% do return(dml) %}
57+
{% else %}
58+
{% do return(snowflake_dml_explicit_transaction(dml)) %}
59+
{% endif %}
4460
{% endmacro %}
4561

4662

4763
{% macro snowflake__get_incremental_append_sql(get_incremental_append_sql) %}
4864
{% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %}
49-
{% do return(snowflake_dml_explicit_transaction(dml)) %}
65+
{#-- Skip transaction wrapping for catalog-linked databases --#}
66+
{% if snowflake__is_catalog_linked_database(config.model) %}
67+
{% do return(dml) %}
68+
{% else %}
69+
{% do return(snowflake_dml_explicit_transaction(dml)) %}
70+
{% endif %}
5071
{% endmacro %}
5172

5273

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1-
{% macro snowflake__get_drop_table_sql(relation) %}
2-
drop table if exists {{ relation }} cascade
1+
{% macro snowflake__drop_table(relation) %}
2+
{#-- CASCADE is not supported in catalog-linked databases --#}
3+
4+
{% if snowflake__is_catalog_linked_database(relation=relation) %}
5+
drop table if exists {{ relation }}
6+
{% else %}
7+
drop table if exists {{ relation }} cascade
8+
{% endif %}
39
{% endmacro %}

dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,38 @@
1717
select 1 as id
1818
"""
1919

20+
MODEL__INCREMENTAL_ICEBERG_REST = """
21+
{{
22+
config(
23+
materialized='incremental',
24+
catalog_name='basic_iceberg_rest_catalog',
25+
incremental_strategy='merge',
26+
unique_key="id",
27+
)
28+
}}
29+
select * from {{ ref('basic_iceberg_table') }}
30+
31+
{% if is_incremental() %}
32+
where id > 2
33+
{% endif %}
34+
"""
35+
36+
MODEL__INCREMENTAL_ICEBERG_REST_INSERT_OVERWRITE = """
37+
{{
38+
config(
39+
materialized='incremental',
40+
catalog_name='basic_iceberg_rest_catalog',
41+
incremental_strategy='insert_overwrite',
42+
unique_key="id",
43+
)
44+
}}
45+
select * from {{ ref('basic_iceberg_table') }}
46+
47+
{% if is_incremental() %}
48+
where id > 2
49+
{% endif %}
50+
"""
51+
2052

2153
class TestSnowflakeIcebergRestCatalogIntegration(BaseCatalogIntegrationValidation):
2254

@@ -62,10 +94,12 @@ def models(self):
6294
"models": {
6395
"basic_iceberg_table.sql": MODEL__BASIC_ICEBERG_TABLE,
6496
"iceberg_table_with_catalog_config.sql": MODEL__ICEBERG_TABLE_WITH_CATALOG_CONFIG,
97+
"incremental_iceberg_rest.sql": MODEL__INCREMENTAL_ICEBERG_REST,
98+
"incremental_iceberg_rest_insert_overwrite.sql": MODEL__INCREMENTAL_ICEBERG_REST_INSERT_OVERWRITE,
6599
}
66100
}
67101

68102
def test_basic_iceberg_rest_catalog_integration(self, project):
69103
result = run_dbt(["run"])
70-
assert len(result) == 2
104+
assert len(result) == 4
71105
run_dbt(["run"])

dbt-snowflake/tests/unit/test_iceberg_rest_catalog_integration.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,6 @@ def test_integration_with_environment_variable_for_macro(self):
128128
# Simulate the macro check
129129
assert hasattr(relation, "catalog_linked_database")
130130

131-
# The macro would use catalog_name when catalog_linked_database is available
132-
if relation is not None and hasattr(relation, "catalog_linked_database"):
133-
result = relation.catalog_name
134-
else:
135-
result = "target_database"
136-
137-
assert result == "POLARIS"
138-
139131
def test_catalog_relation_all_attributes_present(self):
140132
"""Test that all expected attributes are present on the catalog relation."""
141133
relation = IcebergRestCatalogRelation(

0 commit comments

Comments
 (0)