Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4e1bb27
Add `MigrationWithRebase` class
gmazoyer Oct 15, 2025
7157fa1
Convert migration 42 to a rebase one
gmazoyer Oct 15, 2025
9fe20cf
Split some logic into different functions
gmazoyer Oct 15, 2025
0e76288
Fix initial migration run
gmazoyer Oct 17, 2025
dd04fa4
Add one more message to understand what goes on
gmazoyer Oct 17, 2025
505d93b
Load branches from db only once
gmazoyer Oct 20, 2025
1ca016e
Cleanup lint errors
gmazoyer Oct 20, 2025
3500ea0
Move init back to migration to avoid circular deps
gmazoyer Oct 21, 2025
915cf28
Add need upgrade rebase branch status
gmazoyer Oct 22, 2025
17b9c80
Add graph number in branch and set it
gmazoyer Oct 22, 2025
f85be21
Use prefect flow for rebasing branches
gmazoyer Oct 22, 2025
773d113
Need to load schema before rebase
gmazoyer Oct 22, 2025
018c2a8
Fix unit test
gmazoyer Oct 22, 2025
f1c44ef
Move code around to store branch status en graph version
gmazoyer Oct 22, 2025
b86d627
Don't hard stop upgrade process on branch rebase failure
gmazoyer Oct 22, 2025
262e920
Allow migrations to run against individual branches
gmazoyer Oct 23, 2025
4c98e39
Set graph version on branch creation
gmazoyer Oct 24, 2025
668c41c
Add `--rebase-branches` flag to upgrade cmd
gmazoyer Oct 24, 2025
3a0fe94
Add new flow to run migrations on a branch
gmazoyer Oct 28, 2025
6908ed4
Fix wrongly deleted code
gmazoyer Oct 28, 2025
2e907b3
Fix generated graphql schema
gmazoyer Oct 28, 2025
4be9884
Update docs
gmazoyer Oct 28, 2025
e092d8f
Add new flow to workflow list
gmazoyer Oct 28, 2025
16f60c2
Fix for triggering migrations
gmazoyer Oct 28, 2025
d59be63
Add branch migrated evemt
gmazoyer Oct 28, 2025
40aa298
Fix flow name
gmazoyer Oct 29, 2025
91de062
Fix rebase of branches using upgrade command
gmazoyer Oct 29, 2025
48fb7c4
Remove left over
gmazoyer Oct 29, 2025
3c70090
Rename class
gmazoyer Oct 29, 2025
c814786
Run branch migrate flow inline
gmazoyer Oct 30, 2025
8e6c259
Add tests for migration runner
gmazoyer Oct 30, 2025
45564b2
Update graphql schema
gmazoyer Oct 30, 2025
6325348
Do not leak db connection on early exit
gmazoyer Oct 30, 2025
74fd90d
Wrap into database session
gmazoyer Oct 30, 2025
33c9bb5
Catch migration failure exception to update CLI feedback
gmazoyer Oct 30, 2025
260586c
Fix import
gmazoyer Oct 30, 2025
18beba6
Update the graph version if no migrations need to be applied
gmazoyer Oct 30, 2025
17ead5c
Add changelog record
gmazoyer Oct 30, 2025
dcbdf7d
Typo
gmazoyer Oct 30, 2025
6e06543
Add typealias for migration classes
gmazoyer Oct 31, 2025
770308a
Fix test to avoid having to update it when we'll add migrations
gmazoyer Oct 31, 2025
d92dfc3
Fix docstring
gmazoyer Oct 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 83 additions & 35 deletions backend/infrahub/cli/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from rich.table import Table

from infrahub import config
from infrahub.auth import AccountSession, AuthType
from infrahub.context import InfrahubContext
from infrahub.core import registry
from infrahub.core.branch import Branch
from infrahub.core.branch.tasks import rebase_branch
from infrahub.core.constants import GLOBAL_BRANCH_NAME
from infrahub.core.graph import GRAPH_VERSION
from infrahub.core.graph.constraints import ConstraintManagerBase, ConstraintManagerMemgraph, ConstraintManagerNeo4j
from infrahub.core.graph.index import node_indexes, rel_indexes
Expand All @@ -30,10 +35,8 @@
GraphRelationshipIsPartOf,
GraphRelationshipProperties,
)
from infrahub.core.initialization import (
get_root_node,
initialize_registry,
)
from infrahub.core.initialization import get_root_node, initialize_registry
from infrahub.core.migrations.exceptions import MigrationFailureError
from infrahub.core.migrations.graph import get_graph_migrations, get_migration_by_number
from infrahub.core.migrations.schema.models import SchemaApplyMigrationData
from infrahub.core.migrations.schema.tasks import schema_apply_migrations
Expand All @@ -45,6 +48,7 @@
from infrahub.database import DatabaseType
from infrahub.database.memgraph import IndexManagerMemgraph
from infrahub.database.neo4j import IndexManagerNeo4j
from infrahub.exceptions import ValidationError

from .constants import ERROR_BADGE, FAILED_BADGE, SUCCESS_BADGE
from .db_commands.check_inheritance import check_inheritance
Expand All @@ -59,7 +63,7 @@ def get_timestamp_string() -> str:

if TYPE_CHECKING:
from infrahub.cli.context import CliContext
from infrahub.core.migrations.shared import ArbitraryMigration, GraphMigration, InternalSchemaMigration
from infrahub.core.migrations.shared import MigrationTypes
from infrahub.database import InfrahubDatabase
from infrahub.database.index import IndexManagerBase

Expand Down Expand Up @@ -105,7 +109,15 @@ async def migrate_cmd(
context: CliContext = ctx.obj
dbdriver = await context.init_db(retry=1)

await migrate_database(db=dbdriver, initialize=True, check=check, migration_number=migration_number)
root_node = await get_root_node(db=dbdriver)
migrations = await detect_migration_to_run(
current_graph_version=root_node.graph_version, migration_number=migration_number
)

if check or not migrations:
return

await migrate_database(db=dbdriver, migrations=migrations, initialize=True)

await dbdriver.close()

Expand Down Expand Up @@ -268,49 +280,55 @@ async def index(
await dbdriver.close()


async def detect_migration_to_run(
current_graph_version: int, migration_number: int | str | None = None
) -> Sequence[MigrationTypes]:
"""Return a sequence of migrations to apply to upgrade the database."""
rprint("Checking current state of the database")
migrations: list[MigrationTypes] = []

if migration_number:
migration = get_migration_by_number(migration_number)
migrations.append(migration)
if current_graph_version > migration.minimum_version:
rprint(
f"Migration {migration_number} already applied. To apply again, run the command without the --check flag."
)
return []
rprint(
f"Migration {migration_number} needs to be applied. Run `infrahub db migrate` to apply all outstanding migrations."
)
else:
migrations.extend(await get_graph_migrations(current_graph_version=current_graph_version))
if not migrations:
rprint(f"Database up-to-date (v{current_graph_version}), no migration to execute.")
return []

rprint(
f"Database needs to be updated (v{current_graph_version} -> v{GRAPH_VERSION}), {len(migrations)} migrations pending"
)
return migrations


async def migrate_database(
db: InfrahubDatabase, initialize: bool = False, check: bool = False, migration_number: int | str | None = None
db: InfrahubDatabase, migrations: Sequence[MigrationTypes], initialize: bool = False
) -> bool:
"""Apply the latest migrations to the database, this function will print the status directly in the console.

Returns a boolean indicating whether a migration failed or if all migrations succeeded.

Args:
db: The database object.
check: If True, the function will only check the status of the database and not apply the migrations. Defaults to False.
migration_number: If provided, the function will only apply the migration with the given number. Defaults to None.
migrations: Sequence of migrations to apply.
initialize: Whether to initialize the registry before running migrations.
"""
rprint("Checking current state of the Database")
if not migrations:
return True

if initialize:
await initialize_registry(db=db)

root_node = await get_root_node(db=db)
if migration_number:
migration = get_migration_by_number(migration_number)
migrations: Sequence[GraphMigration | InternalSchemaMigration | ArbitraryMigration] = [migration]
if check:
if root_node.graph_version > migration.minimum_version:
rprint(
f"Migration {migration_number} already applied. To apply again, run the command without the --check flag."
)
return True
rprint(
f"Migration {migration_number} needs to be applied. Run `infrahub db migrate` to apply all outstanding migrations."
)
return False
else:
migrations = await get_graph_migrations(root=root_node)
if not migrations:
rprint(f"Database up-to-date (v{root_node.graph_version}), no migration to execute.")
return True

rprint(
f"Database needs to be updated (v{root_node.graph_version} -> v{GRAPH_VERSION}), {len(migrations)} migrations pending"
)

if check:
return True

for migration in migrations:
execution_result = await migration.execute(db=db)
Expand All @@ -335,6 +353,36 @@ async def migrate_database(
return True


async def trigger_rebase_branches(db: InfrahubDatabase) -> None:
"""Trigger rebase of non-default branches, also triggering migrations in the process."""
branches = [b for b in await Branch.get_list(db=db) if b.name not in [registry.default_branch, GLOBAL_BRANCH_NAME]]
if not branches:
return

rprint(f"Planning rebase and migrations for {len(branches)} branches: {', '.join([b.name for b in branches])}")

for branch in branches:
if branch.graph_version == GRAPH_VERSION:
rprint(
f"Ignoring branch rebase and migrations for '{branch.name}' (ID: {branch.uuid}), it is already up-to-date"
)
continue

rprint(f"Rebasing branch '{branch.name}' (ID: {branch.uuid})...", end="")
try:
await registry.schema.load_schema(db=db, branch=branch)
await rebase_branch(
branch=branch.name,
context=InfrahubContext.init(
branch=branch, account=AccountSession(auth_type=AuthType.NONE, authenticated=False, account_id="")
),
send_events=False,
)
rprint(SUCCESS_BADGE)
except (ValidationError, MigrationFailureError):
rprint(FAILED_BADGE)


async def initialize_internal_schema() -> None:
registry.schema = SchemaManager()
schema = SchemaRoot(**internal_schema)
Expand Down
31 changes: 28 additions & 3 deletions backend/infrahub/cli/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from rich import print as rprint

from infrahub import config
from infrahub.core.initialization import create_anonymous_role, create_default_account_groups, initialize_registry
from infrahub.core.initialization import (
create_anonymous_role,
create_default_account_groups,
get_root_node,
initialize_registry,
)
from infrahub.core.manager import NodeManager
from infrahub.core.protocols import CoreAccount, CoreObjectPermission
from infrahub.dependencies.registry import build_component_registry
Expand All @@ -26,7 +31,13 @@
setup_worker_pools,
)

from .db import initialize_internal_schema, migrate_database, update_core_schema
from .db import (
detect_migration_to_run,
initialize_internal_schema,
migrate_database,
trigger_rebase_branches,
update_core_schema,
)

if TYPE_CHECKING:
from infrahub.cli.context import CliContext
Expand All @@ -40,6 +51,7 @@ async def upgrade_cmd(
ctx: typer.Context,
config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
check: bool = typer.Option(False, help="Check the state of the system without upgrading."),
rebase_branches: bool = typer.Option(False, help="Rebase and apply migrations to branches if required."),
) -> None:
"""Upgrade Infrahub to the latest version."""

Expand All @@ -57,6 +69,8 @@ async def upgrade_cmd(

build_component_registry()

root_node = await get_root_node(db=dbdriver)

# NOTE add step to validate if the database and the task manager are reachable

# -------------------------------------------
Expand All @@ -67,7 +81,12 @@ async def upgrade_cmd(
# Upgrade Infrahub Database and Schema
# -------------------------------------------

if not await migrate_database(db=dbdriver, initialize=False, check=check):
migrations = await detect_migration_to_run(current_graph_version=root_node.graph_version)
if check:
await dbdriver.close()
return

if not await migrate_database(db=dbdriver, initialize=False, migrations=migrations):
# A migration failed, stop the upgrade process
rprint("Upgrade cancelled due to migration failure.")
await dbdriver.close()
Expand All @@ -91,6 +110,12 @@ async def upgrade_cmd(
await setup_deployments(client=client)
await trigger_configure_all()

# -------------------------------------------
# Perform branch rebase and apply migrations to them
# -------------------------------------------
if rebase_branches:
await trigger_rebase_branches(db=dbdriver)

await dbdriver.close()


Expand Down
1 change: 1 addition & 0 deletions backend/infrahub/core/branch/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
class BranchStatus(InfrahubStringEnum):
OPEN = "OPEN"
NEED_REBASE = "NEED_REBASE"
NEED_UPGRADE_REBASE = "NEED_UPGRADE_REBASE"
DELETING = "DELETING"
10 changes: 7 additions & 3 deletions backend/infrahub/core/branch/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
from pydantic import Field, field_validator

from infrahub.core.branch.enums import BranchStatus
from infrahub.core.constants import (
GLOBAL_BRANCH_NAME,
)
from infrahub.core.constants import GLOBAL_BRANCH_NAME
from infrahub.core.graph import GRAPH_VERSION
from infrahub.core.models import SchemaBranchHash # noqa: TC001
from infrahub.core.node.standard import StandardNode
from infrahub.core.query import QueryType
Expand Down Expand Up @@ -46,6 +45,7 @@ class Branch(StandardNode):
is_isolated: bool = True
schema_changed_at: Optional[str] = None
schema_hash: Optional[SchemaBranchHash] = None
graph_version: int | None = None

_exclude_attrs: list[str] = ["id", "uuid", "owner"]

Expand Down Expand Up @@ -261,6 +261,10 @@ def get_branches_and_times_for_range(

return start, end

async def create(self, db: InfrahubDatabase) -> bool:
self.graph_version = GRAPH_VERSION
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One question here is how we manage this number after a rebase operation. Do we then bump the graph version? (I haven't yet checked if we already do this, just commenting some thoughts)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This number only changes in the branch-migrate flow, if there are no migrations to apply or if migrations were applied succesfully. This means that if a branch is created at graph version n and the global graph version is changed to m, the branch's graph version will never be m unless a rebase operation happens.

return await super().create(db=db)
Comment on lines +264 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Set graph_version during branch rebases

create() now seeds graph_version, but rebase() still leaves whatever stale value the branch carried before the migration. After a successful rebase the branch should advertise the current GRAPH_VERSION; otherwise the new upgrade flow keeps treating it as out-of-date and may keep it in NEED_UPGRADE_REBASE. Please assign and persist the fresh version inside rebase() once the transaction succeeds. For example:

         self.branched_from = at.to_string()
         self.status = BranchStatus.OPEN
+        self.graph_version = GRAPH_VERSION
         await self.save(db=db)

Also applies to: 476-493

🤖 Prompt for AI Agents
In backend/infrahub/core/branch/models.py around lines 264-266 (and similarly
476-493), rebase() currently leaves the branch's stale graph_version untouched;
after a successful rebase you must assign self.graph_version = GRAPH_VERSION and
persist that change in the database inside the successful transaction/commit
path. Update rebase() to set the new GRAPH_VERSION on the instance and call the
model's save/update method (or run an UPDATE via the db session) only after the
rebase transaction succeeds so the refreshed graph_version is stored and the
branch no longer appears as needing an upgrade rebase.


async def delete(self, db: InfrahubDatabase) -> None:
if self.is_default:
raise ValidationError(f"Unable to delete {self.name} it is the default branch.")
Expand Down
Loading
Loading