Skip to content

Commit afe6b3a

Browse files
Fix: Pass properties in ctas call of non idempotent incremental (#10)
1 parent 0616d7a commit afe6b3a

File tree

2 files changed: 133 additions and 1 deletion.

sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ def insert(
9292
self.adapter.ctas(
9393
table_name=table_name,
9494
query_or_df=model.ctas_query(**render_kwargs),
95+
table_format=model.table_format,
96+
storage_format=model.storage_format,
97+
partitioned_by=model.partitioned_by,
98+
partition_interval_unit=model.partition_interval_unit,
99+
clustered_by=model.clustered_by,
100+
table_properties=kwargs.get("physical_properties", model.physical_properties),
95101
)
96102

97103
columns_to_types, source_columns = self._get_target_and_source_columns(

tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
from sqlglot import exp
22
import sqlmesh.core.dialect as d
3-
from sqlmesh.utils.date import to_datetime, to_ds
3+
from sqlmesh.utils.date import to_datetime, to_ds, to_timestamp, now
4+
from sqlmesh.core.macros import RuntimeStage
5+
from unittest.mock import patch
46

7+
from sqlmesh_utils.materializations.non_idempotent_incremental_by_time_range import (
8+
NonIdempotentIncrementalByTimeRangeMaterialization,
9+
)
510
from tests.materializations.integration.conftest import Project
611

712

@@ -232,3 +237,124 @@ def test_partial_restatement(project: Project):
232237
)
233238

234239
assert not any(["CHANGED" in r[2] for r in remaining_records])
240+
241+
242+
def test_physical_properties_integration(project: Project):
    """Check that ``insert`` forwards physical properties and partitioning to CTAS.

    The test seeds an upstream table, writes a model that declares
    ``physical_properties`` and ``partitioned_by``, then runs the
    materialization's ``insert`` with the adapter's ``ctas``, ``table_exists``,
    ``columns`` and ``merge`` patched out. The keyword arguments captured from
    the single ``ctas`` call are inspected for ``table_properties`` and
    ``partitioned_by``.
    """
    source_table = f"{project.test_schema}.library_data"

    seed_rows = [
        (101, "fiction", "978-0-123456-78-9", to_datetime("2024-01-01 09:15:00")),
        (102, "science", "978-0-234567-89-0", to_datetime("2024-01-02 14:30:00")),
        (103, "history", "978-0-345678-90-1", to_datetime("2024-01-03 11:45:00")),
    ]

    column_specs = (
        ("book_id", "int"),
        ("category", "varchar"),
        ("isbn", "varchar"),
        ("borrowed_at", "timestamp"),
    )
    source_columns = {
        col_name: exp.DataType.build(type_name) for col_name, type_name in column_specs
    }

    project.engine_adapter.create_table(source_table, target_columns_to_types=source_columns)
    seed_query = next(d.select_from_values(seed_rows, columns_to_types=source_columns))
    project.engine_adapter.insert_append(source_table, query_or_df=seed_query)

    project.write_model(
        "test_library_with_properties.sql",
        definition=f"""
        MODEL (
            name {project.test_schema}.library_borrowings,
            kind CUSTOM (
                materialization 'non_idempotent_incremental_by_time_range',
                materialization_properties (
                    time_column = borrowed_at,
                    primary_key = (book_id, category)
                ),
                batch_size 1,
                batch_concurrency 1
            ),
            start '2024-01-01',
            end '2024-01-03',
            physical_properties (
                extra_props = (
                    enable_compression = true
                )
            ),
            partitioned_by [category, borrowed_at]
        );

        SELECT book_id, category, isbn, borrowed_at
        FROM {source_table} WHERE borrowed_at BETWEEN @start_dt AND @end_dt;
        """,
    )

    ctx = project.context
    assert len(ctx.models) > 0

    model = ctx.get_model(f"{project.test_schema}.library_borrowings")
    assert model is not None
    assert model.partitioned_by is not None
    assert len(model.partitioned_by) == 2
    assert model.physical_properties
    assert "extra_props" in model.physical_properties

    adapter = project.engine_adapter
    strategy = NonIdempotentIncrementalByTimeRangeMaterialization(adapter)

    # Every ctas() invocation lands here so its keyword arguments can be inspected.
    captured_ctas = []

    def record_ctas(**call_kwargs):
        captured_ctas.append(call_kwargs)

    def ignore_merge(**_call_kwargs):
        pass

    # Built separately from the upstream schema; returned by the patched columns().
    target_columns = {
        col_name: exp.DataType.build(type_name) for col_name, type_name in column_specs
    }

    ctas_patch = patch.object(adapter, "ctas", side_effect=record_ctas)
    exists_patch = patch.object(adapter, "table_exists", return_value=False)
    columns_patch = patch.object(adapter, "columns", return_value=target_columns)
    merge_patch = patch.object(adapter, "merge", side_effect=ignore_merge)

    with ctas_patch, exists_patch, columns_patch, merge_patch:
        start = to_timestamp("2024-01-01")
        end = to_timestamp("2024-01-03")

        rendered_query = model.render_query(
            start=start,
            end=end,
            execution_time=now(),
            runtime_stage=RuntimeStage.EVALUATING,
        )
        strategy.insert(
            f"{project.test_schema}.library_borrowings",
            query_or_df=rendered_query,
            model=model,
            is_first_insert=True,
            start=start,
            end=end,
            render_kwargs={},
        )

    # Exactly one CTAS is expected, carrying table_properties and partitioned_by.
    assert len(captured_ctas) == 1
    ctas_kwargs = captured_ctas[0]

    assert "table_properties" in ctas_kwargs
    forwarded_props = ctas_kwargs["table_properties"]

    assert forwarded_props is not None
    assert "extra_props" in forwarded_props

    assert "partitioned_by" in ctas_kwargs
    expected_partitioning = [
        exp.Column(this=exp.Identifier(this=col_name, quoted=True))
        for col_name in ("category", "borrowed_at")
    ]
    assert ctas_kwargs["partitioned_by"] == expected_partitioning

0 commit comments

Comments
 (0)