From 0b5c7cd02db148c15f46fef2b4547e9c38944d39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Susana=20V=C3=A1zquez?= <3016283+svazquezco@users.noreply.github.com> Date: Wed, 18 Feb 2026 18:44:25 +0100 Subject: [PATCH] feat: allow to run a single migration --- docs/PROJECT_DESCRIPTION.md | 13 ++++ mpt_tool/cli.py | 3 + mpt_tool/commands/data.py | 9 ++- mpt_tool/commands/factory.py | 8 +- mpt_tool/commands/schema.py | 9 ++- mpt_tool/commands/validators.py | 9 ++- mpt_tool/services/__init__.py | 0 mpt_tool/services/migration_state.py | 36 +++++++++ mpt_tool/use_cases/__init__.py | 2 + mpt_tool/use_cases/apply_migration.py | 7 +- mpt_tool/use_cases/run_migrations.py | 36 +++------ mpt_tool/use_cases/run_single_migration.py | 87 ++++++++++++++++++++++ tests/cli/test_cli.py | 8 ++ tests/cli/test_cli_local_storage.py | 71 ++++++++++++++++++ tests/services/__init__.py | 0 tests/services/test_migration_state.py | 61 +++++++++++++++ 16 files changed, 323 insertions(+), 36 deletions(-) create mode 100644 mpt_tool/services/__init__.py create mode 100644 mpt_tool/services/migration_state.py create mode 100644 mpt_tool/use_cases/run_single_migration.py create mode 100644 tests/services/__init__.py create mode 100644 tests/services/test_migration_state.py diff --git a/docs/PROJECT_DESCRIPTION.md b/docs/PROJECT_DESCRIPTION.md index 01fe351..ca9c5c6 100644 --- a/docs/PROJECT_DESCRIPTION.md +++ b/docs/PROJECT_DESCRIPTION.md @@ -191,6 +191,14 @@ Error running check command: Duplicate migration_id found in migrations: 2026011 ```bash mpt-service-cli migrate --schema ``` +- **Run one specific data migration:** + ```bash + mpt-service-cli migrate --data MIGRATION_ID + ``` +- **Run one specific schema migration:** + ```bash + mpt-service-cli migrate --schema MIGRATION_ID + ``` Migrations are executed in order based on their order_id (timestamp). The tool automatically: - Validates the migration folder structure @@ -199,6 +207,11 @@ Migrations are executed in order based on their order_id (timestamp). The tool a - Logs migration progress - Handles errors gracefully and updates state accordingly +When running a single migration (`--data MIGRATION_ID` or `--schema MIGRATION_ID`), the tool: +- Fails if `MIGRATION_ID` does not exist +- Fails if the migration type does not match the selected flag +- Fails if the migration was already applied + **Migration State File (`.migrations-state.json`):** ```json { diff --git a/mpt_tool/cli.py b/mpt_tool/cli.py index 93b2490..1246cea 100644 --- a/mpt_tool/cli.py +++ b/mpt_tool/cli.py @@ -51,6 +51,9 @@ def migrate( # noqa: WPS211 ), ] = None, list: Annotated[bool, typer.Option("--list", help="List all migrations.")] = False, # noqa: A002, FBT002 + migration_id: Annotated[ + str | None, typer.Argument(help="Optional migration ID for --data or --schema.") + ] = None, ) -> None: """Migrate command.""" try: diff --git a/mpt_tool/commands/data.py b/mpt_tool/commands/data.py index 917f70d..f5688fe 100644 --- a/mpt_tool/commands/data.py +++ b/mpt_tool/commands/data.py @@ -2,12 +2,15 @@ from mpt_tool.commands.base import BaseCommand from mpt_tool.enums import MigrationTypeEnum -from mpt_tool.use_cases import RunMigrationsUseCase +from mpt_tool.use_cases import RunMigrationsUseCase, RunSingleMigrationUseCase class DataCommand(BaseCommand): """Runs all data migrations.""" + def __init__(self, migration_id: str | None = None) -> None: + self._migration_id = migration_id + @override @property def start_message(self) -> str: @@ -20,4 +23,8 @@ def success_message(self) -> str: @override def run(self) -> None: + if self._migration_id: + RunSingleMigrationUseCase().execute(self._migration_id, MigrationTypeEnum.DATA) + return + RunMigrationsUseCase().execute(MigrationTypeEnum.DATA) diff --git a/mpt_tool/commands/factory.py b/mpt_tool/commands/factory.py index 9f888a5..41bed1b 100644 --- a/mpt_tool/commands/factory.py +++ b/mpt_tool/commands/factory.py @@ -35,10 +35,10 @@ def get_instance(cls, param_data: dict[str, bool | str | None]) -> BaseCommand: return InitCommand() case {"check": True}: return CheckCommand() - case {"data": True}: - return DataCommand() - case {"schema": True}: - return SchemaCommand() + case {"data": True, "migration_id": migration_id}: + return DataCommand(migration_id=cast(str | None, migration_id)) + case {"schema": True, "migration_id": migration_id}: + return SchemaCommand(migration_id=cast(str | None, migration_id)) case {"manual": manual_value} if manual_value is not None: return ManualCommand(migration_id=cast(str, manual_value)) case {"new_schema": new_schema_value} if new_schema_value is not None: diff --git a/mpt_tool/commands/schema.py b/mpt_tool/commands/schema.py index ccd46b5..67f78ee 100644 --- a/mpt_tool/commands/schema.py +++ b/mpt_tool/commands/schema.py @@ -2,12 +2,15 @@ from mpt_tool.commands.base import BaseCommand from mpt_tool.enums import MigrationTypeEnum -from mpt_tool.use_cases import RunMigrationsUseCase +from mpt_tool.use_cases import RunMigrationsUseCase, RunSingleMigrationUseCase class SchemaCommand(BaseCommand): """Runs all schema migrations.""" + def __init__(self, migration_id: str | None = None) -> None: + self._migration_id = migration_id + @override @property def start_message(self) -> str: @@ -20,4 +23,8 @@ def success_message(self) -> str: @override def run(self) -> None: + if self._migration_id: + RunSingleMigrationUseCase().execute(self._migration_id, MigrationTypeEnum.SCHEMA) + return + RunMigrationsUseCase().execute(MigrationTypeEnum.SCHEMA) diff --git a/mpt_tool/commands/validators.py b/mpt_tool/commands/validators.py index b2b3cd2..d596a19 100644 --- a/mpt_tool/commands/validators.py +++ b/mpt_tool/commands/validators.py @@ -16,7 +16,14 @@ def validate(cls, command_params: dict[str, Any]) -> None: Raises: BadParameterError: When none or more than one param is used """ - param_counts = sum(1 for param_value in command_params.values() if param_value) + migration_id = command_params.get("migration_id") + command_values = { + key: param_value for key, param_value in command_params.items() if key != "migration_id" + } + param_counts = sum(1 for param_value in command_values.values() if param_value) + if migration_id and not command_params.get("data") and not command_params.get("schema"): + raise BadParameterError("MIGRATION_ID can only be used with --data or --schema.") + if not param_counts: raise BadParameterError("At least one param must be used.") diff --git a/mpt_tool/services/__init__.py b/mpt_tool/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mpt_tool/services/migration_state.py b/mpt_tool/services/migration_state.py new file mode 100644 index 0000000..08a6fcd --- /dev/null +++ b/mpt_tool/services/migration_state.py @@ -0,0 +1,36 @@ +from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum +from mpt_tool.managers import StateManager +from mpt_tool.managers.errors import StateNotFoundError +from mpt_tool.models import Migration + + +class MigrationStateService: + """Shared migration state operations for migration use cases.""" + + def __init__(self, state_manager: StateManager) -> None: + self._state_manager = state_manager + + def get_or_create_state( + self, migration_id: str, migration_type: MigrationTypeEnum, order_id: int + ) -> Migration: + """Return existing migration state, creating it if needed.""" + try: + state = self._state_manager.get_by_id(migration_id) + except StateNotFoundError: + state = self._state_manager.new(migration_id, migration_type, order_id) + + return state + + def save_state(self, state: Migration, status: MigrationStatusEnum) -> None: + """Apply status transition and persist migration state.""" + match status: + case MigrationStatusEnum.APPLIED: + state.applied() + case MigrationStatusEnum.FAILED: + state.failed() + case MigrationStatusEnum.MANUAL_APPLIED: + state.manual() + case MigrationStatusEnum.RUNNING: + state.start() + + self._state_manager.save_state(state) diff --git a/mpt_tool/use_cases/__init__.py b/mpt_tool/use_cases/__init__.py index 5e54d3f..ca45d97 100644 --- a/mpt_tool/use_cases/__init__.py +++ b/mpt_tool/use_cases/__init__.py @@ -4,6 +4,7 @@ from mpt_tool.use_cases.list_migrations import ListMigrationsUseCase from mpt_tool.use_cases.new_migration import NewMigrationUseCase from mpt_tool.use_cases.run_migrations import RunMigrationsUseCase +from mpt_tool.use_cases.run_single_migration import RunSingleMigrationUseCase __all__ = [ "ApplyMigrationUseCase", @@ -12,4 +13,5 @@ "ListMigrationsUseCase", "NewMigrationUseCase", "RunMigrationsUseCase", + "RunSingleMigrationUseCase", ] diff --git a/mpt_tool/use_cases/apply_migration.py b/mpt_tool/use_cases/apply_migration.py index bc2e96d..f08c06d 100644 --- a/mpt_tool/use_cases/apply_migration.py +++ b/mpt_tool/use_cases/apply_migration.py @@ -1,5 +1,7 @@ +from mpt_tool.enums import MigrationStatusEnum from mpt_tool.managers import FileMigrationManager, StateManager, StateManagerFactory from mpt_tool.managers.errors import MigrationFolderError, StateNotFoundError +from mpt_tool.services.migration_state import MigrationStateService from mpt_tool.use_cases.errors import ApplyMigrationError @@ -10,9 +12,11 @@ def __init__( self, file_migration_manager: FileMigrationManager | None = None, state_manager: StateManager | None = None, + state_service: MigrationStateService | None = None, ): self.file_migration_manager = file_migration_manager or FileMigrationManager() self.state_manager = state_manager or StateManagerFactory.get_instance() + self.state_service = state_service or MigrationStateService(self.state_manager) def execute(self, migration_id: str) -> None: """Apply a migration without running it.""" @@ -38,5 +42,4 @@ def execute(self, migration_id: str) -> None: if state.applied_at is not None: raise ApplyMigrationError(f"Migration {migration_id} already applied") - state.manual() - self.state_manager.save_state(state) + self.state_service.save_state(state, status=MigrationStatusEnum.MANUAL_APPLIED) diff --git a/mpt_tool/use_cases/run_migrations.py b/mpt_tool/use_cases/run_migrations.py index 4e22f81..bf0ca20 100644 --- a/mpt_tool/use_cases/run_migrations.py +++ b/mpt_tool/use_cases/run_migrations.py @@ -2,9 +2,10 @@ from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum from mpt_tool.managers import FileMigrationManager, StateManager, StateManagerFactory -from mpt_tool.managers.errors import LoadMigrationError, MigrationFolderError, StateNotFoundError +from mpt_tool.managers.errors import LoadMigrationError, MigrationFolderError from mpt_tool.migration.base import BaseMigration -from mpt_tool.models import Migration, MigrationFile +from mpt_tool.models import MigrationFile +from mpt_tool.services.migration_state import MigrationStateService from mpt_tool.use_cases.errors import RunMigrationError logger = logging.getLogger(__name__) @@ -17,9 +18,11 @@ def __init__( self, file_migration_manager: FileMigrationManager | None = None, state_manager: StateManager | None = None, + state_service: MigrationStateService | None = None, ): self.file_migration_manager = file_migration_manager or FileMigrationManager() self.state_manager = state_manager or StateManagerFactory.get_instance() + self.state_service = state_service or MigrationStateService(self.state_manager) def execute(self, migration_type: MigrationTypeEnum) -> None: # noqa: WPS231 """Run all migrations of a given type. @@ -42,7 +45,7 @@ def execute(self, migration_type: MigrationTypeEnum) -> None: # noqa: WPS231 if migration_instance is None: continue - state = self._get_or_create_state( + state = self.state_service.get_or_create_state( migration_file.migration_id, migration_type, migration_file.order_id ) if state.applied_at is not None: @@ -50,18 +53,18 @@ def execute(self, migration_type: MigrationTypeEnum) -> None: # noqa: WPS231 continue logger.info("Running migration: %s", migration_file.migration_id) - self._save_state(state, status=MigrationStatusEnum.RUNNING) + self.state_service.save_state(state, status=MigrationStatusEnum.RUNNING) try: migration_instance.run() # We catch all exceptions here to ensure the state is updated # and the flow is not interrupted abruptly except Exception as error: - self._save_state(state, status=MigrationStatusEnum.FAILED) + self.state_service.save_state(state, status=MigrationStatusEnum.FAILED) raise RunMigrationError( f"Migration {migration_file.migration_id} failed: {error!s}" ) from error - self._save_state(state, status=MigrationStatusEnum.APPLIED) + self.state_service.save_state(state, status=MigrationStatusEnum.APPLIED) def _get_migration_instance_by_type( self, migration_file: MigrationFile, migration_type: MigrationTypeEnum @@ -75,24 +78,3 @@ def _get_migration_instance_by_type( return None return migration_instance - - def _get_or_create_state( - self, migration_id: str, migration_type: MigrationTypeEnum, order_id: int - ) -> Migration: - try: - state = self.state_manager.get_by_id(migration_id) - except StateNotFoundError: - state = self.state_manager.new(migration_id, migration_type, order_id) - - return state - - def _save_state(self, state: Migration, status: MigrationStatusEnum) -> None: - match status: - case MigrationStatusEnum.APPLIED: - state.applied() - case MigrationStatusEnum.FAILED: - state.failed() - case MigrationStatusEnum.RUNNING: - state.start() - - self.state_manager.save_state(state) diff --git a/mpt_tool/use_cases/run_single_migration.py b/mpt_tool/use_cases/run_single_migration.py new file mode 100644 index 0000000..92bb577 --- /dev/null +++ b/mpt_tool/use_cases/run_single_migration.py @@ -0,0 +1,87 @@ +import logging + +from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum +from mpt_tool.managers import FileMigrationManager, StateManager, StateManagerFactory +from mpt_tool.managers.errors import LoadMigrationError, MigrationFolderError +from mpt_tool.migration.base import BaseMigration +from mpt_tool.models import MigrationFile +from mpt_tool.services.migration_state import MigrationStateService +from mpt_tool.use_cases.errors import RunMigrationError + +logger = logging.getLogger(__name__) + + +class RunSingleMigrationUseCase: + """Use case for running a single migration.""" + + def __init__( + self, + file_migration_manager: FileMigrationManager | None = None, + state_manager: StateManager | None = None, + state_service: MigrationStateService | None = None, + ): + self.file_migration_manager = file_migration_manager or FileMigrationManager() + self.state_manager = state_manager or StateManagerFactory.get_instance() + self.state_service = state_service or MigrationStateService(self.state_manager) + + def execute(self, migration_id: str, migration_type: MigrationTypeEnum) -> None: + """Run one migration by id and type. + + Args: + migration_id: The migration id to run. + migration_type: The expected migration type. + + Raises: + RunMigrationError: If an error occurs during migration execution. + """ + migration_file = self._get_migration_file(migration_id) + migration_instance = self._get_migration_instance_by_type(migration_file, migration_type) + state = self.state_service.get_or_create_state( + migration_file.migration_id, migration_type, migration_file.order_id + ) + if state.applied_at is not None: + raise RunMigrationError(f"Migration {migration_id} already applied") + + logger.info("Running migration: %s", migration_file.migration_id) + self.state_service.save_state(state, status=MigrationStatusEnum.RUNNING) + try: + migration_instance.run() + # We catch all exceptions here to ensure the state is updated + # and the flow is not interrupted abruptly + except Exception as error: + self.state_service.save_state(state, status=MigrationStatusEnum.FAILED) + raise RunMigrationError( + f"Migration {migration_file.migration_id} failed: {error!s}" + ) from error + + self.state_service.save_state(state, status=MigrationStatusEnum.APPLIED) + + def _get_migration_file(self, migration_id: str) -> MigrationFile: + try: + migration_files = self.file_migration_manager.validate() + except MigrationFolderError as error: + raise RunMigrationError(str(error)) from error + + migration_file = next( + (migration for migration in migration_files if migration.migration_id == migration_id), + None, + ) + if migration_file is None: + raise RunMigrationError(f"Migration {migration_id} not found") + + return migration_file + + def _get_migration_instance_by_type( + self, migration_file: MigrationFile, migration_type: MigrationTypeEnum + ) -> BaseMigration: + try: + migration_instance = self.file_migration_manager.load_migration(migration_file) + except LoadMigrationError as error: + raise RunMigrationError(str(error)) from error + + if migration_instance.type != migration_type: + raise RunMigrationError( + f"Migration {migration_file.migration_id} is not a {migration_type} migration" + ) + + return migration_instance diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 90329f7..a50b69e 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -28,6 +28,14 @@ def test_migrate_command_multiple_params_error(runner): assert "Only one param can be used." in result.output +def test_migrate_migration_id_without_type_error(runner): + result = runner.invoke(app, ["migrate", "fake_data_file_name"]) + + assert result.exit_code == 2, result.output + assert "Invalid value for migrate:" in result.output + assert "MIGRATION_ID can only be used with --data or" in result.output + + def test_migrate_data_duplicate_migration(runner, migration_folder): (migration_folder / "20250406020202_fake_file_name.py").touch() (migration_folder / "20260107010101_fake_file_name.py").touch() diff --git a/tests/cli/test_cli_local_storage.py b/tests/cli/test_cli_local_storage.py index ad8fd70..7722ec9 100644 --- a/tests/cli/test_cli_local_storage.py +++ b/tests/cli/test_cli_local_storage.py @@ -51,6 +51,27 @@ def test_migrate_data_migration(migration_state_file, runner, log): assert "Migrations completed successfully." in result.output +@freeze_time("2025-04-06 13:10:30") +@pytest.mark.usefixtures("data_migration_file", "schema_migration_file") +def test_migrate_data_single_migration(migration_state_file, runner, log): + result = runner.invoke(app, ["migrate", "--data", "fake_data_file_name"]) + + assert result.exit_code == 0, result.output + migration_state_data = json.loads(migration_state_file.read_text(encoding="utf-8")) + assert migration_state_data == { + "fake_data_file_name": { + "migration_id": "fake_data_file_name", + "order_id": 20250406020202, + "type": "data", + "started_at": "2025-04-06T13:10:30+00:00", + "applied_at": "2025-04-06T13:10:30+00:00", + } + } + assert "Running data migrations..." in result.output + assert "Running migration: fake_data_file_name" in log.text + assert "Migrations completed successfully." in result.output + + @freeze_time("2025-04-06 13:00:00") @pytest.mark.usefixtures("applied_migration") def test_migrate_skip_migration_already_applied(migration_state_file, runner, log): @@ -72,6 +93,16 @@ def test_migrate_skip_migration_already_applied(migration_state_file, runner, lo assert "Migrations completed successfully." in result.output +@pytest.mark.usefixtures("applied_migration") +def test_migrate_data_single_already_applied(runner): + result = runner.invoke(app, ["migrate", "--data", "fake_data_file_name"]) + + assert result.exit_code == 1, result.output + assert ( + "Error running data command: Migration fake_data_file_name already applied" in result.output + ) + + @pytest.mark.usefixtures("data_migration_file_error") def test_migrate_data_run_script_fail(migration_state_file, runner, log): result = runner.invoke(app, ["migrate", "--data"]) @@ -90,6 +121,46 @@ def test_migrate_data_run_script_fail(migration_state_file, runner, log): assert "Migration fake_error_file_name failed: Fake Error" in result.output +@pytest.mark.usefixtures("data_migration_file") +def test_migrate_data_single_migration_not_found(runner): + result = runner.invoke(app, ["migrate", "--data", "not_existing_migration"]) + + assert result.exit_code == 1, result.output + assert "Error running data command: Migration not_existing_migration not found" in result.output + + +@pytest.mark.usefixtures("data_migration_file", "schema_migration_file") +def test_migrate_data_single_migration_wrong_type(runner): + result = runner.invoke(app, ["migrate", "--data", "fake_schema_file_name"]) + + assert result.exit_code == 1, result.output + assert ( + "Error running data command: Migration fake_schema_file_name is not a data migration" + in result.output + ) + + +@freeze_time("2025-04-06 13:10:30") +@pytest.mark.usefixtures("data_migration_file", "schema_migration_file") +def test_migrate_schema_single_migration(migration_state_file, runner, log): + result = runner.invoke(app, ["migrate", "--schema", "fake_schema_file_name"]) + + assert result.exit_code == 0, result.output + migration_state_data = json.loads(migration_state_file.read_text(encoding="utf-8")) + assert migration_state_data == { + "fake_schema_file_name": { + "migration_id": "fake_schema_file_name", + "order_id": 20260101010101, + "type": "schema", + "started_at": "2025-04-06T13:10:30+00:00", + "applied_at": "2025-04-06T13:10:30+00:00", + } + } + assert "Running schema migrations..." in result.output + assert "Running migration: fake_schema_file_name" in log.text + assert "schema migrations applied successfully." in result.output + + @freeze_time("2025-04-06 10:11:24") @pytest.mark.usefixtures("data_migration_file") def test_migrate_manual(migration_state_file, runner): diff --git a/tests/services/__init__.py b/tests/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/services/test_migration_state.py b/tests/services/test_migration_state.py new file mode 100644 index 0000000..63d8045 --- /dev/null +++ b/tests/services/test_migration_state.py @@ -0,0 +1,61 @@ +import pytest + +from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum +from mpt_tool.managers import StateManager +from mpt_tool.managers.errors import StateNotFoundError +from mpt_tool.models import Migration +from mpt_tool.services.migration_state import MigrationStateService + + +@pytest.fixture +def mock_state(): + return Migration(migration_id="fake_id", order_id=1024, type=MigrationTypeEnum.DATA) + + +def test_get_or_create_state_existing(mocker, mock_state): + state_manager = mocker.Mock(spec=StateManager) + state_manager.get_by_id.return_value = mock_state + service = MigrationStateService(state_manager) + + result = service.get_or_create_state( + migration_id="fake_id", migration_type=MigrationTypeEnum.DATA, order_id=1024 + ) + + assert result == mock_state + state_manager.get_by_id.assert_called_once_with("fake_id") + state_manager.new.assert_not_called() + + +def test_get_or_create_state_missing(mocker, mock_state): + state_manager = mocker.Mock(spec=StateManager) + state_manager.get_by_id.side_effect = StateNotFoundError("State not found") + state_manager.new.return_value = mock_state + service = MigrationStateService(state_manager) + + result = service.get_or_create_state( + migration_id="fake_id", migration_type=MigrationTypeEnum.DATA, order_id=1024 + ) + + assert result == mock_state + state_manager.get_by_id.assert_called_once_with("fake_id") + state_manager.new.assert_called_once_with("fake_id", MigrationTypeEnum.DATA, 1024) + + +@pytest.mark.parametrize( + ("status", "expected_method_call"), + [ + (MigrationStatusEnum.APPLIED, "applied"), + (MigrationStatusEnum.FAILED, "failed"), + (MigrationStatusEnum.MANUAL_APPLIED, "manual"), + (MigrationStatusEnum.RUNNING, "start"), + ], +) +def test_save_state(status, expected_method_call, mocker, mock_state): + mock_method = mocker.patch.object(Migration, expected_method_call, autospec=True) + state_manager = mocker.Mock(spec=StateManager) + service = MigrationStateService(state_manager) + + service.save_state(mock_state, status=status) # act + + mock_method.assert_called_once() + state_manager.save_state.assert_called_once_with(mock_state)