Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 8 additions & 22 deletions mpt_tool/cli.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import datetime as dt
import logging
from pathlib import Path
from typing import Annotated
from typing import Annotated, cast

import typer

from mpt_tool.constants import MIGRATION_FOLDER
from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.errors import RunMigrationError
from mpt_tool.templates import MIGRATION_SCAFFOLDING_TEMPLATE
from mpt_tool.errors import NewMigrationError, RunMigrationError
from mpt_tool.use_cases import RunMigrationsUseCase
from mpt_tool.use_cases.new_migration import NewMigrationUseCase

app = typer.Typer(help="MPT CLI - Migration tool for extensions.", no_args_is_help=True)

Expand All @@ -20,7 +17,7 @@ def callback() -> None:


@app.command("migrate")
def migrate( # noqa: C901, WPS238, WPS210, WPS213, WPS231
def migrate( # noqa: C901, WPS238, WPS231
data: Annotated[bool, typer.Option("--data", help="Run data migrations.")] = False, # noqa: FBT002
schema: Annotated[bool, typer.Option("--schema", help="Run schema migrations.")] = False, # noqa: FBT002
new_data: Annotated[
Expand Down Expand Up @@ -63,25 +60,14 @@ def migrate( # noqa: C901, WPS238, WPS210, WPS213, WPS231

if new_schema or new_data:
filename_suffix = new_data or new_schema
migration_type = MigrationTypeEnum.DATA if new_data else MigrationTypeEnum.SCHEMA
typer.echo(f"Scaffolding migration: {filename_suffix}.")
migration_folder = Path(MIGRATION_FOLDER)
migration_folder.mkdir(parents=True, exist_ok=True)
timestamp = dt.datetime.now(tz=dt.UTC).strftime("%Y%m%d%H%M%S")
# TODO: add filename validation
filename = f"{timestamp}_{filename_suffix}.py"
full_filename_path = migration_folder / filename
try:
full_filename_path.touch(exist_ok=False)
except FileExistsError:
typer.secho(f"File already exists: {filename}", fg=typer.colors.RED)
filename = NewMigrationUseCase().execute(migration_type, cast(str, filename_suffix))
except NewMigrationError as error:
typer.secho(f"Error creating migration: {error!s}", fg=typer.colors.RED)
raise typer.Abort

full_filename_path.write_text(
encoding="utf-8",
data=MIGRATION_SCAFFOLDING_TEMPLATE.substitute(
command_name="DataBaseCommand" if new_data else "SchemaBaseCommand"
),
)
typer.secho(f"Migration file: {filename} has been created.", fg=typer.colors.GREEN)


Expand Down
8 changes: 8 additions & 0 deletions mpt_tool/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@ def __init__(self, message: str):
super().__init__(message)


class CreateMigrationError(BaseError):
"""Error creating the migration file."""


class LoadMigrationError(BaseError):
"""Error loading migrations."""


class NewMigrationError(BaseError):
"""Error creating new migration."""


class MigrationFolderError(BaseError):
"""Error accessing migrations folder."""

Expand Down
46 changes: 40 additions & 6 deletions mpt_tool/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@

from mpt_tool.constants import MIGRATION_FOLDER, MIGRATION_STATE_FILE
from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.errors import LoadMigrationError, MigrationFolderError, StateNotFoundError
from mpt_tool.errors import (
CreateMigrationError,
LoadMigrationError,
MigrationFolderError,
StateNotFoundError,
)
from mpt_tool.models import Migration, MigrationFile
from mpt_tool.templates import MIGRATION_SCAFFOLDING_TEMPLATE


class FileMigrationManager:
Expand All @@ -18,21 +24,23 @@ class FileMigrationManager:
_migration_folder: Path = Path(MIGRATION_FOLDER)

@classmethod
def validate(cls) -> tuple[MigrationFile, ...]:
def validate(cls) -> tuple[MigrationFile, ...]: # noqa: WPS238
"""Validates the migration folder and returns a tuple of migration files."""
if not cls._migration_folder.exists():
raise MigrationFolderError(f"Migration folder not found: {cls._migration_folder}")

migrations = tuple(
sorted(
try:
migrations = sorted(
(
MigrationFile.build_from_path(path)
for path in cls._migration_folder.glob("*.py")
if re.match(r"\d+_.*\.py", path.name)
),
key=lambda migration_file: migration_file.order_id,
)
)
except ValueError as error:
raise MigrationFolderError(str(error)) from None

if not migrations:
raise MigrationFolderError(f"No migration files found in {cls._migration_folder}")

Expand All @@ -43,7 +51,7 @@ def validate(cls) -> tuple[MigrationFile, ...]:
f"Duplicate migration filename found: {duplicated_migrations[0]}"
)

return migrations
return tuple(migrations)

@classmethod
def load_migration(cls, migration_file: MigrationFile) -> ModuleType:
Expand All @@ -63,6 +71,32 @@ def load_migration(cls, migration_file: MigrationFile) -> ModuleType:
spec.loader.exec_module(migration_module)
return migration_module

@classmethod
def new_migration(cls, file_suffix: str, migration_type: MigrationTypeEnum) -> MigrationFile:
"""Creates a new migration file."""
cls._migration_folder.mkdir(parents=True, exist_ok=True)
try:
migration_file = MigrationFile.new(migration_id=file_suffix, path=cls._migration_folder)
except ValueError as error:
raise CreateMigrationError(f"Invalid migration ID: {error}") from error

try:
migration_file.full_path.touch(exist_ok=False)
except FileExistsError as error:
raise CreateMigrationError(
f"File already exists: {migration_file.file_name}"
) from error

migration_file.full_path.write_text(
encoding="utf-8",
data=MIGRATION_SCAFFOLDING_TEMPLATE.substitute(
command_name="DataBaseCommand"
if migration_type == MigrationTypeEnum.DATA
else "SchemaBaseCommand"
),
)
return migration_file


class StateJSONEncoder(json.JSONEncoder):
"""JSON encoder for migration states."""
Expand Down
23 changes: 23 additions & 0 deletions mpt_tool/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ class MigrationFile:
migration_id: str
order_id: int

@property
def file_name(self) -> str:
"""Migration file name."""
return f"{self.order_id}_{self.migration_id}.py"

def __post_init__(self):
if not self.migration_id.isidentifier():
raise ValueError(
"Migration ID must contain only alphanumeric letters and numbers, or underscores."
)

@property
def name(self) -> str:
"""Migration file name."""
Expand All @@ -77,3 +88,15 @@ def build_from_path(cls, path: Path) -> Self:
"""
order_id, migration_id = path.stem.split("_", maxsplit=1)
return cls(full_path=path, order_id=int(order_id), migration_id=migration_id)

@classmethod
def new(cls, migration_id: str, path: Path) -> Self:
"""Create a new migration file.
Args:
migration_id: The migration ID.
path: The path to the migration folder.
"""
timestamp = dt.datetime.now(tz=dt.UTC).strftime("%Y%m%d%H%M%S")
full_path = path / f"{timestamp}_{migration_id}.py"
return cls(full_path=full_path, migration_id=migration_id, order_id=int(timestamp))
21 changes: 21 additions & 0 deletions mpt_tool/use_cases/new_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from mpt_tool.enums import MigrationTypeEnum
from mpt_tool.errors import CreateMigrationError, NewMigrationError
from mpt_tool.managers import FileMigrationManager


class NewMigrationUseCase:
"""Service to create a new migration file."""

def __init__(self, file_manager: FileMigrationManager | None = None):
self.file_manager = file_manager or FileMigrationManager()

def execute(self, migration_type: MigrationTypeEnum, file_name: str) -> str:
"""Create a new migration file."""
try:
migration_file = self.file_manager.new_migration(
file_suffix=file_name, migration_type=migration_type
)
except CreateMigrationError as error:
raise NewMigrationError(str(error)) from error

return migration_file.file_name