|
60 | 60 | from infrahub.workflows.utils import add_tags |
61 | 61 |
|
62 | 62 |
|
| 63 | +@flow(name="branch-migrate", flow_run_name="Apply migrations to branch {branch}") |
| 64 | +async def migrate_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None: |
| 65 | + await add_tags(branches=[branch]) |
| 66 | + |
| 67 | + db = await get_database() |
| 68 | + log = get_run_logger() |
| 69 | + |
| 70 | + obj = await Branch.get_by_name(db=db, name=branch) |
| 71 | + |
| 72 | + if obj.graph_version == GRAPH_VERSION: |
| 73 | + log.info(f"Branch '{obj.name}' is up-to-date") |
| 74 | + return |
| 75 | + |
| 76 | + migration_runner = MigrationRunner(branch=obj) |
| 77 | + if not migration_runner.has_migrations(): |
| 78 | + log.info(f"No migrations detected for branch '{obj.name}'") |
| 79 | + return |
| 80 | + |
| 81 | + # Branch status will remain as so if the migration process fails |
| 82 | + # This will help user to know that a branch is in an invalid state to be used properly and that actions need to be taken |
| 83 | + obj.status = BranchStatus.NEED_UPGRADE_REBASE |
| 84 | + await obj.save(db=db) |
| 85 | + |
| 86 | + try: |
| 87 | + log.info(f"Running migrations for branch '{obj.name}'") |
| 88 | + await migration_runner.run(db=db) |
| 89 | + except MigrationFailureError as exc: |
| 90 | + log.error(f"Failed to run migrations for branch '{obj.name}': {exc.errors}") |
| 91 | + return |
| 92 | + |
| 93 | + if obj.status == BranchStatus.NEED_UPGRADE_REBASE: |
| 94 | + obj.status = BranchStatus.OPEN |
| 95 | + obj.graph_version = GRAPH_VERSION |
| 96 | + await obj.save(db=db) |
| 97 | + |
| 98 | + if send_events: |
| 99 | + event_service = await get_event_service() |
| 100 | + await event_service.send( |
| 101 | + BranchMigratedEvent( |
| 102 | + branch_name=obj.name, branch_id=str(obj.uuid), meta=EventMeta(branch=obj, context=context) |
| 103 | + ) |
| 104 | + ) |
| 105 | + |
| 106 | + |
63 | 107 | @flow(name="branch-rebase", flow_run_name="Rebase branch {branch}") |
64 | 108 | async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None: # noqa: PLR0915 |
65 | 109 | workflow = get_workflow() |
@@ -174,9 +218,7 @@ async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool |
174 | 218 | parameters={"branch": obj.name, "ipam_node_details": ipam_node_details}, |
175 | 219 | ) |
176 | 220 |
|
177 | | - await workflow.submit_workflow( |
178 | | - workflow=BRANCH_MIGRATE, context=context, parameters={"branch": obj.name, "send_events": send_events} |
179 | | - ) |
| 221 | + await migrate_branch(branch=branch, context=context, send_events=send_events) |
180 | 222 | await workflow.submit_workflow(workflow=DIFF_REFRESH_ALL, context=context, parameters={"branch_name": obj.name}) |
181 | 223 |
|
182 | 224 | if not send_events: |
@@ -208,50 +250,6 @@ async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool |
208 | 250 | await event_service.send(event) |
209 | 251 |
|
210 | 252 |
|
211 | | -@flow(name="branch-migrate", flow_run_name="Apply migrations to branch {branch}") |
212 | | -async def migrate_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None: |
213 | | - await add_tags(branches=[branch]) |
214 | | - |
215 | | - db = await get_database() |
216 | | - log = get_run_logger() |
217 | | - |
218 | | - obj = await Branch.get_by_name(db=db, name=branch) |
219 | | - |
220 | | - if obj.graph_version == GRAPH_VERSION: |
221 | | - log.info(f"Branch '{obj.name}' is up-to-date") |
222 | | - return |
223 | | - |
224 | | - migration_runner = MigrationRunner(branch=obj) |
225 | | - if not migration_runner.has_migrations(): |
226 | | - log.info(f"No migrations detected for branch '{obj.name}'") |
227 | | - return |
228 | | - |
229 | | - # Branch status will remain as so if the migration process fails |
230 | | - # This will help user to know that a branch is in an invalid state to be used properly and that actions need to be taken |
231 | | - obj.status = BranchStatus.NEED_UPGRADE_REBASE |
232 | | - await obj.save(db=db) |
233 | | - |
234 | | - try: |
235 | | - log.info(f"Running migrations for branch '{obj.name}'") |
236 | | - await migration_runner.run(db=db) |
237 | | - except MigrationFailureError as exc: |
238 | | - log.error(f"Failed to run migrations for branch '{obj.name}': {exc.errors}") |
239 | | - return |
240 | | - |
241 | | - if obj.status == BranchStatus.NEED_UPGRADE_REBASE: |
242 | | - obj.status = BranchStatus.OPEN |
243 | | - obj.graph_version = GRAPH_VERSION |
244 | | - await obj.save(db=db) |
245 | | - |
246 | | - if send_events: |
247 | | - event_service = await get_event_service() |
248 | | - await event_service.send( |
249 | | - BranchMigratedEvent( |
250 | | - branch_name=obj.name, branch_id=str(obj.uuid), meta=EventMeta(branch=obj, context=context) |
251 | | - ) |
252 | | - ) |
253 | | - |
254 | | - |
255 | 253 | @flow(name="branch-merge", flow_run_name="Merge branch {branch} into main") |
256 | 254 | async def merge_branch(branch: str, context: InfrahubContext, proposed_change_id: str | None = None) -> None: |
257 | 255 | database = await get_database() |
|
0 commit comments