Commit b8bc224

Chore!: Remove old migration scripts up to and including v0060 (#5309)
1 parent 971eccf commit b8bc224

76 files changed: +152 additions, -3417 deletions


sqlmesh/core/context.py

Lines changed: 1 addition & 2 deletions
@@ -588,7 +588,7 @@ def state_sync(self) -> StateSync:
 
             if self._state_sync.get_versions(validate=False).schema_version == 0:
                 self.console.log_status_update("Initializing new project state...")
-                self._state_sync.migrate(default_catalog=self.default_catalog)
+                self._state_sync.migrate()
             self._state_sync.get_versions()
             self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
         return self._state_sync
@@ -2356,7 +2356,6 @@ def migrate(self) -> None:
         self._load_materializations()
         try:
             self._new_state_sync().migrate(
-                default_catalog=self.default_catalog,
                 promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
             )
         except Exception as e:
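
With default_catalog no longer threaded through to the state sync, callers of migrate() only pass the remaining keyword options. A minimal sketch of triggering a state migration through the public Context API after this change (the project path is hypothetical):

from sqlmesh import Context

# Hypothetical project path; any SQLMesh project directory works here.
ctx = Context(paths=["examples/sushi"])

# Migrates the state schema to the latest version. Internally this calls
# StateSync.migrate(), which no longer receives a default_catalog argument.
ctx.migrate()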

sqlmesh/core/state_sync/base.py

Lines changed: 4 additions & 2 deletions
@@ -61,11 +61,14 @@ def _schema_version_validator(cls, v: t.Any) -> int:
         return 0 if v is None else int(v)
 
 
+MIN_SCHEMA_VERSION = 60
+MIN_SQLMESH_VERSION = "0.134.0"
 MIGRATIONS = [
     importlib.import_module(f"sqlmesh.migrations.{migration}")
     for migration in sorted(info.name for info in pkgutil.iter_modules(migrations.__path__))
 ]
-SCHEMA_VERSION: int = len(MIGRATIONS)
+# -1 to account for the baseline script
+SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1
 
 
 class PromotionResult(PydanticModel):
@@ -469,7 +472,6 @@ def compact_intervals(self) -> None:
     @abc.abstractmethod
     def migrate(
         self,
-        default_catalog: t.Optional[str],
         skip_backup: bool = False,
         promoted_snapshots_only: bool = True,
     ) -> None:
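
To illustrate the new version arithmetic: the baseline script occupies one slot in MIGRATIONS but does not advance the schema version, hence the "- 1". A small self-contained sketch, with made-up migration file names:

MIN_SCHEMA_VERSION = 60

# Hypothetical contents of the migrations package after this commit:
# the baseline plus the scripts that still exist (v0061 onward).
migration_names = ["v0000_baseline", "v0061_example", "v0062_example", "v0063_example"]

# -1 so the baseline script does not count toward the version number.
schema_version = MIN_SCHEMA_VERSION + len(migration_names) - 1
assert schema_version == 63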

sqlmesh/core/state_sync/db/facade.py

Lines changed: 0 additions & 6 deletions
@@ -22,7 +22,6 @@
 from pathlib import Path
 from datetime import datetime
 
-from sqlglot import exp
 
 from sqlmesh.core.console import Console, get_console
 from sqlmesh.core.engine_adapter import EngineAdapter
@@ -90,7 +89,6 @@ def __init__(
         console: t.Optional[Console] = None,
         cache_dir: Path = Path(),
     ):
-        self.plan_dags_table = exp.table_("_plan_dags", db=schema)
         self.interval_state = IntervalState(engine_adapter, schema=schema)
         self.environment_state = EnvironmentState(engine_adapter, schema=schema)
         self.snapshot_state = SnapshotState(engine_adapter, schema=schema, cache_dir=cache_dir)
@@ -101,7 +99,6 @@ def __init__(
             snapshot_state=self.snapshot_state,
             environment_state=self.environment_state,
             interval_state=self.interval_state,
-            plan_dags_table=self.plan_dags_table,
             console=console,
         )
         # Make sure that if an empty string is provided that we treat it as None
@@ -308,7 +305,6 @@ def remove_state(self, including_backup: bool = False) -> None:
             self.environment_state.environments_table,
             self.environment_state.environment_statements_table,
             self.interval_state.intervals_table,
-            self.plan_dags_table,
             self.version_state.versions_table,
         ):
             self.engine_adapter.drop_table(table)
@@ -453,14 +449,12 @@ def close(self) -> None:
     @transactional()
     def migrate(
         self,
-        default_catalog: t.Optional[str],
         skip_backup: bool = False,
         promoted_snapshots_only: bool = True,
     ) -> None:
         """Migrate the state sync to the latest SQLMesh / SQLGlot version."""
         self.migrator.migrate(
             self,
-            default_catalog,
             skip_backup=skip_backup,
             promoted_snapshots_only=promoted_snapshots_only,
         )

sqlmesh/core/state_sync/db/migrator.py

Lines changed: 19 additions & 12 deletions
@@ -27,6 +27,8 @@
 )
 from sqlmesh.core.state_sync.base import (
     MIGRATIONS,
+    MIN_SCHEMA_VERSION,
+    MIN_SQLMESH_VERSION,
 )
 from sqlmesh.core.state_sync.base import StateSync
 from sqlmesh.core.state_sync.db.environment import EnvironmentState
@@ -41,7 +43,7 @@
 from sqlmesh.utils import major_minor
 from sqlmesh.utils.dag import DAG
 from sqlmesh.utils.date import now_timestamp
-from sqlmesh.utils.errors import SQLMeshError
+from sqlmesh.utils.errors import SQLMeshError, StateMigrationError
 
 logger = logging.getLogger(__name__)
 
@@ -61,7 +63,6 @@ def __init__(
         snapshot_state: SnapshotState,
         environment_state: EnvironmentState,
         interval_state: IntervalState,
-        plan_dags_table: TableName,
         console: t.Optional[Console] = None,
     ):
         self.engine_adapter = engine_adapter
@@ -70,7 +71,6 @@
         self.snapshot_state = snapshot_state
         self.environment_state = environment_state
         self.interval_state = interval_state
-        self.plan_dags_table = plan_dags_table
 
         self._state_tables = [
             self.snapshot_state.snapshots_table,
@@ -79,15 +79,13 @@ def __init__(
         ]
         self._optional_state_tables = [
             self.interval_state.intervals_table,
-            self.plan_dags_table,
             self.snapshot_state.auto_restatements_table,
             self.environment_state.environment_statements_table,
         ]
 
     def migrate(
         self,
         state_sync: StateSync,
-        default_catalog: t.Optional[str],
         skip_backup: bool = False,
         promoted_snapshots_only: bool = True,
     ) -> None:
@@ -96,15 +94,13 @@ def migrate(
         migration_start_ts = time.perf_counter()
 
         try:
-            migrate_rows = self._apply_migrations(state_sync, default_catalog, skip_backup)
+            migrate_rows = self._apply_migrations(state_sync, skip_backup)
 
             if not migrate_rows and major_minor(SQLMESH_VERSION) == versions.minor_sqlmesh_version:
                 return
 
             if migrate_rows:
                 self._migrate_rows(promoted_snapshots_only)
-                # Cleanup plan DAGs since we currently don't migrate snapshot records that are in there.
-                self.engine_adapter.delete_from(self.plan_dags_table, "TRUE")
             self.version_state.update_versions()
 
             analytics.collector.on_migration_end(
@@ -126,6 +122,8 @@
             )
 
             self.console.log_migration_status(success=False)
+            if isinstance(e, StateMigrationError):
+                raise
             raise SQLMeshError("SQLMesh migration failed.") from e
 
         self.console.log_migration_status()
@@ -156,11 +154,20 @@ def rollback(self) -> None:
     def _apply_migrations(
         self,
         state_sync: StateSync,
-        default_catalog: t.Optional[str],
         skip_backup: bool,
     ) -> bool:
         versions = self.version_state.get_versions()
-        migrations = MIGRATIONS[versions.schema_version :]
+        first_script_index = 0
+        if versions.schema_version and versions.schema_version < MIN_SCHEMA_VERSION:
+            raise StateMigrationError(
+                "The current state belongs to an old version of SQLMesh that is no longer supported. "
+                f"Please upgrade to {MIN_SQLMESH_VERSION} first before upgrading to {SQLMESH_VERSION}."
+            )
+        elif versions.schema_version > 0:
+            # -1 to skip the baseline migration script
+            first_script_index = versions.schema_version - (MIN_SCHEMA_VERSION - 1)
+
+        migrations = MIGRATIONS[first_script_index:]
         should_backup = any(
             [
                 migrations,
@@ -177,10 +184,10 @@ def _apply_migrations(
 
         for migration in migrations:
            logger.info(f"Applying migration {migration}")
-            migration.migrate_schemas(state_sync, default_catalog=default_catalog)
+            migration.migrate_schemas(state_sync)
            if state_table_exist:
                # No need to run DML for the initial migration since all tables are empty
-                migration.migrate_rows(state_sync, default_catalog=default_catalog)
+                migration.migrate_rows(state_sync)
 
        snapshot_count_after = self.snapshot_state.count()
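
The net effect of the _apply_migrations change can be summarized in a standalone sketch (not the library API) of how the starting script index is derived from the stored schema version:

MIN_SCHEMA_VERSION = 60
MIN_SQLMESH_VERSION = "0.134.0"


def first_script_index(schema_version: int) -> int:
    """Pick where to start in MIGRATIONS for a state at the given schema version."""
    if schema_version and schema_version < MIN_SCHEMA_VERSION:
        # States older than v0060 must first be migrated with SQLMesh 0.134.0.
        raise RuntimeError(f"Upgrade to SQLMesh {MIN_SQLMESH_VERSION} before migrating further.")
    if schema_version > 0:
        # -1 so the baseline script at index 0 is skipped for existing states.
        return schema_version - (MIN_SCHEMA_VERSION - 1)
    return 0  # fresh install: run the baseline plus every later script


assert first_script_index(0) == 0   # new project: baseline + all scripts
assert first_script_index(60) == 1  # state at v0060: only scripts after the baseline
assert first_script_index(62) == 3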

sqlmesh/migrations/v0056_restore_table_indexes.py renamed to sqlmesh/migrations/v0000_baseline.py

Lines changed: 27 additions & 51 deletions
@@ -1,31 +1,27 @@
-"""Readds indexes and primary keys in case tables were restored from a backup."""
+"""The baseline migration script that sets up the initial state tables."""
 
 from sqlglot import exp
-from sqlmesh.utils import random_id
-from sqlmesh.utils.migration import index_text_type
-from sqlmesh.utils.migration import blob_text_type
+from sqlmesh.utils.migration import blob_text_type, index_text_type
 
 
 def migrate_schemas(state_sync, **kwargs):  # type: ignore
     schema = state_sync.schema
     engine_adapter = state_sync.engine_adapter
-    if not engine_adapter.SUPPORTS_INDEXES:
-        return
 
     intervals_table = "_intervals"
     snapshots_table = "_snapshots"
     environments_table = "_environments"
+    versions_table = "_versions"
     if state_sync.schema:
+        engine_adapter.create_schema(schema)
         intervals_table = f"{schema}.{intervals_table}"
         snapshots_table = f"{schema}.{snapshots_table}"
         environments_table = f"{schema}.{environments_table}"
-
-    table_suffix = random_id(short=True)
+        versions_table = f"{schema}.{versions_table}"
 
     index_type = index_text_type(engine_adapter.dialect)
     blob_type = blob_text_type(engine_adapter.dialect)
 
-    new_snapshots_table = f"{snapshots_table}__{table_suffix}"
     snapshots_columns_to_types = {
         "name": exp.DataType.build(index_type),
         "identifier": exp.DataType.build(index_type),
@@ -38,7 +34,6 @@ def migrate_schemas(state_sync, **kwargs):  # type: ignore
         "unrestorable": exp.DataType.build("boolean"),
     }
 
-    new_environments_table = f"{environments_table}__{table_suffix}"
     environments_columns_to_types = {
         "name": exp.DataType.build(index_type),
         "snapshots": exp.DataType.build(blob_type),
@@ -53,9 +48,9 @@ def migrate_schemas(state_sync, **kwargs):  # type: ignore
         "catalog_name_override": exp.DataType.build("text"),
         "previous_finalized_snapshots": exp.DataType.build(blob_type),
         "normalize_name": exp.DataType.build("boolean"),
+        "requirements": exp.DataType.build(blob_type),
     }
 
-    new_intervals_table = f"{intervals_table}__{table_suffix}"
     intervals_columns_to_types = {
         "id": exp.DataType.build(index_type),
         "created_ts": exp.DataType.build("bigint"),
@@ -69,53 +64,34 @@ def migrate_schemas(state_sync, **kwargs):  # type: ignore
         "is_compacted": exp.DataType.build("boolean"),
     }
 
-    # Recreate the snapshots table and its indexes.
-    engine_adapter.create_table(
-        new_snapshots_table, snapshots_columns_to_types, primary_key=("name", "identifier")
-    )
-    engine_adapter.create_index(
-        new_snapshots_table, "_snapshots_name_version_idx", ("name", "version")
-    )
-    engine_adapter.insert_append(
-        new_snapshots_table,
-        exp.select("*").from_(snapshots_table),
-        target_columns_to_types=snapshots_columns_to_types,
-    )
+    versions_columns_to_types = {
+        "schema_version": exp.DataType.build("int"),
+        "sqlglot_version": exp.DataType.build(index_type),
+        "sqlmesh_version": exp.DataType.build(index_type),
+    }
 
-    # Recreate the environments table and its indexes.
-    engine_adapter.create_table(
-        new_environments_table, environments_columns_to_types, primary_key=("name",)
-    )
-    engine_adapter.insert_append(
-        new_environments_table,
-        exp.select("*").from_(environments_table),
-        target_columns_to_types=environments_columns_to_types,
+    # Create the versions table.
+    engine_adapter.create_state_table(versions_table, versions_columns_to_types)
+
+    # Create the snapshots table and its indexes.
+    engine_adapter.create_state_table(
+        snapshots_table, snapshots_columns_to_types, primary_key=("name", "identifier")
     )
+    engine_adapter.create_index(snapshots_table, "_snapshots_name_version_idx", ("name", "version"))
 
-    # Recreate the intervals table and its indexes.
-    engine_adapter.create_table(
-        new_intervals_table, intervals_columns_to_types, primary_key=("id",)
+    # Create the environments table and its indexes.
+    engine_adapter.create_state_table(
+        environments_table, environments_columns_to_types, primary_key=("name",)
    )
-    engine_adapter.create_index(
-        new_intervals_table, "_intervals_name_identifier_idx", ("name", "identifier")
+
+    # Create the intervals table and its indexes.
+    engine_adapter.create_state_table(
+        intervals_table, intervals_columns_to_types, primary_key=("id",)
     )
     engine_adapter.create_index(
-        new_intervals_table, "_intervals_name_version_idx", ("name", "version")
+        intervals_table, "_intervals_name_identifier_idx", ("name", "identifier")
     )
-    engine_adapter.insert_append(
-        new_intervals_table,
-        exp.select("*").from_(intervals_table),
-        target_columns_to_types=intervals_columns_to_types,
-    )
-
-    # Drop old tables.
-    for table in (snapshots_table, environments_table, intervals_table):
-        engine_adapter.drop_table(table)
-
-    # Replace old tables with new ones.
-    engine_adapter.rename_table(new_snapshots_table, snapshots_table)
-    engine_adapter.rename_table(new_environments_table, environments_table)
-    engine_adapter.rename_table(new_intervals_table, intervals_table)
+    engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version"))
 
 
 def migrate_rows(state_sync, **kwargs):  # type: ignore
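
For reference, a brief sketch of what the baseline script leaves behind on a fresh install, summarized as plain data (table names assume no state schema prefix is configured):

# Tables and indexes created by v0000_baseline, per the diff above.
BASELINE_STATE_TABLES = {
    "_versions": {"primary_key": None, "indexes": []},
    "_snapshots": {
        "primary_key": ("name", "identifier"),
        "indexes": ["_snapshots_name_version_idx"],
    },
    "_environments": {"primary_key": ("name",), "indexes": []},
    "_intervals": {
        "primary_key": ("id",),
        "indexes": ["_intervals_name_identifier_idx", "_intervals_name_version_idx"],
    },
}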

sqlmesh/migrations/v0001_init.py

Lines changed: 0 additions & 64 deletions
This file was deleted.
