Skip to content

Commit 6b8099c

Browse files
authored
Fix: Refactor to work with sqlmesh main (#7)
1 parent 8dc4f34 commit 6b8099c

File tree

5 files changed

+42
-51
lines changed

5 files changed

+42
-51
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ MODEL (
5050
name my_db.my_model,
5151
kind CUSTOM (
5252
materialization 'non_idempotent_incremental_by_time_range',
53-
time_column event_timestamp,
54-
primary_key (event_id, event_source)
53+
materialization_properties (
54+
time_column = event_timestamp,
55+
primary_key = (event_id, event_source)
56+
)
5557
)
5658
);
5759

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ description = "Utilities for SQLMesh"
55
readme = "README.md"
66
requires-python = ">= 3.9"
77
dependencies = [
8-
"sqlmesh>=0.159.0"
8+
"sqlmesh>=0.160.0"
99
]
1010

1111
[project.optional-dependencies]

sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,35 @@
66
from sqlglot import exp
77
from sqlmesh.utils.date import make_inclusive
88
from sqlmesh.utils.errors import ConfigError, SQLMeshError
9-
from pydantic import field_validator, model_validator, ValidationInfo
9+
from pydantic import model_validator
1010
from sqlmesh.utils.pydantic import list_of_fields_validator
1111
from sqlmesh.utils.date import TimeLike
1212
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
1313
from sqlmesh import CustomKind
1414
from sqlmesh.utils import columns_to_types_all_known
15-
from sqlglot.optimizer.simplify import gen
16-
import sqlmesh.core.dialect as d
17-
from sqlmesh.core.model.kind import _property
1815

1916
if t.TYPE_CHECKING:
2017
from sqlmesh.core.engine_adapter._typing import QueryOrDF
2118

2219

2320
class NonIdempotentIncrementalByTimeRangeKind(CustomKind):
24-
time_column: TimeColumn
21+
_time_column: TimeColumn
2522
# this is deliberately primary_key instead of unique_key to direct away from INCREMENTAL_BY_UNIQUE_KEY
26-
primary_key: t.List[exp.Expression]
23+
_primary_key: t.List[exp.Expression]
2724

28-
_time_column_validator = TimeColumn.validator()
25+
@model_validator(mode="after")
26+
def _validate_model(self):
27+
self._time_column = TimeColumn.create(
28+
self.materialization_properties.get("time_column"), dialect=self.dialect
29+
)
2930

30-
@field_validator("primary_key", mode="before")
31-
@classmethod
32-
def _validate_primary_key(cls, value: t.Any, info: ValidationInfo) -> t.Any:
33-
expressions = list_of_fields_validator(value, info.data)
34-
if not expressions:
31+
pk_expressions = list_of_fields_validator(
32+
self.materialization_properties.get("primary_key"), dict(dialect=self.dialect)
33+
)
34+
if not pk_expressions:
3535
raise ConfigError("`primary_key` must be specified")
36+
self._primary_key = pk_expressions
3637

37-
return expressions
38-
39-
@model_validator(mode="after")
40-
def _validate_model(self):
4138
time_column_present_in_primary_key = self.time_column.column in {
4239
col for expr in self.primary_key for col in expr.find_all(exp.Column)
4340
}
@@ -50,24 +47,12 @@ def _validate_model(self):
5047
return self
5148

5249
@property
53-
def data_hash_values(self) -> t.List[t.Optional[str]]:
54-
return [
55-
*super().data_hash_values,
56-
gen(self.time_column.column),
57-
self.time_column.format,
58-
*(gen(k) for k in self.primary_key),
59-
]
50+
def time_column(self) -> TimeColumn:
51+
return self._time_column
6052

61-
def to_expression(
62-
self, expressions: t.Optional[t.List[exp.Expression]] = None, **kwargs: t.Any
63-
) -> d.ModelKind:
64-
return super().to_expression(
65-
expressions=[
66-
*(expressions or []),
67-
self.time_column.to_property(kwargs.get("dialect") or ""),
68-
_property(name="primary_key", value=self.primary_key),
69-
]
70-
)
53+
@property
54+
def primary_key(self) -> t.List[exp.Expression]:
55+
return self._primary_key
7156

7257

7358
class NonIdempotentIncrementalByTimeRangeMaterialization(

tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ def test_basic_usage(project: Project):
4747
name {project.test_schema}.model,
4848
kind CUSTOM (
4949
materialization 'non_idempotent_incremental_by_time_range',
50-
time_column event_timestamp,
51-
primary_key (event_id, event_source),
50+
materialization_properties (
51+
time_column = event_timestamp,
52+
primary_key = (event_id, event_source)
53+
),
5254
batch_size 1,
5355
batch_concurrency 1
5456
),
@@ -137,8 +139,10 @@ def test_partial_restatement(project: Project):
137139
name {project.test_schema}.model,
138140
kind CUSTOM (
139141
materialization 'non_idempotent_incremental_by_time_range',
140-
time_column event_timestamp,
141-
primary_key (event_id, event_source),
142+
materialization_properties (
143+
time_column = event_timestamp,
144+
primary_key = (event_id, event_source),
145+
),
142146
batch_size 1,
143147
batch_concurrency 1
144148
),

tests/materializations/test_non_idempotent_incremental_by_time_range.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from tests.materializations.conftest import to_sql_calls, MockedEngineAdapterMaker
1111
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
1212
from sqlmesh.utils.errors import ConfigError
13-
from pydantic import ValidationError
1413
from sqlmesh.utils.date import to_timestamp, now
1514
from sqlmesh.core.macros import RuntimeStage
1615

@@ -21,16 +20,17 @@
2120
def make_model() -> ModelMaker:
2221
def _make(properties: t.Union[str, t.List[str]], dialect: t.Optional[str] = None) -> Model:
2322
if isinstance(properties, list):
24-
properties = ",\n".join(properties) + ","
23+
properties = ",\n".join(properties)
2524

25+
properties_sql = f"materialization_properties ({properties})," if properties else ""
2626
dialect_sql = f"dialect {dialect}," if dialect else ""
2727

2828
expressions = d.parse(f"""
2929
MODEL (
3030
name test.model,
3131
kind CUSTOM (
3232
materialization 'non_idempotent_incremental_by_time_range',
33-
{properties}
33+
{properties_sql}
3434
batch_size 1,
3535
batch_concurrency 1
3636
),
@@ -48,7 +48,7 @@ def _make(properties: t.Union[str, t.List[str]], dialect: t.Optional[str] = None
4848

4949
def test_kind(make_model: ModelMaker):
5050
# basic usage
51-
model = make_model(["time_column ds", "primary_key (id, ds)"])
51+
model = make_model(["time_column = ds", "primary_key = (id, ds)"])
5252
assert isinstance(model.kind, NonIdempotentIncrementalByTimeRangeKind)
5353

5454
assert model.kind.time_column.column == exp.to_column("ds", quoted=True)
@@ -58,22 +58,22 @@ def test_kind(make_model: ModelMaker):
5858
]
5959

6060
# required fields
61-
with pytest.raises(ValidationError, match=r"time_column\n.*Field required"):
61+
with pytest.raises(ConfigError, match=r"Invalid time_column"):
6262
model = make_model([])
6363

64-
with pytest.raises(ValidationError, match=r"primary_key\n.*Field required"):
65-
model = make_model(["time_column ds"])
64+
with pytest.raises(ConfigError, match=r"`primary_key` must be specified"):
65+
model = make_model(["time_column = ds"])
6666

6767
with pytest.raises(ConfigError, match=r"`primary_key` must be specified"):
68-
model = make_model(["time_column ds", "primary_key ()"])
68+
model = make_model(["time_column = ds", "primary_key = ()"])
6969

7070
# primary_key cant be the same as time_column
7171
with pytest.raises(ConfigError, match=r"primary_key` cannot be just the time_column"):
72-
model = make_model(["time_column ds", "primary_key ds"])
72+
model = make_model(["time_column = ds", "primary_key = ds"])
7373

7474

7575
def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngineAdapterMaker):
76-
model: Model = make_model(["time_column ds", "primary_key name"], dialect="trino")
76+
model: Model = make_model(["time_column = ds", "primary_key = name"], dialect="trino")
7777
adapter = make_mocked_engine_adapter(TrinoEngineAdapter)
7878
strategy = NonIdempotentIncrementalByTimeRangeMaterialization(adapter)
7979

@@ -117,7 +117,7 @@ def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine
117117

118118

119119
def test_append(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngineAdapterMaker):
120-
model: Model = make_model(["time_column ds", "primary_key name"], dialect="trino")
120+
model: Model = make_model(["time_column = ds", "primary_key = name"], dialect="trino")
121121
adapter = make_mocked_engine_adapter(TrinoEngineAdapter)
122122
strategy = NonIdempotentIncrementalByTimeRangeMaterialization(adapter)
123123

0 commit comments

Comments
 (0)