diff --git a/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py b/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py
index 66955b0..515ea68 100644
--- a/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py
+++ b/sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py
@@ -92,6 +92,12 @@ def insert(
         self.adapter.ctas(
             table_name=table_name,
             query_or_df=model.ctas_query(**render_kwargs),
+            table_format=model.table_format,
+            storage_format=model.storage_format,
+            partitioned_by=model.partitioned_by,
+            partition_interval_unit=model.partition_interval_unit,
+            clustered_by=model.clustered_by,
+            table_properties=kwargs.get("physical_properties", model.physical_properties),
         )
 
         columns_to_types, source_columns = self._get_target_and_source_columns(
diff --git a/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py b/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py
index 7a73f61..7843155 100644
--- a/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py
+++ b/tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py
@@ -1,7 +1,12 @@
 from sqlglot import exp
 
 import sqlmesh.core.dialect as d
-from sqlmesh.utils.date import to_datetime, to_ds
+from sqlmesh.utils.date import to_datetime, to_ds, to_timestamp, now
+from sqlmesh.core.macros import RuntimeStage
+from unittest.mock import patch
+from sqlmesh_utils.materializations.non_idempotent_incremental_by_time_range import (
+    NonIdempotentIncrementalByTimeRangeMaterialization,
+)
 
 from tests.materializations.integration.conftest import Project
 
@@ -232,3 +237,124 @@ def test_partial_restatement(project: Project):
     )
 
     assert not any(["CHANGED" in r[2] for r in remaining_records])
+
+
+def test_physical_properties_integration(project: Project):
+    upstream_table_name = f"{project.test_schema}.library_data"
+
+    upstream_data = [
+        (101, "fiction", "978-0-123456-78-9", to_datetime("2024-01-01 09:15:00")),
+        (102, "science", "978-0-234567-89-0", to_datetime("2024-01-02 14:30:00")),
+        (103, "history", "978-0-345678-90-1", to_datetime("2024-01-03 11:45:00")),
+    ]
+
+    upstream_table_columns = {
+        "book_id": exp.DataType.build("int"),
+        "category": exp.DataType.build("varchar"),
+        "isbn": exp.DataType.build("varchar"),
+        "borrowed_at": exp.DataType.build("timestamp"),
+    }
+
+    project.engine_adapter.create_table(
+        upstream_table_name, target_columns_to_types=upstream_table_columns
+    )
+    project.engine_adapter.insert_append(
+        upstream_table_name,
+        query_or_df=next(
+            d.select_from_values(upstream_data, columns_to_types=upstream_table_columns)
+        ),
+    )
+
+    project.write_model(
+        "test_library_with_properties.sql",
+        definition=f"""
+        MODEL (
+            name {project.test_schema}.library_borrowings,
+            kind CUSTOM (
+                materialization 'non_idempotent_incremental_by_time_range',
+                materialization_properties (
+                    time_column = borrowed_at,
+                    primary_key = (book_id, category)
+                ),
+                batch_size 1,
+                batch_concurrency 1
+            ),
+            start '2024-01-01',
+            end '2024-01-03',
+            physical_properties (
+                extra_props = (
+                    enable_compression = true
+                )
+            ),
+            partitioned_by [category, borrowed_at]
+        );
+
+        SELECT book_id, category, isbn, borrowed_at
+        FROM {upstream_table_name} WHERE borrowed_at BETWEEN @start_dt AND @end_dt;
+        """,
+    )
+
+    ctx = project.context
+    assert len(ctx.models) > 0
+
+    model = ctx.get_model(f"{project.test_schema}.library_borrowings")
+    assert model is not None
+    assert model.partitioned_by is not None
+    assert len(model.partitioned_by) == 2
+    assert model.physical_properties
+    assert "extra_props" in model.physical_properties
+
+    strategy = NonIdempotentIncrementalByTimeRangeMaterialization(project.engine_adapter)
+
+    # tracks CTAS calls to verify table_properties are passed
+    ctas_calls = []
+
+    def mock_ctas(**kwargs):
+        ctas_calls.append(kwargs)
+
+    mock_columns = {
+        "book_id": exp.DataType.build("int"),
+        "category": exp.DataType.build("varchar"),
+        "isbn": exp.DataType.build("varchar"),
+        "borrowed_at": exp.DataType.build("timestamp"),
+    }
+
+    def mock_merge(**kwargs):
+        pass
+
+    with patch.object(project.engine_adapter, "ctas", side_effect=mock_ctas), patch.object(
+        project.engine_adapter, "table_exists", return_value=False
+    ), patch.object(project.engine_adapter, "columns", return_value=mock_columns), patch.object(
+        project.engine_adapter, "merge", side_effect=mock_merge
+    ):
+        start = to_timestamp("2024-01-01")
+        end = to_timestamp("2024-01-03")
+
+        strategy.insert(
+            f"{project.test_schema}.library_borrowings",
+            query_or_df=model.render_query(
+                start=start, end=end, execution_time=now(), runtime_stage=RuntimeStage.EVALUATING
+            ),
+            model=model,
+            is_first_insert=True,
+            start=start,
+            end=end,
+            render_kwargs={},
+        )
+
+    # Assert that CTAS was called with table_properties and partitioned_by
+    assert len(ctas_calls) == 1
+    ctas_kwargs = ctas_calls[0]
+
+    assert "table_properties" in ctas_kwargs
+    table_props = ctas_kwargs["table_properties"]
+
+    assert table_props is not None
+    assert "extra_props" in table_props
+
+    assert "partitioned_by" in ctas_kwargs
+    partitioned_by = ctas_kwargs["partitioned_by"]
+    assert partitioned_by == [
+        exp.Column(this=exp.Identifier(this="category", quoted=True)),
+        exp.Column(this=exp.Identifier(this="borrowed_at", quoted=True)),
+    ]