Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ def insert(
self.adapter.ctas(
table_name=table_name,
query_or_df=model.ctas_query(**render_kwargs),
table_format=model.table_format,
storage_format=model.storage_format,
partitioned_by=model.partitioned_by,
partition_interval_unit=model.partition_interval_unit,
clustered_by=model.clustered_by,
table_properties=kwargs.get("physical_properties", model.physical_properties),
)

columns_to_types, source_columns = self._get_target_and_source_columns(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from sqlglot import exp
import sqlmesh.core.dialect as d
from sqlmesh.utils.date import to_datetime, to_ds
from sqlmesh.utils.date import to_datetime, to_ds, to_timestamp, now
from sqlmesh.core.macros import RuntimeStage
from unittest.mock import patch

from sqlmesh_utils.materializations.non_idempotent_incremental_by_time_range import (
NonIdempotentIncrementalByTimeRangeMaterialization,
)
from tests.materializations.integration.conftest import Project


Expand Down Expand Up @@ -232,3 +237,124 @@ def test_partial_restatement(project: Project):
)

assert not any(["CHANGED" in r[2] for r in remaining_records])


def test_physical_properties_integration(project: Project):
    """End-to-end check that table-level settings declared on the model
    (``physical_properties`` and ``partitioned_by``) are forwarded to the
    engine adapter's CTAS call on the first insert of the custom
    non-idempotent incremental-by-time-range materialization.
    """
    source_table = f"{project.test_schema}.library_data"

    # Rows the model will read from; timestamps span the model's start/end window.
    seed_rows = [
        (101, "fiction", "978-0-123456-78-9", to_datetime("2024-01-01 09:15:00")),
        (102, "science", "978-0-234567-89-0", to_datetime("2024-01-02 14:30:00")),
        (103, "history", "978-0-345678-90-1", to_datetime("2024-01-03 11:45:00")),
    ]

    source_schema = {
        "book_id": exp.DataType.build("int"),
        "category": exp.DataType.build("varchar"),
        "isbn": exp.DataType.build("varchar"),
        "borrowed_at": exp.DataType.build("timestamp"),
    }

    project.engine_adapter.create_table(
        source_table, target_columns_to_types=source_schema
    )
    project.engine_adapter.insert_append(
        source_table,
        query_or_df=next(
            d.select_from_values(seed_rows, columns_to_types=source_schema)
        ),
    )

    # Model declares both physical_properties and a two-column partitioning.
    project.write_model(
        "test_library_with_properties.sql",
        definition=f"""
        MODEL (
            name {project.test_schema}.library_borrowings,
            kind CUSTOM (
                materialization 'non_idempotent_incremental_by_time_range',
                materialization_properties (
                    time_column = borrowed_at,
                    primary_key = (book_id, category)
                ),
                batch_size 1,
                batch_concurrency 1
            ),
            start '2024-01-01',
            end '2024-01-03',
            physical_properties (
                extra_props = (
                    enable_compression = true
                )
            ),
            partitioned_by [category, borrowed_at]
        );

        SELECT book_id, category, isbn, borrowed_at
        FROM {source_table} WHERE borrowed_at BETWEEN @start_dt AND @end_dt;
        """,
    )

    loaded = project.context
    assert len(loaded.models) > 0

    model = loaded.get_model(f"{project.test_schema}.library_borrowings")
    assert model is not None
    assert model.partitioned_by is not None
    assert len(model.partitioned_by) == 2
    assert model.physical_properties
    assert "extra_props" in model.physical_properties

    materialization = NonIdempotentIncrementalByTimeRangeMaterialization(
        project.engine_adapter
    )

    # Capture every CTAS invocation's kwargs so they can be inspected below.
    captured_ctas = []

    fake_columns = {
        "book_id": exp.DataType.build("int"),
        "category": exp.DataType.build("varchar"),
        "isbn": exp.DataType.build("varchar"),
        "borrowed_at": exp.DataType.build("timestamp"),
    }

    # Patch out the adapter surface the strategy touches: record ctas,
    # force the "table does not exist yet" path, stub columns and merge.
    with patch.object(
        project.engine_adapter,
        "ctas",
        side_effect=lambda **kw: captured_ctas.append(kw),
    ), patch.object(
        project.engine_adapter, "table_exists", return_value=False
    ), patch.object(
        project.engine_adapter, "columns", return_value=fake_columns
    ), patch.object(
        project.engine_adapter, "merge", side_effect=lambda **kw: None
    ):
        window_start = to_timestamp("2024-01-01")
        window_end = to_timestamp("2024-01-03")

        materialization.insert(
            f"{project.test_schema}.library_borrowings",
            query_or_df=model.render_query(
                start=window_start,
                end=window_end,
                execution_time=now(),
                runtime_stage=RuntimeStage.EVALUATING,
            ),
            model=model,
            is_first_insert=True,
            start=window_start,
            end=window_end,
            render_kwargs={},
        )

    # A first insert on a missing table triggers exactly one CTAS, and the
    # model's table settings must have been threaded through to it.
    assert len(captured_ctas) == 1
    ctas_kwargs = captured_ctas[0]

    assert "table_properties" in ctas_kwargs
    forwarded_props = ctas_kwargs["table_properties"]
    assert forwarded_props is not None
    assert "extra_props" in forwarded_props

    assert "partitioned_by" in ctas_kwargs
    assert ctas_kwargs["partitioned_by"] == [
        exp.Column(this=exp.Identifier(this="category", quoted=True)),
        exp.Column(this=exp.Identifier(this="borrowed_at", quoted=True)),
    ]