Skip to content

Commit 101e73b

Browse files
authored
fix: scd type 2 support table properties (#5317)
1 parent b8bc224 commit 101e73b

File tree

3 files changed

+99
-45
lines changed

3 files changed

+99
-45
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 15 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2241,6 +2241,11 @@ def insert(
22412241
column_descriptions=model.column_descriptions,
22422242
truncate=is_first_insert,
22432243
source_columns=source_columns,
2244+
storage_format=model.storage_format,
2245+
partitioned_by=model.partitioned_by,
2246+
partition_interval_unit=model.partition_interval_unit,
2247+
clustered_by=model.clustered_by,
2248+
table_properties=kwargs.get("physical_properties", model.physical_properties),
22442249
)
22452250
elif isinstance(model.kind, SCDType2ByColumnKind):
22462251
self.adapter.scd_type_2_by_column(
@@ -2259,6 +2264,11 @@ def insert(
22592264
column_descriptions=model.column_descriptions,
22602265
truncate=is_first_insert,
22612266
source_columns=source_columns,
2267+
storage_format=model.storage_format,
2268+
partitioned_by=model.partitioned_by,
2269+
partition_interval_unit=model.partition_interval_unit,
2270+
clustered_by=model.clustered_by,
2271+
table_properties=kwargs.get("physical_properties", model.physical_properties),
22622272
)
22632273
else:
22642274
raise SQLMeshError(
@@ -2273,51 +2283,14 @@ def append(
22732283
render_kwargs: t.Dict[str, t.Any],
22742284
**kwargs: t.Any,
22752285
) -> None:
2276-
# Source columns from the underlying table to prevent unintentional table schema changes during restatement of incremental models.
2277-
columns_to_types, source_columns = self._get_target_and_source_columns(
2278-
model,
2286+
return self.insert(
22792287
table_name,
2288+
query_or_df,
2289+
model,
2290+
is_first_insert=False,
22802291
render_kwargs=render_kwargs,
2281-
force_get_columns_from_target=True,
2292+
**kwargs,
22822293
)
2283-
if isinstance(model.kind, SCDType2ByTimeKind):
2284-
self.adapter.scd_type_2_by_time(
2285-
target_table=table_name,
2286-
source_table=query_or_df,
2287-
unique_key=model.unique_key,
2288-
valid_from_col=model.kind.valid_from_name,
2289-
valid_to_col=model.kind.valid_to_name,
2290-
updated_at_col=model.kind.updated_at_name,
2291-
invalidate_hard_deletes=model.kind.invalidate_hard_deletes,
2292-
updated_at_as_valid_from=model.kind.updated_at_as_valid_from,
2293-
target_columns_to_types=columns_to_types,
2294-
table_format=model.table_format,
2295-
table_description=model.description,
2296-
column_descriptions=model.column_descriptions,
2297-
source_columns=source_columns,
2298-
**kwargs,
2299-
)
2300-
elif isinstance(model.kind, SCDType2ByColumnKind):
2301-
self.adapter.scd_type_2_by_column(
2302-
target_table=table_name,
2303-
source_table=query_or_df,
2304-
unique_key=model.unique_key,
2305-
valid_from_col=model.kind.valid_from_name,
2306-
valid_to_col=model.kind.valid_to_name,
2307-
check_columns=model.kind.columns,
2308-
target_columns_to_types=columns_to_types,
2309-
table_format=model.table_format,
2310-
invalidate_hard_deletes=model.kind.invalidate_hard_deletes,
2311-
execution_time_as_valid_from=model.kind.execution_time_as_valid_from,
2312-
table_description=model.description,
2313-
column_descriptions=model.column_descriptions,
2314-
source_columns=source_columns,
2315-
**kwargs,
2316-
)
2317-
else:
2318-
raise SQLMeshError(
2319-
f"Unexpected SCD Type 2 kind: {model.kind}. This is not expected and please report this as a bug."
2320-
)
23212294

23222295

23232296
class ViewStrategy(PromotableStrategy):

tests/core/engine_adapter/test_bigquery.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# type: ignore
22
import typing as t
3+
from datetime import datetime
34

45
import pandas as pd # noqa: TID253
56
import pytest
@@ -1173,3 +1174,36 @@ def test_drop_cascade(adapter: BigQueryEngineAdapter):
11731174
"DROP SCHEMA IF EXISTS `foo` CASCADE",
11741175
"DROP SCHEMA IF EXISTS `foo`",
11751176
]
1177+
1178+
1179+
def test_scd_type_2_by_partitioning(adapter: BigQueryEngineAdapter):
1180+
adapter.scd_type_2_by_time(
1181+
target_table="target",
1182+
source_table=t.cast(
1183+
exp.Select, parse_one("SELECT id, name, price, test_UPDATED_at FROM source")
1184+
),
1185+
unique_key=[
1186+
exp.to_column("id"),
1187+
],
1188+
updated_at_col=exp.column("test_UPDATED_at", quoted=True),
1189+
valid_from_col=exp.to_column("valid_from", quoted=True),
1190+
valid_to_col=exp.to_column("valid_to", quoted=True),
1191+
target_columns_to_types={
1192+
"id": exp.DataType.build("INT"),
1193+
"name": exp.DataType.build("VARCHAR"),
1194+
"price": exp.DataType.build("DOUBLE"),
1195+
"test_UPDATED_at": exp.DataType.build("TIMESTAMP"),
1196+
"valid_from": exp.DataType.build("TIMESTAMP"),
1197+
"valid_to": exp.DataType.build("TIMESTAMP"),
1198+
},
1199+
execution_time=datetime(2020, 1, 1, 0, 0, 0),
1200+
partitioned_by=[parse_one("TIMESTAMP_TRUNC(valid_from, DAY)")],
1201+
)
1202+
1203+
calls = _to_sql_calls(adapter)
1204+
1205+
# Initial call to create the table and then another to replace since it is self-referencing
1206+
assert len(calls) == 2
1207+
# Both calls should contain the partition logic (the scd logic is already covered by other tests)
1208+
assert "PARTITION BY TIMESTAMP_TRUNC(`valid_from`, DAY)" in calls[0]
1209+
assert "PARTITION BY TIMESTAMP_TRUNC(`valid_from`, DAY)" in calls[1]

tests/core/test_snapshot_evaluator.py

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22
import typing as t
3+
34
from typing_extensions import Self
45
from unittest.mock import call, patch, Mock
56
import re
@@ -2062,7 +2063,7 @@ def test_create_scd_type_2_by_time(adapter_mock, make_snapshot):
20622063
)
20632064

20642065

2065-
def test_create_ctas_scd_type_2_by_time(adapter_mock, make_snapshot):
2066+
def test_create_ctas_scd_type_2_by_time(adapter_mock, make_snapshot, mocker):
20662067
evaluator = SnapshotEvaluator(adapter_mock)
20672068
model = load_sql_based_model(
20682069
parse( # type: ignore
@@ -2073,7 +2074,8 @@ def test_create_ctas_scd_type_2_by_time(adapter_mock, make_snapshot):
20732074
unique_key id,
20742075
time_data_type TIMESTAMPTZ,
20752076
invalidate_hard_deletes false
2076-
)
2077+
),
2078+
partitioned_by cola
20772079
);
20782080
20792081
SELECT * FROM tbl;
@@ -2086,6 +2088,7 @@ def test_create_ctas_scd_type_2_by_time(adapter_mock, make_snapshot):
20862088

20872089
evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
20882090

2091+
source_query = parse_one('SELECT * FROM "tbl" AS "tbl"')
20892092
query = parse_one(
20902093
"""SELECT *, CAST(NULL AS TIMESTAMPTZ) AS valid_from, CAST(NULL AS TIMESTAMPTZ) AS valid_to FROM "tbl" AS "tbl" WHERE FALSE LIMIT 0"""
20912094
)
@@ -2094,7 +2097,9 @@ def test_create_ctas_scd_type_2_by_time(adapter_mock, make_snapshot):
20942097
common_kwargs = dict(
20952098
table_format=None,
20962099
storage_format=None,
2097-
partitioned_by=[],
2100+
partitioned_by=[
2101+
exp.to_column("cola", quoted=True),
2102+
],
20982103
partition_interval_unit=None,
20992104
clustered_by=[],
21002105
table_properties={},
@@ -2113,6 +2118,38 @@ def test_create_ctas_scd_type_2_by_time(adapter_mock, make_snapshot):
21132118
]
21142119
)
21152120

2121+
adapter_mock.reset_mock()
2122+
2123+
evaluator.evaluate(
2124+
snapshot,
2125+
start="2020-01-01",
2126+
end="2020-01-02",
2127+
execution_time="2020-01-02",
2128+
snapshots={},
2129+
deployability_index=DeployabilityIndex.none_deployable(),
2130+
)
2131+
2132+
adapter_mock.scd_type_2_by_time.assert_has_calls(
2133+
[
2134+
call(
2135+
column_descriptions={},
2136+
execution_time="2020-01-02",
2137+
invalidate_hard_deletes=False,
2138+
source_columns=None,
2139+
source_table=source_query,
2140+
target_columns_to_types=mocker.ANY,
2141+
target_table=snapshot.table_name(is_deployable=False),
2142+
truncate=True,
2143+
unique_key=[exp.to_column("id", quoted=True)],
2144+
updated_at_as_valid_from=False,
2145+
updated_at_col=exp.column("updated_at", quoted=True),
2146+
valid_from_col=exp.column("valid_from", quoted=True),
2147+
valid_to_col=exp.column("valid_to", quoted=True),
2148+
**common_kwargs,
2149+
),
2150+
]
2151+
)
2152+
21162153

21172154
@pytest.mark.parametrize(
21182155
"intervals,truncate",
@@ -2178,6 +2215,11 @@ def test_insert_into_scd_type_2_by_time(
21782215
updated_at_as_valid_from=False,
21792216
truncate=truncate,
21802217
source_columns=None,
2218+
clustered_by=[],
2219+
partition_interval_unit=None,
2220+
partitioned_by=[],
2221+
storage_format=None,
2222+
table_properties={},
21812223
)
21822224
adapter_mock.columns.assert_called_once_with(snapshot.table_name())
21832225

@@ -2347,6 +2389,11 @@ def test_insert_into_scd_type_2_by_column(
23472389
column_descriptions={},
23482390
truncate=truncate,
23492391
source_columns=None,
2392+
clustered_by=[],
2393+
partition_interval_unit=None,
2394+
partitioned_by=[],
2395+
storage_format=None,
2396+
table_properties={},
23502397
)
23512398
adapter_mock.columns.assert_called_once_with(snapshot.table_name())
23522399

0 commit comments

Comments
 (0)