Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/PROJECT_DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ Error running check command: Duplicate migration_id found in migrations: 2026011
```bash
mpt-service-cli migrate --schema
```
- **Run one specific data migration:**
```bash
mpt-service-cli migrate --data MIGRATION_ID
```
- **Run one specific schema migration:**
```bash
mpt-service-cli migrate --schema MIGRATION_ID
```
Migrations are executed in order based on their order_id (timestamp). The tool automatically:
- Validates the migration folder structure
Expand All @@ -199,6 +207,11 @@ Migrations are executed in order based on their order_id (timestamp). The tool a
- Logs migration progress
- Handles errors gracefully and updates state accordingly
When running a single migration (`--data MIGRATION_ID` or `--schema MIGRATION_ID`), the tool:
- Fails if `MIGRATION_ID` does not exist
- Fails if the migration type does not match the selected flag
- Fails if the migration was already applied
**Migration State File (`.migrations-state.json`):**
```json
{
Expand Down
3 changes: 3 additions & 0 deletions mpt_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def migrate( # noqa: WPS211
),
] = None,
list: Annotated[bool, typer.Option("--list", help="List all migrations.")] = False, # noqa: A002, FBT002
migration_id: Annotated[
str | None, typer.Argument(help="Optional migration ID for --data or --schema.")
] = None,
) -> None:
"""Migrate command."""
try:
Expand Down
9 changes: 8 additions & 1 deletion mpt_tool/commands/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

from mpt_tool.commands.base import BaseCommand
from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.use_cases import RunMigrationsUseCase
from mpt_tool.use_cases import RunMigrationsUseCase, RunSingleMigrationUseCase


class DataCommand(BaseCommand):
"""Runs all data migrations."""

def __init__(self, migration_id: str | None = None) -> None:
self._migration_id = migration_id

@override
@property
def start_message(self) -> str:
Expand All @@ -20,4 +23,8 @@ def success_message(self) -> str:

@override
def run(self) -> None:
if self._migration_id:
RunSingleMigrationUseCase().execute(self._migration_id, MigrationTypeEnum.DATA)
return

RunMigrationsUseCase().execute(MigrationTypeEnum.DATA)
8 changes: 4 additions & 4 deletions mpt_tool/commands/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def get_instance(cls, param_data: dict[str, bool | str | None]) -> BaseCommand:
return InitCommand()
case {"check": True}:
return CheckCommand()
case {"data": True}:
return DataCommand()
case {"schema": True}:
return SchemaCommand()
case {"data": True, "migration_id": migration_id}:
return DataCommand(migration_id=cast(str | None, migration_id))
case {"schema": True, "migration_id": migration_id}:
return SchemaCommand(migration_id=cast(str | None, migration_id))
case {"manual": manual_value} if manual_value is not None:
return ManualCommand(migration_id=cast(str, manual_value))
case {"new_schema": new_schema_value} if new_schema_value is not None:
Expand Down
9 changes: 8 additions & 1 deletion mpt_tool/commands/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

from mpt_tool.commands.base import BaseCommand
from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.use_cases import RunMigrationsUseCase
from mpt_tool.use_cases import RunMigrationsUseCase, RunSingleMigrationUseCase


class SchemaCommand(BaseCommand):
"""Runs all schema migrations."""

def __init__(self, migration_id: str | None = None) -> None:
self._migration_id = migration_id

@override
@property
def start_message(self) -> str:
Expand All @@ -20,4 +23,8 @@ def success_message(self) -> str:

@override
def run(self) -> None:
if self._migration_id:
RunSingleMigrationUseCase().execute(self._migration_id, MigrationTypeEnum.SCHEMA)
return

RunMigrationsUseCase().execute(MigrationTypeEnum.SCHEMA)
9 changes: 8 additions & 1 deletion mpt_tool/commands/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ def validate(cls, command_params: dict[str, Any]) -> None:
Raises:
BadParameterError: When none or more than one param is used
"""
param_counts = sum(1 for param_value in command_params.values() if param_value)
migration_id = command_params.get("migration_id")
command_values = {
key: param_value for key, param_value in command_params.items() if key != "migration_id"
}
param_counts = sum(1 for param_value in command_values.values() if param_value)
if migration_id and not command_params.get("data") and not command_params.get("schema"):
raise BadParameterError("MIGRATION_ID can only be used with --data or --schema.")

if not param_counts:
raise BadParameterError("At least one param must be used.")

Expand Down
Empty file added mpt_tool/services/__init__.py
Empty file.
36 changes: 36 additions & 0 deletions mpt_tool/services/migration_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum
from mpt_tool.managers import StateManager
from mpt_tool.managers.errors import StateNotFoundError
from mpt_tool.models import Migration


class MigrationStateService:
"""Shared migration state operations for migration use cases."""

def __init__(self, state_manager: StateManager) -> None:
self._state_manager = state_manager

def get_or_create_state(
self, migration_id: str, migration_type: MigrationTypeEnum, order_id: int
) -> Migration:
"""Return existing migration state, creating it if needed."""
try:
state = self._state_manager.get_by_id(migration_id)
except StateNotFoundError:
state = self._state_manager.new(migration_id, migration_type, order_id)

return state

def save_state(self, state: Migration, status: MigrationStatusEnum) -> None:
"""Apply status transition and persist migration state."""
match status:
case MigrationStatusEnum.APPLIED:
state.applied()
case MigrationStatusEnum.FAILED:
state.failed()
case MigrationStatusEnum.MANUAL_APPLIED:
state.manual()
case MigrationStatusEnum.RUNNING:
state.start()

self._state_manager.save_state(state)
2 changes: 2 additions & 0 deletions mpt_tool/use_cases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from mpt_tool.use_cases.list_migrations import ListMigrationsUseCase
from mpt_tool.use_cases.new_migration import NewMigrationUseCase
from mpt_tool.use_cases.run_migrations import RunMigrationsUseCase
from mpt_tool.use_cases.run_single_migration import RunSingleMigrationUseCase

__all__ = [
"ApplyMigrationUseCase",
Expand All @@ -12,4 +13,5 @@
"ListMigrationsUseCase",
"NewMigrationUseCase",
"RunMigrationsUseCase",
"RunSingleMigrationUseCase",
]
7 changes: 5 additions & 2 deletions mpt_tool/use_cases/apply_migration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from mpt_tool.enums import MigrationStatusEnum
from mpt_tool.managers import FileMigrationManager, StateManager, StateManagerFactory
from mpt_tool.managers.errors import MigrationFolderError, StateNotFoundError
from mpt_tool.services.migration_state import MigrationStateService
from mpt_tool.use_cases.errors import ApplyMigrationError


Expand All @@ -10,9 +12,11 @@ def __init__(
self,
file_migration_manager: FileMigrationManager | None = None,
state_manager: StateManager | None = None,
state_service: MigrationStateService | None = None,
):
self.file_migration_manager = file_migration_manager or FileMigrationManager()
self.state_manager = state_manager or StateManagerFactory.get_instance()
self.state_service = state_service or MigrationStateService(self.state_manager)

def execute(self, migration_id: str) -> None:
"""Apply a migration without running it."""
Expand All @@ -38,5 +42,4 @@ def execute(self, migration_id: str) -> None:
if state.applied_at is not None:
raise ApplyMigrationError(f"Migration {migration_id} already applied")

state.manual()
self.state_manager.save_state(state)
self.state_service.save_state(state, status=MigrationStatusEnum.MANUAL_APPLIED)
36 changes: 9 additions & 27 deletions mpt_tool/use_cases/run_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum
from mpt_tool.managers import FileMigrationManager, StateManager, StateManagerFactory
from mpt_tool.managers.errors import LoadMigrationError, MigrationFolderError, StateNotFoundError
from mpt_tool.managers.errors import LoadMigrationError, MigrationFolderError
from mpt_tool.migration.base import BaseMigration
from mpt_tool.models import Migration, MigrationFile
from mpt_tool.models import MigrationFile
from mpt_tool.services.migration_state import MigrationStateService
from mpt_tool.use_cases.errors import RunMigrationError

logger = logging.getLogger(__name__)
Expand All @@ -17,9 +18,11 @@ def __init__(
self,
file_migration_manager: FileMigrationManager | None = None,
state_manager: StateManager | None = None,
state_service: MigrationStateService | None = None,
):
self.file_migration_manager = file_migration_manager or FileMigrationManager()
self.state_manager = state_manager or StateManagerFactory.get_instance()
self.state_service = state_service or MigrationStateService(self.state_manager)

def execute(self, migration_type: MigrationTypeEnum) -> None: # noqa: WPS231
"""Run all migrations of a given type.
Expand All @@ -42,26 +45,26 @@ def execute(self, migration_type: MigrationTypeEnum) -> None: # noqa: WPS231
if migration_instance is None:
continue

state = self._get_or_create_state(
state = self.state_service.get_or_create_state(
migration_file.migration_id, migration_type, migration_file.order_id
)
if state.applied_at is not None:
logger.debug("Skipping applied migration: %s", migration_file.migration_id)
continue

logger.info("Running migration: %s", migration_file.migration_id)
self._save_state(state, status=MigrationStatusEnum.RUNNING)
self.state_service.save_state(state, status=MigrationStatusEnum.RUNNING)
try:
migration_instance.run()
# We catch all exceptions here to ensure the state is updated
# and the flow is not interrupted abruptly
except Exception as error:
self._save_state(state, status=MigrationStatusEnum.FAILED)
self.state_service.save_state(state, status=MigrationStatusEnum.FAILED)
raise RunMigrationError(
f"Migration {migration_file.migration_id} failed: {error!s}"
) from error

self._save_state(state, status=MigrationStatusEnum.APPLIED)
self.state_service.save_state(state, status=MigrationStatusEnum.APPLIED)

def _get_migration_instance_by_type(
self, migration_file: MigrationFile, migration_type: MigrationTypeEnum
Expand All @@ -75,24 +78,3 @@ def _get_migration_instance_by_type(
return None

return migration_instance

def _get_or_create_state(
self, migration_id: str, migration_type: MigrationTypeEnum, order_id: int
) -> Migration:
try:
state = self.state_manager.get_by_id(migration_id)
except StateNotFoundError:
state = self.state_manager.new(migration_id, migration_type, order_id)

return state

def _save_state(self, state: Migration, status: MigrationStatusEnum) -> None:
match status:
case MigrationStatusEnum.APPLIED:
state.applied()
case MigrationStatusEnum.FAILED:
state.failed()
case MigrationStatusEnum.RUNNING:
state.start()

self.state_manager.save_state(state)
87 changes: 87 additions & 0 deletions mpt_tool/use_cases/run_single_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging

from mpt_tool.enums import MigrationStatusEnum, MigrationTypeEnum
from mpt_tool.managers import FileMigrationManager, StateManager, StateManagerFactory
from mpt_tool.managers.errors import LoadMigrationError, MigrationFolderError
from mpt_tool.migration.base import BaseMigration
from mpt_tool.models import MigrationFile
from mpt_tool.services.migration_state import MigrationStateService
from mpt_tool.use_cases.errors import RunMigrationError

logger = logging.getLogger(__name__)


class RunSingleMigrationUseCase:
"""Use case for running a single migration."""

def __init__(
self,
file_migration_manager: FileMigrationManager | None = None,
state_manager: StateManager | None = None,
state_service: MigrationStateService | None = None,
):
self.file_migration_manager = file_migration_manager or FileMigrationManager()
self.state_manager = state_manager or StateManagerFactory.get_instance()
self.state_service = state_service or MigrationStateService(self.state_manager)

def execute(self, migration_id: str, migration_type: MigrationTypeEnum) -> None:
"""Run one migration by id and type.

Args:
migration_id: The migration id to run.
migration_type: The expected migration type.

Raises:
RunMigrationError: If an error occurs during migration execution.
"""
migration_file = self._get_migration_file(migration_id)
migration_instance = self._get_migration_instance_by_type(migration_file, migration_type)
state = self.state_service.get_or_create_state(
migration_file.migration_id, migration_type, migration_file.order_id
)
if state.applied_at is not None:
raise RunMigrationError(f"Migration {migration_id} already applied")

logger.info("Running migration: %s", migration_file.migration_id)
self.state_service.save_state(state, status=MigrationStatusEnum.RUNNING)
try:
migration_instance.run()
# We catch all exceptions here to ensure the state is updated
# and the flow is not interrupted abruptly
except Exception as error:
self.state_service.save_state(state, status=MigrationStatusEnum.FAILED)
raise RunMigrationError(
f"Migration {migration_file.migration_id} failed: {error!s}"
) from error

self.state_service.save_state(state, status=MigrationStatusEnum.APPLIED)

def _get_migration_file(self, migration_id: str) -> MigrationFile:
try:
migration_files = self.file_migration_manager.validate()
except MigrationFolderError as error:
raise RunMigrationError(str(error)) from error

migration_file = next(
(migration for migration in migration_files if migration.migration_id == migration_id),
None,
)
if migration_file is None:
raise RunMigrationError(f"Migration {migration_id} not found")

return migration_file

def _get_migration_instance_by_type(
self, migration_file: MigrationFile, migration_type: MigrationTypeEnum
) -> BaseMigration:
try:
migration_instance = self.file_migration_manager.load_migration(migration_file)
except LoadMigrationError as error:
raise RunMigrationError(str(error)) from error

if migration_instance.type != migration_type:
raise RunMigrationError(
f"Migration {migration_file.migration_id} is not a {migration_type} migration"
)

return migration_instance
8 changes: 8 additions & 0 deletions tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ def test_migrate_command_multiple_params_error(runner):
assert "Only one param can be used." in result.output


def test_migrate_migration_id_without_type_error(runner):
result = runner.invoke(app, ["migrate", "fake_data_file_name"])

assert result.exit_code == 2, result.output
assert "Invalid value for migrate:" in result.output
assert "MIGRATION_ID can only be used with --data or" in result.output


def test_migrate_data_duplicate_migration(runner, migration_folder):
(migration_folder / "20250406020202_fake_file_name.py").touch()
(migration_folder / "20260107010101_fake_file_name.py").touch()
Expand Down
Loading