Skip to content

Commit dbc7de6

Browse files
authored
Fix: Regression that caused view snapshos not to be migrated (#5389)
1 parent d7dda8f commit dbc7de6

File tree

9 files changed

+75
-27
lines changed

9 files changed

+75
-27
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ module = [
225225
"pydantic_core.*",
226226
"dlt.*",
227227
"bigframes.*",
228-
"json_stream.*"
228+
"json_stream.*",
229+
"duckdb.*"
229230
]
230231
ignore_missing_imports = true
231232

sqlmesh/core/plan/stages.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
Snapshot,
1313
SnapshotTableInfo,
1414
SnapshotId,
15+
snapshots_to_dag,
1516
)
1617

1718

@@ -248,6 +249,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
248249
stored_snapshots = self.state_reader.get_snapshots(plan.environment.snapshots)
249250
snapshots = {**new_snapshots, **stored_snapshots}
250251
snapshots_by_name = {s.name: s for s in snapshots.values()}
252+
dag = snapshots_to_dag(snapshots.values())
251253

252254
all_selected_for_backfill_snapshots = {
253255
s.snapshot_id for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
@@ -271,8 +273,15 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
271273
after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots
272274
deployability_index = DeployabilityIndex.all_deployable()
273275

276+
snapshot_ids_with_schema_migration = [
277+
s.snapshot_id for s in snapshots.values() if s.requires_schema_migration_in_prod
278+
]
279+
# Include all upstream dependencies of snapshots that require schema migration to make sure
280+
# the upstream tables are created before the schema updates are applied
274281
snapshots_with_schema_migration = [
275-
s for s in snapshots.values() if s.requires_schema_migration_in_prod
282+
snapshots[s_id]
283+
for s_id in dag.subdag(*snapshot_ids_with_schema_migration)
284+
if snapshots[s_id].supports_schema_migration_in_prod
276285
]
277286

278287
snapshots_to_intervals = self._missing_intervals(

sqlmesh/core/snapshot/definition.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,19 +1477,19 @@ def expiration_ts(self) -> int:
14771477
check_categorical_relative_expression=False,
14781478
)
14791479

1480+
@property
1481+
def supports_schema_migration_in_prod(self) -> bool:
1482+
"""Returns whether or not this snapshot supports schema migration when deployed to production."""
1483+
return self.is_paused and self.is_model and not self.is_symbolic
1484+
14801485
@property
14811486
def requires_schema_migration_in_prod(self) -> bool:
14821487
"""Returns whether or not this snapshot requires a schema migration when deployed to production."""
1483-
return (
1484-
self.is_paused
1485-
and self.is_model
1486-
and self.is_materialized
1487-
and (
1488-
(self.previous_version and self.previous_version.version == self.version)
1489-
or self.model.forward_only
1490-
or bool(self.model.physical_version)
1491-
or not self.virtual_environment_mode.is_full
1492-
)
1488+
return self.supports_schema_migration_in_prod and (
1489+
(self.previous_version and self.previous_version.version == self.version)
1490+
or self.model.forward_only
1491+
or bool(self.model.physical_version)
1492+
or not self.virtual_environment_mode.is_full
14931493
)
14941494

14951495
@property

sqlmesh/core/snapshot/evaluator.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -489,15 +489,14 @@ def migrate(
489489
allow_destructive_snapshots = allow_destructive_snapshots or set()
490490
allow_additive_snapshots = allow_additive_snapshots or set()
491491
snapshots_by_name = {s.name: s for s in snapshots.values()}
492-
snapshots_with_data_objects = [snapshots[s_id] for s_id in target_data_objects]
493492
with self.concurrent_context():
494493
# Only migrate snapshots for which there's an existing data object
495494
concurrent_apply_to_snapshots(
496-
snapshots_with_data_objects,
495+
snapshots_by_name.values(),
497496
lambda s: self._migrate_snapshot(
498497
s,
499498
snapshots_by_name,
500-
target_data_objects[s.snapshot_id],
499+
target_data_objects.get(s.snapshot_id),
501500
allow_destructive_snapshots,
502501
allow_additive_snapshots,
503502
self.get_adapter(s.model_gateway),
@@ -1059,7 +1058,7 @@ def _migrate_snapshot(
10591058
adapter: EngineAdapter,
10601059
deployability_index: DeployabilityIndex,
10611060
) -> None:
1062-
if not snapshot.requires_schema_migration_in_prod:
1061+
if not snapshot.is_model or snapshot.is_symbolic:
10631062
return
10641063

10651064
deployability_index = DeployabilityIndex.all_deployable()
@@ -1081,20 +1080,32 @@ def _migrate_snapshot(
10811080
):
10821081
table_exists = False
10831082

1083+
rendered_physical_properties = snapshot.model.render_physical_properties(
1084+
**render_kwargs
1085+
)
1086+
10841087
if table_exists:
10851088
self._migrate_target_table(
10861089
target_table_name=target_table_name,
10871090
snapshot=snapshot,
10881091
snapshots=snapshots,
10891092
deployability_index=deployability_index,
10901093
render_kwargs=render_kwargs,
1091-
rendered_physical_properties=snapshot.model.render_physical_properties(
1092-
**render_kwargs
1093-
),
1094+
rendered_physical_properties=rendered_physical_properties,
10941095
allow_destructive_snapshots=allow_destructive_snapshots,
10951096
allow_additive_snapshots=allow_additive_snapshots,
10961097
run_pre_post_statements=True,
10971098
)
1099+
else:
1100+
self._execute_create(
1101+
snapshot=snapshot,
1102+
table_name=snapshot.table_name(is_deployable=True),
1103+
is_table_deployable=True,
1104+
deployability_index=deployability_index,
1105+
create_render_kwargs=render_kwargs,
1106+
rendered_physical_properties=rendered_physical_properties,
1107+
dry_run=True,
1108+
)
10981109

10991110
def _migrate_target_table(
11001111
self,

tests/core/test_context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1593,7 +1593,7 @@ def test_raw_code_handling(sushi_test_dbt_context: Context):
15931593
hook = model.render_pre_statements()[0]
15941594
assert (
15951595
hook.sql()
1596-
== f'''CREATE TABLE "t" AS SELECT 'Length is {raw_code_length}' AS "length_col"'''
1596+
== f'''CREATE TABLE IF NOT EXISTS "t" AS SELECT 'Length is {raw_code_length}' AS "length_col"'''
15971597
)
15981598

15991599

tests/core/test_integration.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,26 @@ def test_forward_only_parent_created_in_dev_child_created_in_prod(
942942
context.apply(plan)
943943

944944

945+
@time_machine.travel("2023-01-08 15:00:00 UTC")
946+
def test_forward_only_view_migration(
947+
init_and_plan_context: t.Callable,
948+
):
949+
context, plan = init_and_plan_context("examples/sushi")
950+
context.apply(plan)
951+
952+
model = context.get_model("sushi.top_waiters")
953+
assert model.kind.is_view
954+
model = add_projection_to_model(t.cast(SqlModel, model))
955+
context.upsert_model(model)
956+
957+
# Apply a forward-only plan
958+
context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True, forward_only=True)
959+
960+
# Make sure that the new column got reflected in the view schema
961+
df = context.fetchdf("SELECT one FROM sushi.top_waiters LIMIT 1")
962+
assert len(df) == 1
963+
964+
945965
@time_machine.travel("2023-01-08 00:00:00 UTC")
946966
def test_new_forward_only_model(init_and_plan_context: t.Callable):
947967
context, _ = init_and_plan_context("examples/sushi")

tests/core/test_plan_stages.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,16 +1661,17 @@ def test_build_plan_stages_indirect_non_breaking_view_migration(
16611661
stages = build_plan_stages(plan, state_reader, None)
16621662

16631663
# Verify stages
1664-
assert len(stages) == 8
1664+
assert len(stages) == 9
16651665

16661666
assert isinstance(stages[0], CreateSnapshotRecordsStage)
16671667
assert isinstance(stages[1], PhysicalLayerSchemaCreationStage)
16681668
assert isinstance(stages[2], BackfillStage)
16691669
assert isinstance(stages[3], EnvironmentRecordUpdateStage)
1670-
assert isinstance(stages[4], UnpauseStage)
1671-
assert isinstance(stages[5], BackfillStage)
1672-
assert isinstance(stages[6], VirtualLayerUpdateStage)
1673-
assert isinstance(stages[7], FinalizeEnvironmentStage)
1670+
assert isinstance(stages[4], MigrateSchemasStage)
1671+
assert isinstance(stages[5], UnpauseStage)
1672+
assert isinstance(stages[6], BackfillStage)
1673+
assert isinstance(stages[7], VirtualLayerUpdateStage)
1674+
assert isinstance(stages[8], FinalizeEnvironmentStage)
16741675

16751676

16761677
def test_build_plan_stages_virtual_environment_mode_filtering(

tests/core/test_snapshot_evaluator.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1402,7 +1402,13 @@ def test_migrate_view(
14021402
evaluator = SnapshotEvaluator(adapter)
14031403
evaluator.migrate([snapshot], {})
14041404

1405-
adapter.cursor.execute.assert_not_called()
1405+
adapter.cursor.execute.assert_has_calls(
1406+
[
1407+
call(
1408+
f'CREATE OR REPLACE VIEW "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}" ("c", "a") AS SELECT "c" AS "c", "a" AS "a" FROM "tbl" AS "tbl"'
1409+
),
1410+
]
1411+
)
14061412

14071413

14081414
def test_migrate_snapshot_data_object_type_mismatch(

tests/fixtures/dbt/sushi_test/models/model_with_raw_code.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{{
22
config(
3-
pre_hook=['CREATE TABLE t AS SELECT \'Length is {{ model.raw_code|length }}\' AS length_col']
3+
pre_hook=['CREATE TABLE IF NOT EXISTS t AS SELECT \'Length is {{ model.raw_code|length }}\' AS length_col']
44
)
55
}}
66

0 commit comments

Comments
 (0)