
Commit c8cb67e

fix!: support target_columns_to_types

1 parent a3222d8 commit c8cb67e

3 files changed: +22, -10 lines


sqlmesh_utils/materializations/non_idempotent_incremental_by_time_range.py

Lines changed: 14 additions & 5 deletions
@@ -11,7 +11,6 @@
 from sqlmesh.utils.date import TimeLike
 from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
 from sqlmesh import CustomKind
-from sqlmesh.utils import columns_to_types_all_known

 if t.TYPE_CHECKING:
     from sqlmesh.core.engine_adapter._typing import QueryOrDF
@@ -76,6 +75,7 @@ def insert(
         query_or_df: QueryOrDF,
         model: Model,
         is_first_insert: bool,
+        render_kwargs: t.Dict[str, t.Any],
         **kwargs: t.Any,
     ) -> None:
         # sanity check
@@ -88,9 +88,15 @@ def insert(
         start: TimeLike = kwargs["start"]
         end: TimeLike = kwargs["end"]

-        columns_to_types = model.columns_to_types
-        if not columns_to_types or not columns_to_types_all_known(columns_to_types):
-            columns_to_types = self.adapter.columns(table_name)
+        if is_first_insert and not self.adapter.table_exists(table_name):
+            self.adapter.ctas(
+                table_name=table_name,
+                query_or_df=model.ctas_query(**render_kwargs),
+            )
+
+        columns_to_types, source_columns = self._get_target_and_source_columns(
+            model, table_name, render_kwargs=render_kwargs
+        )

         low, high = [
             model.convert_to_time_column(dt, columns_to_types)
@@ -116,22 +122,25 @@ def _inject_alias(node: exp.Expression, alias: str) -> exp.Expression:
         self.adapter.merge(
             target_table=table_name,
             source_table=query_or_df,
-            columns_to_types=columns_to_types,
+            target_columns_to_types=columns_to_types,
             unique_key=model.kind.primary_key,
             merge_filter=exp.and_(*betweens),
+            source_columns=source_columns,
         )

     def append(
         self,
         table_name: str,
         query_or_df: QueryOrDF,
         model: Model,
+        render_kwargs: t.Dict[str, t.Any],
         **kwargs: t.Any,
     ) -> None:
         self.insert(
             table_name=table_name,
             query_or_df=query_or_df,
             model=model,
             is_first_insert=False,
+            render_kwargs=render_kwargs,
             **kwargs,
         )
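
For downstream code that calls the engine adapter directly, the keyword rename is the breaking part of this commit (hence the fix! prefix): the column mapping now goes in as target_columns_to_types, and merge additionally takes the source_columns value produced above. A minimal call-site sketch against the adapter API as used in this commit; the table and column names are made up, and source_columns is assumed here to accept plain column names:

import typing as t

from sqlglot import exp
from sqlmesh.core.engine_adapter import EngineAdapter


def create_and_merge(adapter: EngineAdapter) -> None:
    # Illustrative schema; table and column names are hypothetical.
    columns: t.Dict[str, exp.DataType] = {
        "name": exp.DataType.build("text"),
        "ds": exp.DataType.build("date"),
    }

    # Previously these calls took columns_to_types=; after this change the
    # same mapping is passed as target_columns_to_types=.
    adapter.create_table("db.upstream_table", target_columns_to_types=columns)

    adapter.merge(
        target_table="db.snapshot_table",
        source_table=exp.select("name", "ds").from_("db.upstream_table"),
        target_columns_to_types=columns,
        unique_key=[exp.column("name")],
        source_columns=list(columns),  # assumed: plain column names, as surfaced in the diff above
    )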

tests/materializations/integration/test_integration_non_idempotent_incremental_by_time_range.py

Lines changed: 3 additions & 3 deletions
@@ -30,7 +30,7 @@ def test_basic_usage(project: Project):
     }

     project.engine_adapter.create_table(
-        upstream_table_name, columns_to_types=upstream_table_columns
+        upstream_table_name, target_columns_to_types=upstream_table_columns
     )
     project.engine_adapter.insert_append(
         upstream_table_name,
@@ -122,7 +122,7 @@ def test_partial_restatement(project: Project):
     }

     project.engine_adapter.create_table(
-        upstream_table_name, columns_to_types=upstream_table_columns
+        upstream_table_name, target_columns_to_types=upstream_table_columns
     )
     project.engine_adapter.insert_append(
         upstream_table_name,
@@ -174,7 +174,7 @@ def test_partial_restatement(project: Project):
     # change upstream data
     project.engine_adapter.drop_table(upstream_table_name)
     project.engine_adapter.create_table(
-        upstream_table_name, columns_to_types=upstream_table_columns
+        upstream_table_name, target_columns_to_types=upstream_table_columns
     )
     project.engine_adapter.insert_append(
         upstream_table_name,

tests/materializations/test_non_idempotent_incremental_by_time_range.py

Lines changed: 5 additions & 2 deletions
@@ -92,9 +92,11 @@ def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine
         is_first_insert=True,
         start=start,
         end=end,
+        render_kwargs={},
     )

     assert to_sql_calls(adapter) == [
+        'DESCRIBE "test"."snapshot_table"',
         parse_one(
             """
             MERGE INTO "test"."snapshot_table" AS "__merge_target__"
@@ -115,7 +117,7 @@ def test_insert(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine
             WHEN NOT MATCHED THEN INSERT ("name", "ds") VALUES ("__MERGE_SOURCE__"."name", "__MERGE_SOURCE__"."ds")
             """,
             dialect=adapter.dialect,
-        ).sql(dialect=adapter.dialect)
+        ).sql(dialect=adapter.dialect),
     ]


@@ -135,6 +137,7 @@ def test_append(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine
         model=model,
         start=start,
         end=end,
+        render_kwargs={},
     )

     assert to_sql_calls(adapter) == [
@@ -158,7 +161,7 @@ def test_append(make_model: ModelMaker, make_mocked_engine_adapter: MockedEngine
             WHEN NOT MATCHED THEN INSERT ("name", "ds") VALUES ("__MERGE_SOURCE__"."name", "__MERGE_SOURCE__"."ds")
             """,
             dialect=adapter.dialect,
-        ).sql(dialect=adapter.dialect)
+        ).sql(dialect=adapter.dialect),
     ]

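The unit-test changes pin down the two caller-visible effects: insert and append now require a render_kwargs mapping, and the column lookup issues a DESCRIBE of the target table before the MERGE. A condensed sketch of the updated test_insert assertion pattern, with fixture setup elided; strategy, model, adapter, rendered_query, start, and end are placeholders for objects built earlier in the test module via make_model and make_mocked_engine_adapter:

# Sketch only: all names below stand in for the test fixtures.
strategy.insert(
    table_name='"test"."snapshot_table"',
    query_or_df=rendered_query,  # placeholder for the rendered model query
    model=model,
    is_first_insert=True,
    start=start,
    end=end,
    render_kwargs={},  # new required argument introduced by this commit
)

sql_calls = to_sql_calls(adapter)
assert sql_calls[0] == 'DESCRIBE "test"."snapshot_table"'              # column lookup first
assert sql_calls[1].startswith('MERGE INTO "test"."snapshot_table"')   # then the merge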