diff --git a/README.md b/README.md index a1ec6db..47e1785 100644 --- a/README.md +++ b/README.md @@ -50,8 +50,10 @@ MODEL ( name my_db.my_model, kind CUSTOM ( materialization 'non_idempotent_incremental_by_time_range', - time_column event_timestamp, - primary_key (event_id, event_source) + materialization_properties ( + time_column = event_timestamp, + primary_key = (event_id, event_source) + ) ) ); diff --git a/pyproject.toml b/pyproject.toml index 864cc28..a1f1021 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "Utilities for SQLMesh" readme = "README.md" requires-python = ">= 3.9" dependencies = [ - "sqlmesh>=0.159.0" + "sqlmesh>=0.160.0" ] [project.optional-dependencies] diff --git a/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py b/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py index 902bb35..f7d82ad 100644 --- a/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py +++ b/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py @@ -6,38 +6,35 @@ from sqlglot import exp from sqlmesh.utils.date import make_inclusive from sqlmesh.utils.errors import ConfigError, SQLMeshError -from pydantic import field_validator, model_validator, ValidationInfo +from pydantic import model_validator from sqlmesh.utils.pydantic import list_of_fields_validator from sqlmesh.utils.date import TimeLike from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS from sqlmesh import CustomKind from sqlmesh.utils import columns_to_types_all_known -from sqlglot.optimizer.simplify import gen -import sqlmesh.core.dialect as d -from sqlmesh.core.model.kind import _property if t.TYPE_CHECKING: from sqlmesh.core.engine_adapter._typing import QueryOrDF class NonIdempotentIncrementalByTimeRangeKind(CustomKind): - time_column: TimeColumn + _time_column: TimeColumn # this is deliberately primary_key instead of unique_key to direct away from INCREMENTAL_BY_UNIQUE_KEY - primary_key: t.List[exp.Expression] + _primary_key: t.List[exp.Expression] - _time_column_validator = TimeColumn.validator() + @model_validator(mode="after") + def _validate_model(self): + self._time_column = TimeColumn.create( + self.materialization_properties.get("time_column"), dialect=self.dialect + ) - @field_validator("primary_key", mode="before") - @classmethod - def _validate_primary_key(cls, value: t.Any, info: ValidationInfo) -> t.Any: - expressions = list_of_fields_validator(value, info.data) - if not expressions: + pk_expressions = list_of_fields_validator( + self.materialization_properties.get("primary_key"), dict(dialect=self.dialect) + ) + if not pk_expressions: raise ConfigError("`primary_key` must be specified") + self._primary_key = pk_expressions - return expressions - - @model_validator(mode="after") - def _validate_model(self): time_column_present_in_primary_key = self.time_column.column in { col for expr in self.primary_key for col in expr.find_all(exp.Column) } @@ -50,24 +47,12 @@ def _validate_model(self): return self @property - def data_hash_values(self) -> t.List[t.Optional[str]]: - return [ - *super().data_hash_values, - gen(self.time_column.column), - self.time_column.format, - *(gen(k) for k in self.primary_key), - ] + def time_column(self) -> TimeColumn: + return self._time_column - def to_expression( - self, expressions: t.Optional[t.List[exp.Expression]] = None, **kwargs: t.Any - ) -> d.ModelKind: - return super().to_expression( - expressions=[ - *(expressions or []), - self.time_column.to_property(kwargs.get("dialect") or ""), - _property(name="primary_key", value=self.primary_key), - ] - ) + @property + def primary_key(self) -> t.List[exp.Expression]: + return self._primary_key class NonIdempotentIncrementalByTimeRangeMaterialization( diff --git a/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py b/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py index 219070d..4e27fd8 100644 --- a/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py +++ b/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py @@ -47,8 +47,10 @@ def test_basic_usage(project: Project): name {project.test_schema}.model, kind CUSTOM ( materialization 'non_idempotent_incremental_by_time_range', - time_column event_timestamp, - primary_key (event_id, event_source), + materialization_properties ( + time_column = event_timestamp, + primary_key = (event_id, event_source) + ), batch_size 1, batch_concurrency 1 ), @@ -137,8 +139,10 @@ def test_partial_restatement(project: Project): name {project.test_schema}.model, kind CUSTOM ( materialization 'non_idempotent_incremental_by_time_range', - time_column event_timestamp, - primary_key (event_id, event_source), + materialization_properties ( + time_column = event_timestamp, + primary_key = (event_id, event_source), + ), batch_size 1, batch_concurrency 1 ), diff --git a/tests/materializations/test_non_idempotent_incremental_by_time_range.py b/tests/materializations/test_non_idempotent_incremental_by_time_range.py index 74c52f4..0da880a 100644 --- a/tests/materializations/test_non_idempotent_incremental_by_time_range.py +++ b/tests/materializations/test_non_idempotent_incremental_by_time_range.py @@ -10,7 +10,6 @@ from tests.materializations.conftest import to_sql_calls, MockedEngineAdapterMaker from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter from sqlmesh.utils.errors import ConfigError -from pydantic import ValidationError from sqlmesh.utils.date import to_timestamp, now from sqlmesh.core.macros import RuntimeStage @@ -21,8 +20,9 @@ def make_model() -> ModelMaker: def _make(properties: t.Union[str, t.List[str]], dialect: t.Optional[str] = None) -> Model: if isinstance(properties, list): - properties = ",\n".join(properties) + "," + properties = ",\n".join(properties) + properties_sql = f"materialization_properties ({properties})," if properties else "" dialect_sql = f"dialect {dialect}," if dialect else "" expressions = d.parse(f""" @@ -30,7 +30,7 @@ def _make(properties: t.Union[str, t.List[str]], dialect: t.Optional[str] = None name test.model, kind CUSTOM ( materialization 'non_idempotent_incremental_by_time_range', - {properties} + {properties_sql} batch_size 1, batch_concurrency 1 ), @@ -48,7 +48,7 @@ def _make(properties: t.Union[str, t.List[str]], dialect: t.Optional[str] = None def test_kind(make_model: ModelMaker): # basic usage - model = make_model(["time_column ds", "primary_key (id, ds)"]) + model = make_model(["time_column = ds", "primary_key = (id, ds)"]) assert isinstance(model.kind, NonIdempotentIncrementalByTimeRangeKind) assert model.kind.time_column.column == exp.to_column("ds", quoted=True) @@ -58,22 +58,22 @@ def test_kind(make_model: ModelMaker): ] # required fields - with pytest.raises(ValidationError, match=r"time_column\n.*Field required"): + with pytest.raises(ConfigError, match=r"Invalid time_column"): model = make_model([]) - with pytest.raises(ValidationError, match=r"primary_key\n.*Field required"): - model = make_model(["time_column ds"]) + with pytest.raises(ConfigError, match=r"`primary_key` must be specified"): + model = make_model(["time_column = ds"]) with pytest.raises(ConfigError, match=r"`primary_key` must be specified"): - model = make_model(["time_column ds", "primary_key ()"]) + model = make_model(["time_column = ds", "primary_key = ()"]) # primary_key cant be the same as time_column with pytest.raises(ConfigError, match=r"primary_key` cannot be just the time_column"): - model = make_model(["time_column ds", "primary_key ds"]) + model = make_model(["time_column = ds", "primary_key = ds"]) def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngineAdapterMaker): - model: Model = make_model(["time_column ds", "primary_key name"], dialect="trino") + model: Model = make_model(["time_column = ds", "primary_key = name"], dialect="trino") adapter = make_mocked_engine_adapter(TrinoEngineAdapter) strategy = NonIdempotentIncrementalByTimeRangeMaterialization(adapter) @@ -117,7 +117,7 @@ def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine def test_append(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngineAdapterMaker): - model: Model = make_model(["time_column ds", "primary_key name"], dialect="trino") + model: Model = make_model(["time_column = ds", "primary_key = name"], dialect="trino") adapter = make_mocked_engine_adapter(TrinoEngineAdapter) strategy = NonIdempotentIncrementalByTimeRangeMaterialization(adapter)