|
| 1 | +import logging |
| 2 | + |
| 3 | +from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum |
| 4 | +from mpt_tool.managers import FileMigrationManager, StateManager, StateManagerFactory |
| 5 | +from mpt_tool.managers.errors import LoadMigrationError, MigrationFolderError, StateNotFoundError |
| 6 | +from mpt_tool.migration.base import BaseMigration |
| 7 | +from mpt_tool.models import Migration, MigrationFile |
| 8 | +from mpt_tool.use_cases.errors import RunMigrationError |
| 9 | + |
| 10 | +logger = logging.getLogger(__name__) |
| 11 | + |
| 12 | + |
| 13 | +class RunSingleMigrationUseCase: |
| 14 | + """Use case for running a single migration.""" |
| 15 | + |
| 16 | + def __init__( |
| 17 | + self, |
| 18 | + file_migration_manager: FileMigrationManager | None = None, |
| 19 | + state_manager: StateManager | None = None, |
| 20 | + ): |
| 21 | + self.file_migration_manager = file_migration_manager or FileMigrationManager() |
| 22 | + self.state_manager = state_manager or StateManagerFactory.get_instance() |
| 23 | + |
| 24 | + def execute(self, migration_id: str, migration_type: MigrationTypeEnum) -> None: |
| 25 | + """Run one migration by id and type. |
| 26 | +
|
| 27 | + Args: |
| 28 | + migration_id: The migration id to run. |
| 29 | + migration_type: The expected migration type. |
| 30 | +
|
| 31 | + Raises: |
| 32 | + RunMigrationError: If an error occurs during migration execution. |
| 33 | + """ |
| 34 | + migration_file = self._get_migration_file(migration_id) |
| 35 | + migration_instance = self._get_migration_instance_by_type(migration_file, migration_type) |
| 36 | + state = self._get_or_create_state( |
| 37 | + migration_file.migration_id, migration_type, migration_file.order_id |
| 38 | + ) |
| 39 | + if state.applied_at is not None: |
| 40 | + raise RunMigrationError(f"Migration {migration_id} already applied") |
| 41 | + |
| 42 | + logger.info("Running migration: %s", migration_file.migration_id) |
| 43 | + self._save_state(state, status=MigrationStatusEnum.RUNNING) |
| 44 | + try: |
| 45 | + migration_instance.run() |
| 46 | + except Exception as error: |
| 47 | + self._save_state(state, status=MigrationStatusEnum.FAILED) |
| 48 | + raise RunMigrationError( |
| 49 | + f"Migration {migration_file.migration_id} failed: {error!s}" |
| 50 | + ) from error |
| 51 | + |
| 52 | + self._save_state(state, status=MigrationStatusEnum.APPLIED) |
| 53 | + |
| 54 | + def _get_migration_file(self, migration_id: str) -> MigrationFile: |
| 55 | + migration_files = self._get_validated_migrations() |
| 56 | + migration_file = next( |
| 57 | + (migration for migration in migration_files if migration.migration_id == migration_id), |
| 58 | + None, |
| 59 | + ) |
| 60 | + if migration_file is None: |
| 61 | + raise RunMigrationError(f"Migration {migration_id} not found") |
| 62 | + |
| 63 | + return migration_file |
| 64 | + |
| 65 | + def _get_migration_instance_by_type( |
| 66 | + self, migration_file: MigrationFile, migration_type: MigrationTypeEnum |
| 67 | + ) -> BaseMigration: |
| 68 | + try: |
| 69 | + migration_instance = self.file_migration_manager.load_migration(migration_file) |
| 70 | + except LoadMigrationError as error: |
| 71 | + raise RunMigrationError(str(error)) from error |
| 72 | + |
| 73 | + if migration_instance.type != migration_type: |
| 74 | + raise RunMigrationError( |
| 75 | + f"Migration {migration_file.migration_id} is not a {migration_type} migration" |
| 76 | + ) |
| 77 | + |
| 78 | + return migration_instance |
| 79 | + |
| 80 | + def _get_validated_migrations(self) -> tuple[MigrationFile, ...]: |
| 81 | + try: |
| 82 | + return self.file_migration_manager.validate() |
| 83 | + except MigrationFolderError as error: |
| 84 | + raise RunMigrationError(str(error)) from error |
| 85 | + |
| 86 | + def _get_or_create_state( |
| 87 | + self, migration_id: str, migration_type: MigrationTypeEnum, order_id: int |
| 88 | + ) -> Migration: |
| 89 | + try: |
| 90 | + state = self.state_manager.get_by_id(migration_id) |
| 91 | + except StateNotFoundError: |
| 92 | + state = self.state_manager.new(migration_id, migration_type, order_id) |
| 93 | + |
| 94 | + return state |
| 95 | + |
| 96 | + def _save_state(self, state: Migration, status: MigrationStatusEnum) -> None: |
| 97 | + match status: |
| 98 | + case MigrationStatusEnum.APPLIED: |
| 99 | + state.applied() |
| 100 | + case MigrationStatusEnum.FAILED: |
| 101 | + state.failed() |
| 102 | + case MigrationStatusEnum.RUNNING: |
| 103 | + state.start() |
| 104 | + |
| 105 | + self.state_manager.save_state(state) |
0 commit comments