Skip to content

Commit affbd61

Browse files
authored
fix: Diff issues with STs that lead to always triggering config change (#1181)
### Description

To avoid always triggering `on_configuration_change` for streaming tables, we need to track when config changes actually happen. Further, while debugging the failing tests, we discovered that the refresh schedule would always trigger a config change because of the difference between a time zone of `None` and the UTC default that Databricks stores.

### Checklist

- [x] I have run this code in development and it appears to resolve the stated issue
- [x] This PR includes tests, or tests are not required/relevant for this PR
- [x] I have updated the `CHANGELOG.md` and added information about my change to the "dbt-databricks next" section.
1 parent 64c6347 commit affbd61

File tree

10 files changed

+63
-14
lines changed

10 files changed

+63
-14
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## dbt-databricks 1.10.13 (TBD)
22

3+
### Fixes
4+
5+
- Fix issue causing MV/STs to always trigger as having their config changed ([1181](http://github.com/databricks/dbt-databricks/pull/1181))
6+
37
## dbt-databricks 1.10.12 (September 8, 2025)
48

59
### Under the hood

dbt/adapters/databricks/relation_configs/query.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,11 @@ def from_relation_config(cls, relation_config: RelationConfig) -> QueryConfig:
4242
raise DbtRuntimeError(
4343
f"Cannot compile model {relation_config.identifier} with no SQL query"
4444
)
45+
46+
47+
class DescribeQueryProcessor(QueryProcessor):
    """Query processor that reads the query text from the describe-extended results."""

    @classmethod
    def from_relation_results(cls, result: RelationResults) -> QueryConfig:
        """Build a QueryConfig from the "View Text" row of ``describe_extended``.

        Args:
            result: Relation results; must contain a ``"describe_extended"`` table
                whose rows hold the field name at index 0 and its value at index 1.

        Returns:
            A QueryConfig whose query is the cleaned "View Text" value.

        Raises:
            DbtRuntimeError: If the describe output has no "View Text" row.
        """
        table = result["describe_extended"]
        # Use a default with next() so a missing row surfaces as a descriptive
        # dbt error instead of a bare StopIteration.
        row = next((x for x in table if x[0] == "View Text"), None)
        if row is None:
            raise DbtRuntimeError(
                "Cannot determine the existing query: no 'View Text' row in describe output"
            )
        return QueryConfig(query=SqlUtils.clean_sql(row[1]))

dbt/adapters/databricks/relation_configs/refresh.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import re
2-
from typing import ClassVar, Optional
2+
from typing import Any, ClassVar, Optional
33

44
from dbt_common.exceptions import DbtRuntimeError
55

@@ -25,6 +25,18 @@ class RefreshConfig(DatabricksComponentConfig):
2525
# switching from manual refresh to scheduled or vice versa.
2626
is_altered: bool = False
2727

28+
def __eq__(self, other: Any) -> bool:
    """Return whether two refresh configs describe the same schedule.

    Two configs are equal when their cron expressions match and their time
    zones match. A time zone of ``None`` on this config is also treated as
    equal to a UTC time zone on ``other``, because a schedule declared without
    a time zone is stored with a UTC default.

    Args:
        other: Object to compare against; anything that is not a
            RefreshConfig compares unequal.

    Returns:
        True if the schedules are equivalent, False otherwise.
    """
    if not isinstance(other, RefreshConfig):
        return False
    if self.cron != other.cron:
        return False
    # Compare self against other; the previous code compared other with itself,
    # which was always true and made the time zone check a no-op.
    if self.time_zone_value == other.time_zone_value:
        return True
    # An unset time zone on this side matches the stored UTC default.
    return (
        self.time_zone_value is None
        and other.time_zone_value is not None
        and "utc" in other.time_zone_value.lower()
    )
39+
2840
def get_diff(self, other: "RefreshConfig") -> Optional["RefreshConfig"]:
2941
if self != other:
3042
return RefreshConfig(

dbt/adapters/databricks/relation_configs/streaming_table.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from dbt.adapters.databricks.relation_configs.partitioning import (
1212
PartitionedByProcessor,
1313
)
14+
from dbt.adapters.databricks.relation_configs.query import DescribeQueryProcessor
1415
from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig, RefreshProcessor
1516
from dbt.adapters.databricks.relation_configs.tblproperties import (
1617
TblPropertiesProcessor,
@@ -23,6 +24,7 @@ class StreamingTableConfig(DatabricksRelationConfigBase):
2324
CommentProcessor,
2425
TblPropertiesProcessor,
2526
RefreshProcessor,
27+
DescribeQueryProcessor,
2628
]
2729

2830
def get_changeset(
@@ -33,16 +35,21 @@ def get_changeset(
3335
"""
3436
changes: dict[str, DatabricksComponentConfig] = {}
3537
requires_refresh = False
38+
requires_replace = False
3639

3740
for component in self.config_components:
3841
key = component.name
3942
value = self.config[key]
4043
diff = value.get_diff(existing.config[key])
4144
if key == "partition_by" and diff is not None:
4245
requires_refresh = True
46+
if diff and diff != RefreshConfig():
47+
requires_replace = True
4348
diff = diff or value
44-
4549
if diff != RefreshConfig():
4650
changes[key] = diff
47-
48-
return DatabricksRelationChangeSet(changes=changes, requires_full_refresh=requires_refresh)
51+
if requires_replace:
52+
return DatabricksRelationChangeSet(
53+
changes=changes, requires_full_refresh=requires_refresh
54+
)
55+
return None

dbt/include/databricks/macros/materializations/streaming_table.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
{% set on_configuration_change = config.get('on_configuration_change') %}
3636
{% set configuration_changes = get_configuration_changes(existing_relation) %}
3737
{% if configuration_changes is none %}
38+
{{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }}
3839
{% set build_sql = refresh_streaming_table(target_relation, sql) %}
3940

4041
{% elif on_configuration_change == 'apply' %}

dbt/include/databricks/macros/relations/streaming_table/refresh.sql

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,5 @@
33
{%- endmacro %}
44

55
{% macro databricks__refresh_streaming_table(relation, sql) -%}
6-
create or refresh streaming table {{ relation.render() }}
7-
as
8-
{{ sql }}
6+
refresh streaming table {{ relation.render() }}
97
{% endmacro %}

tests/functional/adapter/incremental/fixtures.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ def model(dbt, spark):
564564
models:
565565
- name: tags
566566
config:
567+
use_notebook: true
567568
tags: ["python"]
568569
databricks_tags:
569570
a: b
@@ -587,6 +588,7 @@ def model(dbt, spark):
587588
models:
588589
- name: tblproperties
589590
config:
591+
use_notebook: true
590592
tags: ["python"]
591593
tblproperties:
592594
a: b

tests/functional/adapter/streaming_tables/test_st_basic.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,11 @@ def test_streaming_table_create(self, project, my_streaming_table):
149149
def test_streaming_table_create_idempotent(self, project, my_streaming_table):
150150
# setup creates it once; verify it's there and run once
151151
assert self.query_relation_type(project, my_streaming_table) == "streaming_table"
152-
util.run_dbt(["run", "--models", my_streaming_table.identifier])
152+
_, logs = util.run_dbt_and_capture(
153+
["--debug", "run", "--models", my_streaming_table.identifier]
154+
)
153155
assert self.query_relation_type(project, my_streaming_table) == "streaming_table"
156+
util.assert_message_in_logs("REFRESHING STREAMING TABLE", logs)
154157

155158
def test_streaming_table_full_refresh(self, project, my_streaming_table):
156159
_, logs = util.run_dbt_and_capture(

tests/functional/adapter/streaming_tables/test_st_changes.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,11 @@ def test_change_is_not_applied_via_replace(self, project, my_streaming_table):
243243
)
244244
util.assert_message_in_logs(f"Applying ALTER to: {my_streaming_table}", logs, False)
245245
util.assert_message_in_logs(f"Applying REPLACE to: {my_streaming_table}", logs, False)
246+
247+
def test_idempotent_run_does_not_fail(self, project, my_streaming_table):
248+
assert self.query_relation_type(project, my_streaming_table) == "streaming_table"
249+
_, log = util.run_dbt_and_capture(
250+
["--debug", "run", "--models", my_streaming_table.identifier]
251+
)
252+
assert self.query_relation_type(project, my_streaming_table) == "streaming_table"
253+
util.assert_message_in_logs("REFRESHING STREAMING TABLE", log)

tests/unit/relation_configs/test_streaming_table_config.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from dbt.adapters.databricks.relation_configs.comment import CommentConfig
44
from dbt.adapters.databricks.relation_configs.partitioning import PartitionedByConfig
5+
from dbt.adapters.databricks.relation_configs.query import QueryConfig
56
from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig
67
from dbt.adapters.databricks.relation_configs.streaming_table import (
78
StreamingTableConfig,
@@ -22,6 +23,7 @@ def test_from_results(self):
2223
["Catalog:", "default", None],
2324
["Comment", "This is the table comment", None],
2425
["Refresh Schedule", "MANUAL", None],
26+
["View Text", "select * from foo", None],
2527
],
2628
),
2729
"show_tblproperties": fixtures.gen_tblproperties([["prop", "1"], ["other", "other"]]),
@@ -35,6 +37,7 @@ def test_from_results(self):
3537
"comment": CommentConfig(comment="This is the table comment"),
3638
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
3739
"refresh": RefreshConfig(),
40+
"query": QueryConfig(query="select * from foo"),
3841
}
3942
)
4043

@@ -59,6 +62,7 @@ def test_from_model_node(self):
5962
"comment": CommentConfig(comment="This is the table comment", persist=False),
6063
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
6164
"refresh": RefreshConfig(),
65+
"query": QueryConfig(query="select * from foo"),
6266
}
6367
)
6468

@@ -69,6 +73,7 @@ def test_get_changeset__no_changes(self):
6973
"comment": CommentConfig(comment="This is the table comment"),
7074
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
7175
"refresh": RefreshConfig(),
76+
"query": QueryConfig(query="select * from foo"),
7277
}
7378
)
7479
new = StreamingTableConfig(
@@ -77,16 +82,13 @@ def test_get_changeset__no_changes(self):
7782
"comment": CommentConfig(comment="This is the table comment"),
7883
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
7984
"refresh": RefreshConfig(),
85+
"query": QueryConfig(query="select * from foo"),
8086
}
8187
)
8288

8389
changeset = new.get_changeset(old)
84-
assert not changeset.requires_full_refresh
85-
assert changeset.changes == {
86-
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
87-
"comment": CommentConfig(comment="This is the table comment"),
88-
"partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]),
89-
}
90+
# Based on the new logic, when there are no changes, get_changeset returns None
91+
assert changeset is None
9092

9193
def test_get_changeset__some_changes(self):
9294
old = StreamingTableConfig(
@@ -95,6 +97,7 @@ def test_get_changeset__some_changes(self):
9597
"comment": CommentConfig(comment="This is the table comment"),
9698
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
9799
"refresh": RefreshConfig(),
100+
"query": QueryConfig(query="select * from foo"),
98101
}
99102
)
100103
new = StreamingTableConfig(
@@ -103,15 +106,18 @@ def test_get_changeset__some_changes(self):
103106
"comment": CommentConfig(comment="This is the table comment"),
104107
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
105108
"refresh": RefreshConfig(cron="*/5 * * * *"),
109+
"query": QueryConfig(query="select * from foo"),
106110
}
107111
)
108112

109113
changeset = new.get_changeset(old)
114+
assert changeset is not None
110115
assert changeset.has_changes
111116
assert changeset.requires_full_refresh
112117
assert changeset.changes == {
113118
"partition_by": PartitionedByConfig(partition_by=["col_a"]),
114119
"comment": CommentConfig(comment="This is the table comment"),
115120
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
116121
"refresh": RefreshConfig(cron="*/5 * * * *"),
122+
"query": QueryConfig(query="select * from foo"),
117123
}

0 commit comments

Comments
 (0)