Skip to content

Commit bf03669

Browse files
committed
Allow migrations to run against individual branches
1 parent 82e5229 commit bf03669

File tree

5 files changed

+72
-53
lines changed

5 files changed

+72
-53
lines changed

backend/infrahub/cli/db.py

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from infrahub import config
2121
from infrahub.core import registry
2222
from infrahub.core.branch import Branch
23+
from infrahub.core.branch.enums import BranchStatus
2324
from infrahub.core.constants import GLOBAL_BRANCH_NAME
2425
from infrahub.core.graph import GRAPH_VERSION
2526
from infrahub.core.graph.constraints import ConstraintManagerBase, ConstraintManagerMemgraph, ConstraintManagerNeo4j
@@ -108,7 +109,10 @@ async def migrate_cmd(
108109
context: CliContext = ctx.obj
109110
dbdriver = await context.init_db(retry=1)
110111

111-
migrations = await detect_migration_to_run(db=dbdriver, migration_number=migration_number)
112+
root_node = await get_root_node(db=dbdriver)
113+
migrations = await detect_migration_to_run(
114+
current_graph_version=root_node.graph_version, migration_number=migration_number
115+
)
112116

113117
if check or not migrations:
114118
return
@@ -277,17 +281,16 @@ async def index(
277281

278282

279283
async def detect_migration_to_run(
280-
db: InfrahubDatabase, migration_number: int | str | None = None
284+
current_graph_version: int, migration_number: int | str | None = None
281285
) -> Sequence[GraphMigration | InternalSchemaMigration | ArbitraryMigration | MigrationWithRebase]:
282286
"""Return a sequence of migrations to apply to upgrade the database."""
283287
rprint("Checking current state of the database")
284288
migrations: list[GraphMigration | InternalSchemaMigration | ArbitraryMigration | MigrationWithRebase] = []
285289

286-
root_node = await get_root_node(db=db)
287290
if migration_number:
288291
migration = get_migration_by_number(migration_number)
289292
migrations.append(migration)
290-
if root_node.graph_version > migration.minimum_version:
293+
if current_graph_version > migration.minimum_version:
291294
rprint(
292295
f"Migration {migration_number} already applied. To apply again, run the command without the --check flag."
293296
)
@@ -296,13 +299,13 @@ async def detect_migration_to_run(
296299
f"Migration {migration_number} needs to be applied. Run `infrahub db migrate` to apply all outstanding migrations."
297300
)
298301
else:
299-
migrations.extend(await get_graph_migrations(root=root_node))
302+
migrations.extend(await get_graph_migrations(current_graph_version=current_graph_version))
300303
if not migrations:
301-
rprint(f"Database up-to-date (v{root_node.graph_version}), no migration to execute.")
304+
rprint(f"Database up-to-date (v{current_graph_version}), no migration to execute.")
302305
return []
303306

304307
rprint(
305-
f"Database needs to be updated (v{root_node.graph_version} -> v{GRAPH_VERSION}), {len(migrations)} migrations pending"
308+
f"Database needs to be updated (v{current_graph_version} -> v{GRAPH_VERSION}), {len(migrations)} migrations pending"
306309
)
307310
return migrations
308311

@@ -320,6 +323,9 @@ async def migrate_database(
320323
db: The database object.
321324
migration_number: If provided, the function will only apply the migration with the given number. Defaults to None.
322325
"""
326+
if not migrations:
327+
return True
328+
323329
if initialize:
324330
await initialize_registry(db=db)
325331

@@ -348,33 +354,49 @@ async def migrate_database(
348354
return True
349355

350356

351-
async def rebase_and_migrate_branches(
352-
db: InfrahubDatabase,
353-
migrations: Sequence[GraphMigration | InternalSchemaMigration | ArbitraryMigration | MigrationWithRebase],
354-
) -> bool:
357+
async def rebase_and_migrate_branches(db: InfrahubDatabase, current_graph_version: int) -> bool:
355358
"""Only applies migrations that aim at rebasing branches."""
356-
branches = [b for b in await Branch.get_list(db=db) if b.name not in [registry.default_branch, GLOBAL_BRANCH_NAME]]
359+
branches = [
360+
b
361+
for b in await Branch.get_list(db=db)
362+
if b.name not in [registry.default_branch, GLOBAL_BRANCH_NAME]
363+
and (not b.graph_version or b.graph_version < current_graph_version)
364+
]
365+
366+
if not branches:
367+
return True
368+
357369
rprint(f"Planning rebase and migrations for {len(branches)} branches: {', '.join([b.name for b in branches])}")
358370

359-
rebase_migrations = [m for m in migrations if isinstance(m, MigrationWithRebase)]
371+
for branch in branches:
372+
migrations = [
373+
m
374+
for m in await detect_migration_to_run(current_graph_version=branch.graph_version or current_graph_version)
375+
if isinstance(m, MigrationWithRebase)
376+
]
377+
rprint(
378+
f"Detected {len(migrations)} migrations to run against '{branch.name}' (ID: {branch.uuid}): {', '.join([m.name for m in migrations])}"
379+
)
360380

361-
for migration in rebase_migrations:
362-
execution_result = await migration.execute_against_branches(db=db, branches=branches)
363-
validation_result = None
381+
for migration in migrations:
382+
execution_result = await migration.execute_against_branch(db=db, branch=branch)
383+
validation_result = None
364384

365-
if execution_result.success:
366-
validation_result = await migration.validate_migration(db=db)
367-
if validation_result.success:
368-
rprint(f"Migration: {migration.name} {SUCCESS_BADGE}")
385+
if execution_result.success:
386+
validation_result = await migration.validate_migration(db=db)
387+
if validation_result.success and branch.status != BranchStatus.NEED_UPGRADE_REBASE:
388+
branch.graph_version = migration.minimum_version + 1
389+
await branch.save(db=db)
390+
rprint(f"Migration: {migration.name} {SUCCESS_BADGE}")
369391

370-
if not execution_result.success or (validation_result and not validation_result.success):
371-
rprint(f"Migration: {migration.name} {FAILED_BADGE}")
372-
for error in execution_result.errors:
373-
rprint(f" {error}")
374-
if validation_result and not validation_result.success:
375-
for error in validation_result.errors:
392+
if not execution_result.success or (validation_result and not validation_result.success):
393+
rprint(f"Migration: {migration.name} {FAILED_BADGE}")
394+
for error in execution_result.errors:
376395
rprint(f" {error}")
377-
return False
396+
if validation_result and not validation_result.success:
397+
for error in validation_result.errors:
398+
rprint(f" {error}")
399+
return False
378400

379401
return True
380402

backend/infrahub/cli/upgrade.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from rich import print as rprint
1212

1313
from infrahub import config
14-
from infrahub.core.initialization import create_anonymous_role, create_default_account_groups, initialize_registry
14+
from infrahub.core.initialization import (
15+
create_anonymous_role,
16+
create_default_account_groups,
17+
get_root_node,
18+
initialize_registry,
19+
)
1520
from infrahub.core.manager import NodeManager
1621
from infrahub.core.protocols import CoreAccount, CoreObjectPermission
1722
from infrahub.dependencies.registry import build_component_registry
@@ -63,6 +68,8 @@ async def upgrade_cmd(
6368

6469
build_component_registry()
6570

71+
root_node = await get_root_node(db=dbdriver)
72+
6673
# NOTE add step to validate if the database and the task manager are reachable
6774

6875
# -------------------------------------------
@@ -73,8 +80,8 @@ async def upgrade_cmd(
7380
# Upgrade Infrahub Database and Schema
7481
# -------------------------------------------
7582

76-
migrations = await detect_migration_to_run(db=dbdriver)
77-
if check or not migrations:
83+
migrations = await detect_migration_to_run(current_graph_version=root_node.graph_version)
84+
if check:
7885
return
7986

8087
if not await migrate_database(db=dbdriver, initialize=False, migrations=migrations):
@@ -86,7 +93,7 @@ async def upgrade_cmd(
8693
await initialize_internal_schema()
8794
await update_core_schema(db=dbdriver, initialize=False)
8895

89-
if not await rebase_and_migrate_branches(db=dbdriver, migrations=migrations):
96+
if not await rebase_and_migrate_branches(db=dbdriver, current_graph_version=root_node.graph_version):
9097
# A migration failed, stop the upgrade process
9198
rprint("Upgrade cancelled due to branch rebase and migration failure.")
9299
await dbdriver.close()

backend/infrahub/core/migrations/graph/__init__.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@
4747
from .m043_backfill_hfid_display_label_in_db import Migration043
4848

4949
if TYPE_CHECKING:
50-
from infrahub.core.root import Root
51-
5250
from ..shared import ArbitraryMigration, GraphMigration, InternalSchemaMigration, MigrationWithRebase
5351

5452
MIGRATIONS: list[type[GraphMigration | InternalSchemaMigration | ArbitraryMigration | MigrationWithRebase]] = [
@@ -99,12 +97,12 @@
9997

10098

10199
async def get_graph_migrations(
102-
root: Root,
100+
current_graph_version: int,
103101
) -> Sequence[GraphMigration | InternalSchemaMigration | ArbitraryMigration | MigrationWithRebase]:
104102
applicable_migrations = []
105103
for migration_class in MIGRATIONS:
106104
migration = migration_class.init()
107-
if root.graph_version > migration.minimum_version:
105+
if current_graph_version > migration.minimum_version:
108106
continue
109107
applicable_migrations.append(migration)
110108

backend/infrahub/core/migrations/graph/m043_backfill_hfid_display_label_in_db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ async def _do_one_schema_all(
718718

719719
print("done")
720720

721-
async def execute_against_branch(self, db: InfrahubDatabase, branch: Branch) -> MigrationResult:
721+
async def process_branch(self, db: InfrahubDatabase, branch: Branch) -> MigrationResult:
722722
root_node = await get_root_node(db=db, initialize=False)
723723
default_branch_name = root_node.default_branch
724724
default_branch = await Branch.get_by_name(db=db, name=default_branch_name)

backend/infrahub/core/migrations/shared.py

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -271,34 +271,26 @@ async def rebase_branch(self, branch: Branch) -> bool:
271271
async def validate_migration(self, db: InfrahubDatabase) -> MigrationResult:
272272
raise NotImplementedError()
273273

274-
async def execute_against_branch(self, db: InfrahubDatabase, branch: Branch) -> MigrationResult:
274+
async def process_branch(self, db: InfrahubDatabase, branch: Branch) -> MigrationResult:
275275
raise NotImplementedError()
276276

277-
async def execute_against_branches(self, db: InfrahubDatabase, branches: Sequence[Branch]) -> MigrationResult:
278-
result = MigrationResult()
279-
280-
for branch in branches:
281-
await registry.schema.load_schema(db=db, branch=branch)
277+
async def execute_against_branch(
278+
self, db: InfrahubDatabase, branch: Branch, skip_rebase: bool = False
279+
) -> MigrationResult:
280+
await registry.schema.load_schema(db=db, branch=branch)
282281

282+
if not skip_rebase:
283283
if not await self.rebase_branch(branch=branch):
284284
branch.status = BranchStatus.NEED_UPGRADE_REBASE
285285
await branch.save(db=db)
286-
continue
287-
288-
r = await self.execute_against_branch(db=db, branch=branch)
289-
result.nbr_migrations_executed += 1
290-
if r.errors:
291-
result.errors.extend(r.errors)
292-
if r.success:
293-
branch.graph_version = self.minimum_version + 1
294-
await branch.save(db=db)
286+
return MigrationResult()
295287

296-
return result
288+
return await self.process_branch(db=db, branch=branch)
297289

298290
async def execute(self, db: InfrahubDatabase) -> MigrationResult:
299291
from infrahub.core.initialization import initialization
300292

301293
initialize_lock()
302294
await initialization(db=db)
303295

304-
return await self.execute_against_branch(db=db, branch=registry.get_branch_from_registry())
296+
return await self.execute_against_branch(db=db, branch=registry.get_branch_from_registry(), skip_rebase=True)

0 commit comments

Comments
 (0)