Skip to content

Commit eb22a53

Browse files
committed
Use prefect flow for rebasing branches
1 parent 8c87fc3 commit eb22a53

File tree

2 files changed

+42
-36
lines changed

2 files changed

+42
-36
lines changed

backend/infrahub/core/branch/tasks.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151

5252
@flow(name="branch-rebase", flow_run_name="Rebase branch {branch}")
53-
async def rebase_branch(branch: str, context: InfrahubContext) -> None: # noqa: PLR0915
53+
async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None: # noqa: PLR0915
5454
database = await get_database()
5555
async with database.start_session() as db:
5656
log = get_run_logger()
@@ -166,30 +166,31 @@ async def rebase_branch(branch: str, context: InfrahubContext) -> None: # noqa:
166166
workflow=DIFF_REFRESH_ALL, context=context, parameters={"branch_name": obj.name}
167167
)
168168

169-
# -------------------------------------------------------------
170-
# Generate an event to indicate that a branch has been rebased
171-
# -------------------------------------------------------------
172-
rebase_event = BranchRebasedEvent(
173-
branch_name=obj.name, branch_id=str(obj.uuid), meta=EventMeta(branch=obj, context=context)
174-
)
175-
events: list[InfrahubEvent] = [rebase_event]
176-
changelog_collector = DiffChangelogCollector(
177-
diff=default_branch_diff, branch=obj, db=db, migration_tracker=MigrationTracker(migrations=migrations)
178-
)
179-
for action, node_changelog in changelog_collector.collect_changelogs():
180-
node_event_class = get_node_event(MutationAction.from_diff_action(diff_action=action))
181-
mutate_event = node_event_class(
182-
kind=node_changelog.node_kind,
183-
node_id=node_changelog.node_id,
184-
changelog=node_changelog,
185-
fields=node_changelog.updated_fields,
186-
meta=EventMeta.from_parent(parent=rebase_event, branch=obj),
169+
if send_events:
170+
# -------------------------------------------------------------
171+
# Generate an event to indicate that a branch has been rebased
172+
# -------------------------------------------------------------
173+
rebase_event = BranchRebasedEvent(
174+
branch_name=obj.name, branch_id=str(obj.uuid), meta=EventMeta(branch=obj, context=context)
187175
)
188-
events.append(mutate_event)
176+
events: list[InfrahubEvent] = [rebase_event]
177+
changelog_collector = DiffChangelogCollector(
178+
diff=default_branch_diff, branch=obj, db=db, migration_tracker=MigrationTracker(migrations=migrations)
179+
)
180+
for action, node_changelog in changelog_collector.collect_changelogs():
181+
node_event_class = get_node_event(MutationAction.from_diff_action(diff_action=action))
182+
mutate_event = node_event_class(
183+
kind=node_changelog.node_kind,
184+
node_id=node_changelog.node_id,
185+
changelog=node_changelog,
186+
fields=node_changelog.updated_fields,
187+
meta=EventMeta.from_parent(parent=rebase_event, branch=obj),
188+
)
189+
events.append(mutate_event)
189190

190-
event_service = await get_event_service()
191-
for event in events:
192-
await event_service.send(event)
191+
event_service = await get_event_service()
192+
for event in events:
193+
await event_service.send(event)
193194

194195

195196
@flow(name="branch-merge", flow_run_name="Merge branch {branch} into main")

backend/infrahub/core/migrations/shared.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from rich.console import Console
77
from typing_extensions import Self
88

9+
from infrahub.auth import AccountSession, AuthType
10+
from infrahub.context import InfrahubContext
911
from infrahub.core import registry
1012
from infrahub.core.branch.enums import BranchStatus
1113
from infrahub.core.path import SchemaPath # noqa: TC001
@@ -18,6 +20,7 @@
1820
internal_schema,
1921
)
2022
from infrahub.core.timestamp import Timestamp
23+
from infrahub.exceptions import ValidationError
2124

2225
from .query import MigrationBaseQuery # noqa: TC001
2326

@@ -242,24 +245,26 @@ def init(cls, **kwargs: dict[str, Any]) -> Self:
242245
return cls(**kwargs) # type: ignore[arg-type]
243246

244247
async def rebase_branch(self, db: InfrahubDatabase, branch: Branch) -> bool:
245-
# Circular deps if import inside import block
246-
# from infrahub.core.branch.tasks import rebase_branch
247-
# await rebase_branch(
248-
# branch=branch.name,
249-
# context=InfrahubContext.init(
250-
# branch=branch,
251-
# # FIXME: or superuser account?
252-
# account=AccountSession(auth_type=AuthType.NONE, authenticated=False, account_id=""),
253-
# ),
254-
# )
248+
# Circular deps if imported inside import block
249+
from infrahub.core.branch.tasks import rebase_branch
250+
255251
# NOTE: need service/bus component up and running to use prefect flow
256252
console = Console()
257253
console.print(f"Rebasing branch '{branch.name}' (ID: {branch.uuid})...", end="")
258254
try:
259-
await branch.rebase(db=db)
255+
# await branch.rebase(db=db)
256+
await rebase_branch(
257+
branch=branch.name,
258+
context=InfrahubContext.init(
259+
branch=branch,
260+
# FIXME: or superuser account?
261+
account=AccountSession(auth_type=AuthType.NONE, authenticated=False, account_id=""),
262+
),
263+
send_events=False,
264+
)
260265
console.print("done")
261266
branch.graph_version = self.minimum_version + 1
262-
# NOTE: Narrow to more accurate exception
267+
except ValidationError:
263268
console.print("failed")
264269
branch.status = BranchStatus.NEED_UPGRADE_REBASE
265270
return False
@@ -280,7 +285,7 @@ async def execute_against_branches(self, db: InfrahubDatabase, branches: Sequenc
280285
for branch in branches:
281286
if not await self.rebase_branch(db=db, branch=branch):
282287
result.errors.append(f"Failed to rebase branch '{branch.name}' ({branch.uuid})")
283-
return result
288+
continue
284289

285290
r = await self.execute_against_branch(db=db, branch=branch)
286291
result.nbr_migrations_executed += 1

0 commit comments

Comments
 (0)