Skip to content

Commit df7d61b

Browse files
authored
Fix: Make table schema migration retriable (#5395)
1 parent 69206c7 commit df7d61b

File tree

9 files changed

+27
-10
lines changed

9 files changed

+27
-10
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,6 +1044,7 @@ def clone_table(
10441044
target_table_name: TableName,
10451045
source_table_name: TableName,
10461046
replace: bool = False,
1047+
exists: bool = True,
10471048
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
10481049
**kwargs: t.Any,
10491050
) -> None:
@@ -1053,6 +1054,7 @@ def clone_table(
10531054
target_table_name: The name of the table that should be created.
10541055
source_table_name: The name of the source table that should be cloned.
10551056
replace: Whether or not to replace an existing table.
1057+
exists: Indicates whether to include the IF NOT EXISTS check.
10561058
"""
10571059
if not self.SUPPORTS_CLONING:
10581060
raise NotImplementedError(f"Engine does not support cloning: {type(self)}")
@@ -1063,6 +1065,7 @@ def clone_table(
10631065
this=exp.to_table(target_table_name),
10641066
kind="TABLE",
10651067
replace=replace,
1068+
exists=exists,
10661069
clone=exp.Clone(
10671070
this=exp.to_table(source_table_name),
10681071
**(clone_kwargs or {}),

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ def clone_table(
299299
target_table_name: TableName,
300300
source_table_name: TableName,
301301
replace: bool = False,
302+
exists: bool = True,
302303
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
303304
**kwargs: t.Any,
304305
) -> None:

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ def clone_table(
610610
target_table_name: TableName,
611611
source_table_name: TableName,
612612
replace: bool = False,
613+
exists: bool = True,
613614
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
614615
**kwargs: t.Any,
615616
) -> None:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
from sqlglot import exp, select
3434
from sqlglot.executor import execute
35+
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_not_exception_type
3536

3637
from sqlmesh.core import constants as c
3738
from sqlmesh.core import dialect as d
@@ -76,6 +77,7 @@
7677
from sqlmesh.utils.errors import (
7778
ConfigError,
7879
DestructiveChangeError,
80+
MigrationNotSupportedError,
7981
SQLMeshError,
8082
format_destructive_change_msg,
8183
format_additive_change_msg,
@@ -1035,7 +1037,6 @@ def _clone_snapshot_in_dev(
10351037
adapter.clone_table(
10361038
target_table_name,
10371039
snapshot.table_name(),
1038-
replace=True,
10391040
rendered_physical_properties=rendered_physical_properties,
10401041
)
10411042
self._migrate_target_table(
@@ -1111,6 +1112,15 @@ def _migrate_snapshot(
11111112
dry_run=True,
11121113
)
11131114

1115+
# Retry in case when the table is migrated concurrently from another plan application
1116+
@retry(
1117+
reraise=True,
1118+
stop=stop_after_attempt(5),
1119+
wait=wait_exponential(min=1, max=16),
1120+
retry=retry_if_not_exception_type(
1121+
(DestructiveChangeError, AdditiveChangeError, MigrationNotSupportedError)
1122+
),
1123+
)
11141124
def _migrate_target_table(
11151125
self,
11161126
target_table_name: str,
@@ -2672,7 +2682,7 @@ def migrate(
26722682
)
26732683
if len(potential_alter_operations) > 0:
26742684
# this can happen if a user changes a managed model and deliberately overrides a plan to be forward only, eg `sqlmesh plan --forward-only`
2675-
raise SQLMeshError(
2685+
raise MigrationNotSupportedError(
26762686
f"The schema of the managed model '{target_table_name}' cannot be updated in a forward-only fashion."
26772687
)
26782688

sqlmesh/utils/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ class AdditiveChangeError(SQLMeshError):
151151
pass
152152

153153

154+
class MigrationNotSupportedError(SQLMeshError):
155+
pass
156+
157+
154158
class NotificationTargetError(SQLMeshError):
155159
pass
156160

tests/core/engine_adapter/test_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3347,7 +3347,7 @@ def test_clone_table(make_mocked_engine_adapter: t.Callable):
33473347
adapter.clone_table("target_table", "source_table")
33483348

33493349
adapter.cursor.execute.assert_called_once_with(
3350-
"CREATE TABLE `target_table` CLONE `source_table`"
3350+
"CREATE TABLE IF NOT EXISTS `target_table` CLONE `source_table`"
33513351
)
33523352

33533353

tests/core/engine_adapter/test_databricks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def test_clone_table(mocker: MockFixture, make_mocked_engine_adapter: t.Callable
106106
adapter = make_mocked_engine_adapter(DatabricksEngineAdapter, default_catalog="test_catalog")
107107
adapter.clone_table("target_table", "source_table")
108108
adapter.cursor.execute.assert_called_once_with(
109-
"CREATE TABLE `target_table` SHALLOW CLONE `source_table`"
109+
"CREATE TABLE IF NOT EXISTS `target_table` SHALLOW CLONE `source_table`"
110110
)
111111

112112

tests/core/engine_adapter/test_snowflake.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab
688688
adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter, default_catalog="test_catalog")
689689
adapter.clone_table("target_table", "source_table")
690690
adapter.cursor.execute.assert_called_once_with(
691-
'CREATE TABLE "target_table" CLONE "source_table"'
691+
'CREATE TABLE IF NOT EXISTS "target_table" CLONE "source_table"'
692692
)
693693

694694
# Validate with transient type we create the clone table accordingly
@@ -700,7 +700,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab
700700
"target_table", "source_table", rendered_physical_properties=rendered_physical_properties
701701
)
702702
adapter.cursor.execute.assert_called_once_with(
703-
'CREATE TRANSIENT TABLE "target_table" CLONE "source_table"'
703+
'CREATE TRANSIENT TABLE IF NOT EXISTS "target_table" CLONE "source_table"'
704704
)
705705

706706
# Validate other engine adapters would work as usual even when we pass the properties
@@ -710,7 +710,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab
710710
"target_table", "source_table", rendered_physical_properties=rendered_physical_properties
711711
)
712712
adapter.cursor.execute.assert_called_once_with(
713-
'CREATE TABLE "target_table" CLONE "source_table"'
713+
'CREATE TABLE IF NOT EXISTS "target_table" CLONE "source_table"'
714714
)
715715

716716

tests/core/test_snapshot_evaluator.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1678,7 +1678,6 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot)
16781678
adapter_mock.clone_table.assert_called_once_with(
16791679
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.dev_version}__dev",
16801680
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}",
1681-
replace=True,
16821681
rendered_physical_properties={},
16831682
)
16841683

@@ -1701,7 +1700,7 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m
17011700
adapter_mock.get_alter_operations.return_value = []
17021701
evaluator = SnapshotEvaluator(adapter_mock)
17031702

1704-
adapter_mock.alter_table.side_effect = Exception("Migration failed")
1703+
adapter_mock.alter_table.side_effect = DestructiveChangeError("Migration failed")
17051704

17061705
model = load_sql_based_model(
17071706
parse( # type: ignore
@@ -1728,7 +1727,6 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m
17281727
adapter_mock.clone_table.assert_called_once_with(
17291728
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}__dev",
17301729
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}",
1731-
replace=True,
17321730
rendered_physical_properties={},
17331731
)
17341732

0 commit comments

Comments
 (0)