6 changes: 4 additions & 2 deletions README.md
@@ -50,8 +50,10 @@ MODEL (
name my_db.my_model,
kind CUSTOM (
materialization 'non_idempotent_incremental_by_time_range',
time_column event_timestamp,
primary_key (event_id, event_source)
materialization_properties (
time_column = event_timestamp,
primary_key = (event_id, event_source)
)
)
);

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -5,7 +5,7 @@ description = "Utilities for SQLMesh"
readme = "README.md"
requires-python = ">= 3.9"
dependencies = [
"sqlmesh>=0.159.0"
"sqlmesh>=0.160.0"
]

[project.optional-dependencies]
@@ -6,38 +6,35 @@
from sqlglot import exp
from sqlmesh.utils.date import make_inclusive
from sqlmesh.utils.errors import ConfigError, SQLMeshError
from pydantic import field_validator, model_validator, ValidationInfo
from pydantic import model_validator
from sqlmesh.utils.pydantic import list_of_fields_validator
from sqlmesh.utils.date import TimeLike
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
from sqlmesh import CustomKind
from sqlmesh.utils import columns_to_types_all_known
from sqlglot.optimizer.simplify import gen
import sqlmesh.core.dialect as d
from sqlmesh.core.model.kind import _property

if t.TYPE_CHECKING:
from sqlmesh.core.engine_adapter._typing import QueryOrDF


class NonIdempotentIncrementalByTimeRangeKind(CustomKind):
time_column: TimeColumn
_time_column: TimeColumn
# this is deliberately primary_key instead of unique_key to direct away from INCREMENTAL_BY_UNIQUE_KEY
primary_key: t.List[exp.Expression]
_primary_key: t.List[exp.Expression]

_time_column_validator = TimeColumn.validator()
@model_validator(mode="after")
def _validate_model(self):
self._time_column = TimeColumn.create(
self.materialization_properties.get("time_column"), dialect=self.dialect
)

@field_validator("primary_key", mode="before")
@classmethod
def _validate_primary_key(cls, value: t.Any, info: ValidationInfo) -> t.Any:
expressions = list_of_fields_validator(value, info.data)
if not expressions:
pk_expressions = list_of_fields_validator(
self.materialization_properties.get("primary_key"), dict(dialect=self.dialect)
)
if not pk_expressions:
raise ConfigError("`primary_key` must be specified")
self._primary_key = pk_expressions

return expressions

@model_validator(mode="after")
def _validate_model(self):
time_column_present_in_primary_key = self.time_column.column in {
col for expr in self.primary_key for col in expr.find_all(exp.Column)
}
@@ -50,24 +50,12 @@ def _validate_model(self):
return self

@property
def data_hash_values(self) -> t.List[t.Optional[str]]:
return [
*super().data_hash_values,
gen(self.time_column.column),
self.time_column.format,
*(gen(k) for k in self.primary_key),
]
def time_column(self) -> TimeColumn:
return self._time_column

def to_expression(
self, expressions: t.Optional[t.List[exp.Expression]] = None, **kwargs: t.Any
) -> d.ModelKind:
return super().to_expression(
expressions=[
*(expressions or []),
self.time_column.to_property(kwargs.get("dialect") or ""),
_property(name="primary_key", value=self.primary_key),
]
)
@property
def primary_key(self) -> t.List[exp.Expression]:
return self._primary_key


class NonIdempotentIncrementalByTimeRangeMaterialization(
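The core of this change is the pattern shown above: the CustomKind subclass no longer declares time_column and primary_key as pydantic fields, but pulls them out of materialization_properties in an after-model validator, stores them on private attributes, and exposes them through read-only properties. Below is a minimal, self-contained sketch of that pattern against plain pydantic; the class name, string-typed columns, and error messages are illustrative stand-ins, not SQLMesh's actual API.

import typing as t

from pydantic import BaseModel, model_validator


class ExampleCustomKind(BaseModel):
    # Hypothetical stand-in for a CustomKind subclass: kind-specific settings
    # arrive as a raw mapping rather than as declared pydantic fields.
    materialization_properties: t.Dict[str, t.Any] = {}

    # Derived values live on private attributes, populated by the validator.
    _time_column: str
    _primary_key: t.List[str]

    @model_validator(mode="after")
    def _validate_model(self) -> "ExampleCustomKind":
        time_column = self.materialization_properties.get("time_column")
        if not time_column:
            raise ValueError("Invalid time_column")
        self._time_column = str(time_column)

        primary_key = self.materialization_properties.get("primary_key") or []
        if not primary_key:
            raise ValueError("`primary_key` must be specified")
        self._primary_key = [str(k) for k in primary_key]

        if self._primary_key == [self._time_column]:
            raise ValueError("`primary_key` cannot be just the time_column")
        return self

    @property
    def time_column(self) -> str:
        return self._time_column

    @property
    def primary_key(self) -> t.List[str]:
        return self._primary_key


kind = ExampleCustomKind(
    materialization_properties={"time_column": "ds", "primary_key": ["id", "ds"]}
)
assert kind.time_column == "ds"
assert kind.primary_key == ["id", "ds"]

Because the raw settings are no longer pydantic fields, missing values surface as errors raised inside the validator rather than as pydantic "Field required" ValidationErrors, which is why the expected exceptions in the tests below change from ValidationError to ConfigError.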
@@ -47,8 +47,10 @@ def test_basic_usage(project: Project):
name {project.test_schema}.model,
kind CUSTOM (
materialization 'non_idempotent_incremental_by_time_range',
time_column event_timestamp,
primary_key (event_id, event_source),
materialization_properties (
time_column = event_timestamp,
primary_key = (event_id, event_source)
),
batch_size 1,
batch_concurrency 1
),
@@ -137,8 +139,10 @@ def test_partial_restatement(project: Project):
name {project.test_schema}.model,
kind CUSTOM (
materialization 'non_idempotent_incremental_by_time_range',
time_column event_timestamp,
primary_key (event_id, event_source),
materialization_properties (
time_column = event_timestamp,
primary_key = (event_id, event_source),
),
batch_size 1,
batch_concurrency 1
),
@@ -10,7 +10,6 @@
from tests.materializations.conftest import to_sql_calls, MockedEngineAdapterMaker
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
from sqlmesh.utils.errors import ConfigError
from pydantic import ValidationError
from sqlmesh.utils.date import to_timestamp, now
from sqlmesh.core.macros import RuntimeStage

@@ -21,16 +20,17 @@
def make_model() -> ModelMaker:
def _make(properties: t.Union[str, t.List[str]], dialect: t.Optional[str] = None) -> Model:
if isinstance(properties, list):
properties = ",\n".join(properties) + ","
properties = ",\n".join(properties)

properties_sql = f"materialization_properties ({properties})," if properties else ""
dialect_sql = f"dialect {dialect}," if dialect else ""

expressions = d.parse(f"""
MODEL (
name test.model,
kind CUSTOM (
materialization 'non_idempotent_incremental_by_time_range',
{properties}
{properties_sql}
batch_size 1,
batch_concurrency 1
),
@@ -48,7 +48,7 @@ def _make(properties: t.Union[str, t.List[str]], dialect: t.Optional[str] = None

def test_kind(make_model: ModelMaker):
# basic usage
model = make_model(["time_column ds", "primary_key (id, ds)"])
model = make_model(["time_column = ds", "primary_key = (id, ds)"])
assert isinstance(model.kind, NonIdempotentIncrementalByTimeRangeKind)

assert model.kind.time_column.column == exp.to_column("ds", quoted=True)
@@ -58,22 +58,22 @@ def test_kind(make_model: ModelMaker):
]

# required fields
with pytest.raises(ValidationError, match=r"time_column\n.*Field required"):
with pytest.raises(ConfigError, match=r"Invalid time_column"):
model = make_model([])

with pytest.raises(ValidationError, match=r"primary_key\n.*Field required"):
model = make_model(["time_column ds"])
with pytest.raises(ConfigError, match=r"`primary_key` must be specified"):
model = make_model(["time_column = ds"])

with pytest.raises(ConfigError, match=r"`primary_key` must be specified"):
model = make_model(["time_column ds", "primary_key ()"])
model = make_model(["time_column = ds", "primary_key = ()"])

# primary_key can't be the same as time_column
with pytest.raises(ConfigError, match=r"primary_key` cannot be just the time_column"):
model = make_model(["time_column ds", "primary_key ds"])
model = make_model(["time_column = ds", "primary_key = ds"])


def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngineAdapterMaker):
model: Model = make_model(["time_column ds", "primary_key name"], dialect="trino")
model: Model = make_model(["time_column = ds", "primary_key = name"], dialect="trino")
adapter = make_mocked_engine_adapter(TrinoEngineAdapter)
strategy = NonIdempotentIncrementalByTimeRangeMaterialization(adapter)

@@ -117,7 +117,7 @@ def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine


def test_append(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngineAdapterMaker):
model: Model = make_model(["time_column ds", "primary_key name"], dialect="trino")
model: Model = make_model(["time_column = ds", "primary_key = name"], dialect="trino")
adapter = make_mocked_engine_adapter(TrinoEngineAdapter)
strategy = NonIdempotentIncrementalByTimeRangeMaterialization(adapter)
