Skip to content

Commit 47a3000

Browse files
Add cluster_by in Snowflake dynamic table materialization config (#1305)
Co-authored-by: Colin Rogers <[email protected]>
1 parent 8be7ed7 commit 47a3000

File tree

5 files changed

+140
-1
lines changed

5 files changed

+140
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: Add cluster by config in Snowflake for dynamic tables
3+
time: 2025-08-30T12:08:02.458481+02:00
4+
custom:
5+
Author: nazliander
6+
Issue: "706"

dbt-snowflake/src/dbt/adapters/snowflake/relation_configs/dynamic_table.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from dataclasses import dataclass
2-
from typing import Optional, Dict, Any, TYPE_CHECKING
2+
from typing import Optional, Dict, Any, TYPE_CHECKING, Union
33

44
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
55
from dbt.adapters.contracts.relation import RelationConfig
66
from dbt.adapters.contracts.relation import ComponentName
77
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
88
from typing_extensions import Self
99

10+
from dbt.adapters.snowflake.parse_model import cluster_by
1011
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
1112

1213
if TYPE_CHECKING:
@@ -45,6 +46,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
4546
- snowflake_warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table
4647
- refresh_mode: specifies the refresh type for the dynamic table
4748
- initialize: specifies the behavior of the initial refresh of the dynamic table
49+
- cluster_by: specifies the columns or expressions to cluster on
4850
4951
There are currently no non-configurable parameters.
5052
"""
@@ -59,6 +61,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
5961
initialize: Optional[Initialize] = Initialize.default()
6062
row_access_policy: Optional[str] = None
6163
table_tag: Optional[str] = None
64+
cluster_by: Optional[Union[str, list[str]]] = None
6265

6366
@classmethod
6467
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
@@ -79,6 +82,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
7982
"initialize": config_dict.get("initialize"),
8083
"row_access_policy": config_dict.get("row_access_policy"),
8184
"table_tag": config_dict.get("table_tag"),
85+
"cluster_by": config_dict.get("cluster_by"),
8286
}
8387

8488
return super().from_dict(kwargs_dict) # type:ignore
@@ -98,6 +102,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
98102
"row_access_policy"
99103
),
100104
"table_tag": relation_config.config.extra.get("table_tag"), # type:ignore
105+
"cluster_by": cluster_by(relation_config),
101106
}
102107

103108
if refresh_mode := relation_config.config.extra.get("refresh_mode"): # type:ignore
@@ -122,6 +127,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str,
122127
"refresh_mode": dynamic_table.get("refresh_mode"),
123128
"row_access_policy": dynamic_table.get("row_access_policy"),
124129
"table_tag": dynamic_table.get("table_tag"),
130+
"cluster_by": dynamic_table.get("cluster_by"),
125131
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
126132
}
127133

dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/create.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
{{ optional('initialize', dynamic_table.initialize) }}
3939
{{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }}
4040
{{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }}
41+
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
4142
as (
4243
{{ sql }}
4344
)
@@ -74,6 +75,7 @@
7475
{{ optional('initialize', dynamic_table.initialize) }}
7576
{{ optional('row_access_policy', dynamic_table.row_access_policy) }}
7677
{{ optional('table_tag', dynamic_table.table_tag) }}
78+
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
7779
as (
7880
{{ sql }}
7981
)

dbt-snowflake/src/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ create or replace dynamic table {{ relation }}
5151
{{ optional('initialize', dynamic_table.initialize) }}
5252
{{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }}
5353
{{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }}
54+
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
5455
as (
5556
{{ sql }}
5657
)
@@ -87,6 +88,7 @@ create or replace dynamic iceberg table {{ relation }}
8788
{{ optional('initialize', dynamic_table.initialize) }}
8889
{{ optional('row_access_policy', dynamic_table.row_access_policy) }}
8990
{{ optional('table_tag', dynamic_table.table_tag) }}
91+
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
9092
as (
9193
{{ sql }}
9294
)
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import pytest
2+
from dbt.tests.util import check_table_does_exist, run_dbt
3+
4+
_SEED_CSV = """
5+
id,first_name,last_name,email,product_id
6+
1,Jack,Hunter,[email protected],1
7+
2,Kathryn,Walker,[email protected],1
8+
3,Gerald,Ryan,[email protected],3
9+
4,Jack,Hunter,[email protected],4
10+
5,Kathryn,Walker,[email protected],5
11+
6,Gerald,Ryan,[email protected],6
12+
""".lstrip()
13+
14+
15+
class TestClusterBy:
    """Functional test that ``cluster_by`` on a dynamic table ends up in its DDL.

    Four dynamic tables are built on top of one seed: one without clustering,
    one clustered on a single column, one on two columns, and one on a column
    plus an expression. The test then inspects ``GET_DDL`` output for each
    clustered table.
    """

    @pytest.fixture(scope="class")
    def seeds(self):
        """Return the seed files for the project: a single ``seed.csv``."""
        return {"seed.csv": _SEED_CSV}

    @pytest.fixture(scope="class")
    def models(self, dbt_profile_target):
        """Return the four dynamic table models, keyed by file name.

        The warehouse name is read from the ``"warehouse"`` entry of
        ``dbt_profile_target`` and interpolated into every model's config.
        """
        warehouse_name = dbt_profile_target["warehouse"]

        # No cluster_by: baseline dynamic table that the second model selects from.
        unclustered_sql = f"""
{{{{ config(materialized='dynamic_table', snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select * from {{{{ ref('seed') }}}}
""".lstrip()

        # cluster_by with a single column.
        single_column_sql = f"""
{{{{ config(materialized='dynamic_table', cluster_by=['last_name'], snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select * from {{{{ ref('dynamic_table_1') }}}}
""".lstrip()

        # cluster_by with two columns.
        two_column_sql = f"""
{{{{ config(materialized='dynamic_table', cluster_by=['last_name', 'first_name'], snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select
last_name,
first_name,
count(*) as count
from {{{{ ref('seed') }}}}
group by 1, 2
""".lstrip()

        # cluster_by with a column and an expression.
        expression_sql = f"""
{{{{ config(materialized='dynamic_table', cluster_by=['last_name', 'product_id % 3'], snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select
last_name,
first_name,
product_id,
count(*) as count
from {{{{ ref('seed') }}}}
group by 1, 2, 3
""".lstrip()

        model_files = {}
        model_files["dynamic_table_1.sql"] = unclustered_sql
        model_files["dynamic_table_2.sql"] = single_column_sql
        model_files["dynamic_table_3.sql"] = two_column_sql
        model_files["dynamic_table_4.sql"] = expression_sql
        return model_files

    def test_snowflake_dynamic_table_cluster_by(self, project):
        """Seed, build all models, then assert the expected CLUSTER BY clauses."""
        relation_names = self._available_models_in_setup()

        run_dbt(["seed"])

        db_with_schema = f"{project.database}.{project.test_schema}"

        check_table_does_exist(
            project.adapter, f"{db_with_schema}.{relation_names['seed_table']}"
        )

        run_dbt()

        # Check that all dynamic tables exist
        for model_key in (
            "dynamic_table_1",
            "dynamic_table_2",
            "dynamic_table_3",
            "dynamic_table_4",
        ):
            check_table_does_exist(
                project.adapter,
                f"{db_with_schema}.{relation_names[model_key]}",
            )

        # Model key -> clause expected in the upper-cased DDL, checked in this order.
        expected_clauses = (
            ("dynamic_table_2", "CLUSTER BY (LAST_NAME)"),
            ("dynamic_table_3", "CLUSTER BY (LAST_NAME, FIRST_NAME)"),
            ("dynamic_table_4", "CLUSTER BY (LAST_NAME, PRODUCT_ID % 3)"),
        )

        with project.adapter.connection_named("__test"):
            for model_key, clause in expected_clauses:
                ddl = self._get_dynamic_table_ddl(project, relation_names[model_key])
                assert clause in ddl.upper()

    def _get_dynamic_table_ddl(self, project, table_name: str) -> str:
        """Return the first column of ``GET_DDL('DYNAMIC_TABLE', ...)`` for ``table_name``.

        The table is addressed as ``<project.database>.<project.test_schema>.<table_name>``.
        """
        row = project.run_sql(
            f"SELECT GET_DDL('DYNAMIC_TABLE', '{project.database}.{project.test_schema}.{table_name}')",
            fetch="one",
        )
        return row[0]

    def _available_models_in_setup(self) -> dict[str, str]:
        """Map each logical relation key used by the test to its upper-cased relation name."""
        return {
            "seed_table": "SEED",
            "dynamic_table_1": "DYNAMIC_TABLE_1",
            "dynamic_table_2": "DYNAMIC_TABLE_2",
            "dynamic_table_3": "DYNAMIC_TABLE_3",
            "dynamic_table_4": "DYNAMIC_TABLE_4",
        }

0 commit comments

Comments
 (0)