Skip to content

Commit 4c5a6d3

Browse files
Fix: Respect mssql_merge_exists flag and use merge as a strategy (#5544)
1 parent 2bff831 commit 4c5a6d3

File tree

3 files changed

+98
-8
lines changed

3 files changed

+98
-8
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,11 +551,13 @@ def replace_query(
551551
target_table,
552552
source_queries,
553553
target_columns_to_types,
554+
**kwargs,
554555
)
555556
return self._insert_overwrite_by_condition(
556557
target_table,
557558
source_queries,
558559
target_columns_to_types,
560+
**kwargs,
559561
)
560562

561563
def create_index(
@@ -1614,7 +1616,7 @@ def _insert_overwrite_by_time_partition(
16141616
**kwargs: t.Any,
16151617
) -> None:
16161618
return self._insert_overwrite_by_condition(
1617-
table_name, source_queries, target_columns_to_types, where
1619+
table_name, source_queries, target_columns_to_types, where, **kwargs
16181620
)
16191621

16201622
def _values_to_sql(

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,9 @@ def _insert_overwrite_by_condition(
423423
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
424424
**kwargs: t.Any,
425425
) -> None:
426-
if not where or where == exp.true():
426+
# Note: mssql_merge_exists arrives here via the `table_properties` kwarg rather than `physical_properties`
427+
use_merge_strategy = kwargs.get("table_properties", {}).get("mssql_merge_exists")
428+
if (not where or where == exp.true()) and not use_merge_strategy:
427429
# this is a full table replacement, call the base strategy to do DELETE+INSERT
428430
# which will result in TRUNCATE+INSERT due to how we have overridden self.delete_from()
429431
return EngineAdapter._insert_overwrite_by_condition(
@@ -436,7 +438,7 @@ def _insert_overwrite_by_condition(
436438
**kwargs,
437439
)
438440

439-
# For actual conditional overwrites, use MERGE from InsertOverwriteWithMergeMixin
441+
# For conditional overwrites, or when mssql_merge_exists is set, use MERGE
440442
return super()._insert_overwrite_by_condition(
441443
table_name=table_name,
442444
source_queries=source_queries,

tests/core/engine_adapter/test_mssql.py

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
from sqlglot import expressions as exp
1010
from sqlglot import parse_one
1111

12+
from pathlib import Path
13+
from sqlmesh import model
1214
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
13-
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotChangeCategory
15+
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotChangeCategory, Snapshot
1416
from sqlmesh.core.model import load_sql_based_model
17+
from sqlmesh.core.model.kind import SCDType2ByTimeKind
1518
from sqlmesh.core import dialect as d
16-
from sqlmesh.core.engine_adapter.shared import (
17-
DataObject,
18-
DataObjectType,
19-
)
19+
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType, SourceQuery
2020
from sqlmesh.utils.date import to_ds
2121
from tests.core.engine_adapter import to_sql_calls
2222

@@ -916,3 +916,89 @@ def test_replace_query_strategy(adapter: MSSQLEngineAdapter, mocker: MockerFixtu
916916
"TRUNCATE TABLE [test_table];",
917917
"INSERT INTO [test_table] ([a], [b]) SELECT [a] AS [a], [b] AS [b] FROM [db].[upstream_table] AS [upstream_table];",
918918
]
919+
920+
921+
def test_mssql_merge_exists_switches_strategy_from_truncate_to_merge(
922+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
923+
):
924+
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
925+
926+
query = exp.select("*").from_("source")
927+
source_queries = [SourceQuery(query_factory=lambda: query)]
928+
929+
# Test WITHOUT mssql_merge_exists, should use DELETE+INSERT strategy
930+
base_insert_overwrite = mocker.patch(
931+
"sqlmesh.core.engine_adapter.base.EngineAdapter._insert_overwrite_by_condition"
932+
)
933+
934+
adapter._insert_overwrite_by_condition(
935+
table_name="target",
936+
source_queries=source_queries,
937+
target_columns_to_types={
938+
"id": exp.DataType.build("INT"),
939+
"value": exp.DataType.build("VARCHAR"),
940+
},
941+
where=None,
942+
)
943+
944+
# Should call base DELETE+INSERT strategy
945+
assert base_insert_overwrite.called
946+
base_insert_overwrite.reset_mock()
947+
948+
# Test WITH mssql_merge_exists uses MERGE strategy
949+
super_insert_overwrite = mocker.patch(
950+
"sqlmesh.core.engine_adapter.base.EngineAdapterWithIndexSupport._insert_overwrite_by_condition"
951+
)
952+
953+
adapter._insert_overwrite_by_condition(
954+
table_name="target",
955+
source_queries=source_queries,
956+
target_columns_to_types={
957+
"id": exp.DataType.build("INT"),
958+
"value": exp.DataType.build("VARCHAR"),
959+
},
960+
where=None,
961+
table_properties={"mssql_merge_exists": True},
962+
)
963+
964+
# Should call super's MERGE strategy, not base DELETE+INSERT
965+
assert super_insert_overwrite.called
966+
assert not base_insert_overwrite.called
967+
968+
969+
def test_python_scd2_model_preserves_physical_properties(make_snapshot):
970+
@model(
971+
"test_schema.python_scd2_with_mssql_merge",
972+
kind=SCDType2ByTimeKind(
973+
unique_key=["id"],
974+
valid_from_name="valid_from",
975+
valid_to_name="valid_to",
976+
updated_at_name="updated_at",
977+
),
978+
columns={
979+
"id": "INT",
980+
"value": "VARCHAR",
981+
"updated_at": "TIMESTAMP",
982+
"valid_from": "TIMESTAMP",
983+
"valid_to": "TIMESTAMP",
984+
},
985+
physical_properties={"mssql_merge_exists": True},
986+
)
987+
def python_scd2_model(context, **kwargs):
988+
import pandas as pd
989+
990+
return pd.DataFrame(
991+
{"id": [1, 2], "value": ["a", "b"], "updated_at": ["2024-01-01", "2024-01-02"]}
992+
)
993+
994+
m = model.get_registry()["test_schema.python_scd2_with_mssql_merge"].model(
995+
module_path=Path("."),
996+
path=Path("."),
997+
dialect="tsql",
998+
)
999+
1000+
# verify model has physical_properties that trigger merge strategy
1001+
assert "mssql_merge_exists" in m.physical_properties
1002+
snapshot: Snapshot = make_snapshot(m)
1003+
assert snapshot.node.physical_properties == m.physical_properties
1004+
assert snapshot.node.physical_properties.get("mssql_merge_exists")

0 commit comments

Comments
 (0)