Skip to content

Commit eb8bbb3

Browse files
committed
Run branch migrate flow inline
1 parent fc2dfb5 commit eb8bbb3

File tree

1 file changed

+45
-47
lines changed

1 file changed

+45
-47
lines changed

backend/infrahub/core/branch/tasks.py

Lines changed: 45 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,50 @@
6060
from infrahub.workflows.utils import add_tags
6161

6262

63+
@flow(name="branch-migrate", flow_run_name="Apply migrations to branch {branch}")
64+
async def migrate_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None:
65+
await add_tags(branches=[branch])
66+
67+
db = await get_database()
68+
log = get_run_logger()
69+
70+
obj = await Branch.get_by_name(db=db, name=branch)
71+
72+
if obj.graph_version == GRAPH_VERSION:
73+
log.info(f"Branch '{obj.name}' is up-to-date")
74+
return
75+
76+
migration_runner = MigrationRunner(branch=obj)
77+
if not migration_runner.has_migrations():
78+
log.info(f"No migrations detected for branch '{obj.name}'")
79+
return
80+
81+
# Branch status will remain as so if the migration process fails
82+
# This will help user to know that a branch is in an invalid state to be used properly and that actions need to be taken
83+
obj.status = BranchStatus.NEED_UPGRADE_REBASE
84+
await obj.save(db=db)
85+
86+
try:
87+
log.info(f"Running migrations for branch '{obj.name}'")
88+
await migration_runner.run(db=db)
89+
except MigrationFailureError as exc:
90+
log.error(f"Failed to run migrations for branch '{obj.name}': {exc.errors}")
91+
return
92+
93+
if obj.status == BranchStatus.NEED_UPGRADE_REBASE:
94+
obj.status = BranchStatus.OPEN
95+
obj.graph_version = GRAPH_VERSION
96+
await obj.save(db=db)
97+
98+
if send_events:
99+
event_service = await get_event_service()
100+
await event_service.send(
101+
BranchMigratedEvent(
102+
branch_name=obj.name, branch_id=str(obj.uuid), meta=EventMeta(branch=obj, context=context)
103+
)
104+
)
105+
106+
63107
@flow(name="branch-rebase", flow_run_name="Rebase branch {branch}")
64108
async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None: # noqa: PLR0915
65109
workflow = get_workflow()
@@ -174,9 +218,7 @@ async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool
174218
parameters={"branch": obj.name, "ipam_node_details": ipam_node_details},
175219
)
176220

177-
await workflow.submit_workflow(
178-
workflow=BRANCH_MIGRATE, context=context, parameters={"branch": obj.name, "send_events": send_events}
179-
)
221+
await migrate_branch(branch=branch, context=context, send_events=send_events)
180222
await workflow.submit_workflow(workflow=DIFF_REFRESH_ALL, context=context, parameters={"branch_name": obj.name})
181223

182224
if not send_events:
@@ -208,50 +250,6 @@ async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool
208250
await event_service.send(event)
209251

210252

211-
@flow(name="branch-migrate", flow_run_name="Apply migrations to branch {branch}")
212-
async def migrate_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None:
213-
await add_tags(branches=[branch])
214-
215-
db = await get_database()
216-
log = get_run_logger()
217-
218-
obj = await Branch.get_by_name(db=db, name=branch)
219-
220-
if obj.graph_version == GRAPH_VERSION:
221-
log.info(f"Branch '{obj.name}' is up-to-date")
222-
return
223-
224-
migration_runner = MigrationRunner(branch=obj)
225-
if not migration_runner.has_migrations():
226-
log.info(f"No migrations detected for branch '{obj.name}'")
227-
return
228-
229-
# Branch status will remain as so if the migration process fails
230-
# This will help user to know that a branch is in an invalid state to be used properly and that actions need to be taken
231-
obj.status = BranchStatus.NEED_UPGRADE_REBASE
232-
await obj.save(db=db)
233-
234-
try:
235-
log.info(f"Running migrations for branch '{obj.name}'")
236-
await migration_runner.run(db=db)
237-
except MigrationFailureError as exc:
238-
log.error(f"Failed to run migrations for branch '{obj.name}': {exc.errors}")
239-
return
240-
241-
if obj.status == BranchStatus.NEED_UPGRADE_REBASE:
242-
obj.status = BranchStatus.OPEN
243-
obj.graph_version = GRAPH_VERSION
244-
await obj.save(db=db)
245-
246-
if send_events:
247-
event_service = await get_event_service()
248-
await event_service.send(
249-
BranchMigratedEvent(
250-
branch_name=obj.name, branch_id=str(obj.uuid), meta=EventMeta(branch=obj, context=context)
251-
)
252-
)
253-
254-
255253
@flow(name="branch-merge", flow_run_name="Merge branch {branch} into main")
256254
async def merge_branch(branch: str, context: InfrahubContext, proposed_change_id: str | None = None) -> None:
257255
database = await get_database()

0 commit comments

Comments
 (0)