Skip to content

Commit 96d5be2

Browse files
committed
ensure unique_key is included in physical properties for incremental unique key model
1 parent 870675d commit 96d5be2

File tree

1 file changed

+40
-16
lines changed

1 file changed

+40
-16
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1753,6 +1753,28 @@ def demote(self, view_name: str, **kwargs: t.Any) -> None:
17531753
self.adapter.drop_view(view_name, cascade=False)
17541754

17551755

1756+
def _add_unique_key_to_physical_properties_for_doris(
1757+
model: Model, physical_properties: t.Dict[str, t.Any]
1758+
) -> t.Dict[str, t.Any]:
1759+
"""
1760+
For Doris dialect with INCREMENTAL_BY_UNIQUE_KEY models, ensure unique_key is added
1761+
to physical properties if not already present.
1762+
"""
1763+
if (
1764+
model.dialect == "doris"
1765+
and model.kind.is_incremental_by_unique_key
1766+
and model.unique_key
1767+
and "unique_key" not in physical_properties
1768+
):
1769+
physical_properties = dict(physical_properties)
1770+
physical_properties["unique_key"] = (
1771+
model.unique_key[0]
1772+
if len(model.unique_key) == 1
1773+
else exp.Tuple(expressions=model.unique_key)
1774+
)
1775+
return physical_properties
1776+
1777+
17561778
class MaterializableStrategy(PromotableStrategy, abc.ABC):
17571779
def create(
17581780
self,
@@ -1764,19 +1786,9 @@ def create(
17641786
) -> None:
17651787
ctas_query = model.ctas_query(**render_kwargs)
17661788
physical_properties = kwargs.get("physical_properties", model.physical_properties)
1767-
# If Doris and incremental-by-unique-key, ensure unique_key is present for creation
1768-
if (
1769-
model.dialect == "doris"
1770-
and getattr(model.kind, "is_incremental_by_unique_key", False)
1771-
and model.unique_key
1772-
and "unique_key" not in physical_properties
1773-
):
1774-
physical_properties = dict(physical_properties)
1775-
physical_properties["unique_key"] = (
1776-
model.unique_key[0]
1777-
if len(model.unique_key) == 1
1778-
else exp.Tuple(expressions=model.unique_key)
1779-
)
1789+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
1790+
model, physical_properties
1791+
)
17801792

17811793
logger.info("Creating table '%s'", table_name)
17821794
if model.annotated:
@@ -1874,6 +1886,10 @@ def _replace_query_for_model(
18741886
except Exception:
18751887
columns_to_types, source_columns = None, None
18761888

1889+
physical_properties = kwargs.get("physical_properties", model.physical_properties)
1890+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
1891+
model, physical_properties
1892+
)
18771893
self.adapter.replace_query(
18781894
name,
18791895
query_or_df,
@@ -1882,7 +1898,7 @@ def _replace_query_for_model(
18821898
partitioned_by=model.partitioned_by,
18831899
partition_interval_unit=model.partition_interval_unit,
18841900
clustered_by=model.clustered_by,
1885-
table_properties=kwargs.get("physical_properties", model.physical_properties),
1901+
table_properties=physical_properties,
18861902
table_description=model.description,
18871903
column_descriptions=model.column_descriptions,
18881904
target_columns_to_types=columns_to_types,
@@ -2005,6 +2021,10 @@ def insert(
20052021
table_name,
20062022
render_kwargs=render_kwargs,
20072023
)
2024+
physical_properties = kwargs.get("physical_properties", model.physical_properties)
2025+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
2026+
model, physical_properties
2027+
)
20082028
self.adapter.merge(
20092029
table_name,
20102030
query_or_df,
@@ -2016,7 +2036,7 @@ def insert(
20162036
end=kwargs.get("end"),
20172037
execution_time=kwargs.get("execution_time"),
20182038
),
2019-
physical_properties=kwargs.get("physical_properties", model.physical_properties),
2039+
physical_properties=physical_properties,
20202040
source_columns=source_columns,
20212041
)
20222042

@@ -2573,13 +2593,17 @@ def create(
25732593
if is_table_deployable and is_snapshot_deployable:
25742594
# We could deploy this to prod; create a proper managed table
25752595
logger.info("Creating managed table: %s", table_name)
2596+
physical_properties = kwargs.get("physical_properties", model.physical_properties)
2597+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
2598+
model, physical_properties
2599+
)
25762600
self.adapter.create_managed_table(
25772601
table_name=table_name,
25782602
query=model.render_query_or_raise(**render_kwargs),
25792603
target_columns_to_types=model.columns_to_types,
25802604
partitioned_by=model.partitioned_by,
25812605
clustered_by=model.clustered_by,
2582-
table_properties=kwargs.get("physical_properties", model.physical_properties),
2606+
table_properties=physical_properties,
25832607
table_description=model.description,
25842608
column_descriptions=model.column_descriptions,
25852609
table_format=model.table_format,

0 commit comments

Comments
 (0)