Skip to content

Commit d8b618f

Browse files
authored
Remove support for live recorder data migration of context ids (#125309)
1 parent e888a95 commit d8b618f

File tree

10 files changed

+349
-132
lines changed

10 files changed

+349
-132
lines changed

homeassistant/components/recorder/core.py

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,8 @@
7878
StatisticsShortTerm,
7979
)
8080
from .executor import DBInterruptibleThreadPoolExecutor
81-
from .migration import (
82-
EntityIDMigration,
83-
EventIDPostMigration,
84-
EventsContextIDMigration,
85-
EventTypeIDMigration,
86-
StatesContextIDMigration,
87-
)
8881
from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
8982
from .pool import POOL_SIZE, MutexPool, RecorderPool
90-
from .queries import get_migration_changes
9183
from .table_managers.event_data import EventDataManager
9284
from .table_managers.event_types import EventTypeManager
9385
from .table_managers.recorder_runs import RecorderRunsManager
@@ -120,7 +112,6 @@
120112
build_mysqldb_conv,
121113
dburl_to_path,
122114
end_incomplete_runs,
123-
execute_stmt_lambda_element,
124115
is_second_sunday,
125116
move_away_broken_database,
126117
session_scope,
@@ -740,12 +731,17 @@ def _run(self) -> None:
740731

741732
# First do non-live migration steps, if needed
742733
if schema_status.migration_needed:
734+
# Do non-live schema migration
743735
result, schema_status = self._migrate_schema_offline(schema_status)
744736
if not result:
745737
self._notify_migration_failed()
746738
self.migration_in_progress = False
747739
return
748740
self.schema_version = schema_status.current_version
741+
742+
# Do non-live data migration
743+
migration.migrate_data_non_live(self, self.get_session, schema_status)
744+
749745
# Non-live migration is now completed, remaining steps are live
750746
self.migration_is_live = True
751747

@@ -801,20 +797,7 @@ def _activate_and_set_db_ready(
801797
# there are a lot of statistics graphs on the frontend.
802798
self.statistics_meta_manager.load(session)
803799

804-
migration_changes: dict[str, int] = {
805-
row[0]: row[1]
806-
for row in execute_stmt_lambda_element(session, get_migration_changes())
807-
}
808-
809-
for migrator_cls in (
810-
StatesContextIDMigration,
811-
EventsContextIDMigration,
812-
EventTypeIDMigration,
813-
EntityIDMigration,
814-
EventIDPostMigration,
815-
):
816-
migrator = migrator_cls(schema_status.start_version, migration_changes)
817-
migrator.do_migrate(self, session)
800+
migration.migrate_data_live(self, self.get_session, schema_status)
818801

819802
# We must only set the db ready after we have set the table managers
820803
# to active if there is no data to migrate.

homeassistant/components/recorder/migration.py

Lines changed: 133 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
find_states_context_ids_to_migrate,
9292
find_unmigrated_short_term_statistics_rows,
9393
find_unmigrated_statistics_rows,
94+
get_migration_changes,
9495
has_entity_ids_to_migrate,
9596
has_event_type_to_migrate,
9697
has_events_context_ids_to_migrate,
@@ -104,6 +105,7 @@
104105
from .tasks import RecorderTask
105106
from .util import (
106107
database_job_retry_wrapper,
108+
database_job_retry_wrapper_method,
107109
execute_stmt_lambda_element,
108110
get_index_by_name,
109111
retryable_database_job_method,
@@ -233,8 +235,12 @@ def validate_db_schema(
233235
# columns may otherwise not exist etc.
234236
schema_errors = _find_schema_errors(hass, instance, session_maker)
235237

238+
migration_needed = not is_current or non_live_data_migration_needed(
239+
instance, session_maker, current_version
240+
)
241+
236242
return SchemaValidationStatus(
237-
current_version, not is_current, schema_errors, current_version
243+
current_version, migration_needed, schema_errors, current_version
238244
)
239245

240246

@@ -350,6 +356,68 @@ def migrate_schema_live(
350356
return schema_status
351357

352358

359+
def _get_migration_changes(session: Session) -> dict[str, int]:
360+
"""Return migration changes as a dict."""
361+
migration_changes: dict[str, int] = {
362+
row[0]: row[1]
363+
for row in execute_stmt_lambda_element(session, get_migration_changes())
364+
}
365+
return migration_changes
366+
367+
368+
def non_live_data_migration_needed(
369+
instance: Recorder,
370+
session_maker: Callable[[], Session],
371+
schema_version: int,
372+
) -> bool:
373+
"""Return True if non-live data migration is needed.
374+
375+
This must only be called if database schema is current.
376+
"""
377+
migration_needed = False
378+
with session_scope(session=session_maker()) as session:
379+
migration_changes = _get_migration_changes(session)
380+
for migrator_cls in NON_LIVE_DATA_MIGRATORS:
381+
migrator = migrator_cls(schema_version, migration_changes)
382+
migration_needed |= migrator.needs_migrate(instance, session)
383+
384+
return migration_needed
385+
386+
387+
def migrate_data_non_live(
388+
instance: Recorder,
389+
session_maker: Callable[[], Session],
390+
schema_status: SchemaValidationStatus,
391+
) -> None:
392+
"""Do non-live data migration.
393+
394+
This must be called after non-live schema migration is completed.
395+
"""
396+
with session_scope(session=session_maker()) as session:
397+
migration_changes = _get_migration_changes(session)
398+
399+
for migrator_cls in NON_LIVE_DATA_MIGRATORS:
400+
migrator = migrator_cls(schema_status.start_version, migration_changes)
401+
migrator.migrate_all(instance, session_maker)
402+
403+
404+
def migrate_data_live(
405+
instance: Recorder,
406+
session_maker: Callable[[], Session],
407+
schema_status: SchemaValidationStatus,
408+
) -> None:
409+
"""Queue live schema migration tasks.
410+
411+
This must be called after live schema migration is completed.
412+
"""
413+
with session_scope(session=session_maker()) as session:
414+
migration_changes = _get_migration_changes(session)
415+
416+
for migrator_cls in LIVE_DATA_MIGRATORS:
417+
migrator = migrator_cls(schema_status.start_version, migration_changes)
418+
migrator.queue_migration(instance, session)
419+
420+
353421
def _create_index(
354422
session_maker: Callable[[], Session], table_name: str, index_name: str
355423
) -> None:
@@ -2196,29 +2264,24 @@ class DataMigrationStatus:
21962264
migration_done: bool
21972265

21982266

2199-
class BaseRunTimeMigration(ABC):
2200-
"""Base class for run time migrations."""
2267+
class BaseMigration(ABC):
2268+
"""Base class for migrations."""
22012269

22022270
index_to_drop: tuple[str, str] | None = None
22032271
required_schema_version = 0
22042272
migration_version = 1
22052273
migration_id: str
2206-
task = MigrationTask
22072274

22082275
def __init__(self, schema_version: int, migration_changes: dict[str, int]) -> None:
22092276
"""Initialize a new BaseRunTimeMigration."""
22102277
self.schema_version = schema_version
22112278
self.migration_changes = migration_changes
22122279

2213-
def do_migrate(self, instance: Recorder, session: Session) -> None:
2214-
"""Start migration if needed."""
2215-
if self.needs_migrate(instance, session):
2216-
instance.queue_task(self.task(self))
2217-
else:
2218-
self.migration_done(instance, session)
2219-
2220-
@retryable_database_job_method("migrate data")
2280+
@abstractmethod
22212281
def migrate_data(self, instance: Recorder) -> bool:
2282+
"""Migrate some data, return True if migration is completed."""
2283+
2284+
def _migrate_data(self, instance: Recorder) -> bool:
22222285
"""Migrate some data, returns True if migration is completed."""
22232286
status = self.migrate_data_impl(instance)
22242287
if status.migration_done:
@@ -2273,7 +2336,45 @@ def needs_migrate(self, instance: Recorder, session: Session) -> bool:
22732336
return needs_migrate.needs_migrate
22742337

22752338

2276-
class BaseRunTimeMigrationWithQuery(BaseRunTimeMigration):
2339+
class BaseOffLineMigration(BaseMigration):
2340+
"""Base class for off line migrations."""
2341+
2342+
def migrate_all(
2343+
self, instance: Recorder, session_maker: Callable[[], Session]
2344+
) -> None:
2345+
"""Migrate all data."""
2346+
with session_scope(session=session_maker()) as session:
2347+
if not self.needs_migrate(instance, session):
2348+
self.migration_done(instance, session)
2349+
return
2350+
while not self.migrate_data(instance):
2351+
pass
2352+
2353+
@database_job_retry_wrapper_method("migrate data", 10)
2354+
def migrate_data(self, instance: Recorder) -> bool:
2355+
"""Migrate some data, returns True if migration is completed."""
2356+
return self._migrate_data(instance)
2357+
2358+
2359+
class BaseRunTimeMigration(BaseMigration):
2360+
"""Base class for run time migrations."""
2361+
2362+
task = MigrationTask
2363+
2364+
def queue_migration(self, instance: Recorder, session: Session) -> None:
2365+
"""Start migration if needed."""
2366+
if self.needs_migrate(instance, session):
2367+
instance.queue_task(self.task(self))
2368+
else:
2369+
self.migration_done(instance, session)
2370+
2371+
@retryable_database_job_method("migrate data")
2372+
def migrate_data(self, instance: Recorder) -> bool:
2373+
"""Migrate some data, returns True if migration is completed."""
2374+
return self._migrate_data(instance)
2375+
2376+
2377+
class BaseMigrationWithQuery(BaseMigration):
22772378
"""Base class for run time migrations."""
22782379

22792380
@abstractmethod
@@ -2290,7 +2391,7 @@ def needs_migrate_impl(
22902391
)
22912392

22922393

2293-
class StatesContextIDMigration(BaseRunTimeMigrationWithQuery):
2394+
class StatesContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
22942395
"""Migration to migrate states context_ids to binary format."""
22952396

22962397
required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
@@ -2333,7 +2434,7 @@ def needs_migrate_query(self) -> StatementLambdaElement:
23332434
return has_states_context_ids_to_migrate()
23342435

23352436

2336-
class EventsContextIDMigration(BaseRunTimeMigrationWithQuery):
2437+
class EventsContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration):
23372438
"""Migration to migrate events context_ids to binary format."""
23382439

23392440
required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
@@ -2376,7 +2477,7 @@ def needs_migrate_query(self) -> StatementLambdaElement:
23762477
return has_events_context_ids_to_migrate()
23772478

23782479

2379-
class EventTypeIDMigration(BaseRunTimeMigrationWithQuery):
2480+
class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
23802481
"""Migration to migrate event_type to event_type_ids."""
23812482

23822483
required_schema_version = EVENT_TYPE_IDS_SCHEMA_VERSION
@@ -2454,7 +2555,7 @@ def needs_migrate_query(self) -> StatementLambdaElement:
24542555
return has_event_type_to_migrate()
24552556

24562557

2457-
class EntityIDMigration(BaseRunTimeMigrationWithQuery):
2558+
class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
24582559
"""Migration to migrate entity_ids to states_meta."""
24592560

24602561
required_schema_version = STATES_META_SCHEMA_VERSION
@@ -2542,7 +2643,7 @@ def migration_done(self, instance: Recorder, session: Session) -> None:
25422643
instance.states_meta_manager.active = True
25432644
with contextlib.suppress(SQLAlchemyError):
25442645
migrate = EntityIDPostMigration(self.schema_version, self.migration_changes)
2545-
migrate.do_migrate(instance, session)
2646+
migrate.queue_migration(instance, session)
25462647

25472648
def needs_migrate_query(self) -> StatementLambdaElement:
25482649
"""Check if the data is migrated."""
@@ -2631,7 +2732,7 @@ def needs_migrate_impl(
26312732
return DataMigrationStatus(needs_migrate=False, migration_done=True)
26322733

26332734

2634-
class EntityIDPostMigration(BaseRunTimeMigrationWithQuery):
2735+
class EntityIDPostMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
26352736
"""Migration to remove old entity_id strings from states."""
26362737

26372738
migration_id = "entity_id_post_migration"
@@ -2648,9 +2749,19 @@ def needs_migrate_query(self) -> StatementLambdaElement:
26482749
return has_used_states_entity_ids()
26492750

26502751

2651-
def _mark_migration_done(
2652-
session: Session, migration: type[BaseRunTimeMigration]
2653-
) -> None:
2752+
NON_LIVE_DATA_MIGRATORS = (
2753+
StatesContextIDMigration, # Introduced in HA Core 2023.4
2754+
EventsContextIDMigration, # Introduced in HA Core 2023.4
2755+
)
2756+
2757+
LIVE_DATA_MIGRATORS = (
2758+
EventTypeIDMigration,
2759+
EntityIDMigration,
2760+
EventIDPostMigration,
2761+
)
2762+
2763+
2764+
def _mark_migration_done(session: Session, migration: type[BaseMigration]) -> None:
26542765
"""Mark a migration as done in the database."""
26552766
session.merge(
26562767
MigrationChanges(

0 commit comments

Comments
 (0)