1212from  infrahub .context  import  InfrahubContext   # noqa: TC001  needed for prefect flow 
1313from  infrahub .core  import  registry 
1414from  infrahub .core .branch  import  Branch 
15+ from  infrahub .core .branch .enums  import  BranchStatus 
1516from  infrahub .core .changelog .diff  import  DiffChangelogCollector , MigrationTracker 
1617from  infrahub .core .constants  import  MutationAction 
1718from  infrahub .core .diff .coordinator  import  DiffCoordinator 
2122from  infrahub .core .diff .model .path  import  BranchTrackingId , EnrichedDiffRoot , EnrichedDiffRootMetadata 
2223from  infrahub .core .diff .models  import  RequestDiffUpdate 
2324from  infrahub .core .diff .repository .repository  import  DiffRepository 
25+ from  infrahub .core .graph  import  GRAPH_VERSION 
2426from  infrahub .core .merge  import  BranchMerger 
27+ from  infrahub .core .migrations .exceptions  import  MigrationFailureError 
28+ from  infrahub .core .migrations .runner  import  MigrationRunner 
2529from  infrahub .core .migrations .schema .models  import  SchemaApplyMigrationData 
2630from  infrahub .core .migrations .schema .tasks  import  schema_apply_migrations 
2731from  infrahub .core .timestamp  import  Timestamp 
3943from  infrahub .workflows .catalogue  import  (
4044    BRANCH_CANCEL_PROPOSED_CHANGES ,
4145    BRANCH_MERGE_POST_PROCESS ,
46+     BRANCH_MIGRATE ,
4247    DIFF_REFRESH_ALL ,
4348    DIFF_UPDATE ,
4449    GIT_REPOSITORIES_CREATE_BRANCH ,
5156
5257@flow (name = "branch-rebase" , flow_run_name = "Rebase branch {branch}" ) 
5358async  def  rebase_branch (branch : str , context : InfrahubContext , send_events : bool  =  True ) ->  None :  # noqa: PLR0915 
59+     workflow  =  get_workflow ()
5460    database  =  await  get_database ()
5561    async  with  database .start_session () as  db :
5662        log  =  get_run_logger ()
@@ -69,7 +75,7 @@ async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool
6975            diff_repository = diff_repository ,
7076            source_branch = obj ,
7177            diff_locker = DiffLocker (),
72-             workflow = get_workflow () ,
78+             workflow = workflow ,
7379        )
7480
7581        enriched_diff_metadata  =  await  diff_coordinator .update_branch_diff (base_branch = base_branch , diff_branch = obj )
@@ -156,41 +162,75 @@ async def rebase_branch(branch: str, context: InfrahubContext, send_events: bool
156162            target_branch_name = registry .default_branch ,
157163        )
158164        if  ipam_node_details :
159-             await  get_workflow () .submit_workflow (
165+             await  workflow .submit_workflow (
160166                workflow = IPAM_RECONCILIATION ,
161167                context = context ,
162168                parameters = {"branch" : obj .name , "ipam_node_details" : ipam_node_details },
163169            )
164170
165-     await  get_workflow ().submit_workflow (
166-         workflow = DIFF_REFRESH_ALL , context = context , parameters = {"branch_name" : obj .name }
167-     )
171+     await  workflow .submit_workflow (workflow = DIFF_REFRESH_ALL , context = context , parameters = {"branch_name" : obj .name })
172+     await  workflow .submit_workflow (workflow = BRANCH_MIGRATE , context = context , parameters = {"branch_name" : obj .name })
168173
169-     if  send_events :
170-         # ------------------------------------------------------------- 
171-         # Generate an event to indicate that a branch has been rebased 
172-         # ------------------------------------------------------------- 
173-         rebase_event  =  BranchRebasedEvent (
174-             branch_name = obj .name , branch_id = str (obj .uuid ), meta = EventMeta (branch = obj , context = context )
175-         )
176-         events : list [InfrahubEvent ] =  [rebase_event ]
177-         changelog_collector  =  DiffChangelogCollector (
178-             diff = default_branch_diff , branch = obj , db = db , migration_tracker = MigrationTracker (migrations = migrations )
174+     if  not  send_events :
175+         return 
176+ 
177+     # ------------------------------------------------------------- 
178+     # Generate an event to indicate that a branch has been rebased 
179+     # ------------------------------------------------------------- 
180+     rebase_event  =  BranchRebasedEvent (
181+         branch_name = obj .name , branch_id = str (obj .uuid ), meta = EventMeta (branch = obj , context = context )
182+     )
183+     events : list [InfrahubEvent ] =  [rebase_event ]
184+     changelog_collector  =  DiffChangelogCollector (
185+         diff = default_branch_diff , branch = obj , db = db , migration_tracker = MigrationTracker (migrations = migrations )
186+     )
187+     for  action , node_changelog  in  changelog_collector .collect_changelogs ():
188+         node_event_class  =  get_node_event (MutationAction .from_diff_action (diff_action = action ))
189+         mutate_event  =  node_event_class (
190+             kind = node_changelog .node_kind ,
191+             node_id = node_changelog .node_id ,
192+             changelog = node_changelog ,
193+             fields = node_changelog .updated_fields ,
194+             meta = EventMeta .from_parent (parent = rebase_event , branch = obj ),
179195        )
180-         for  action , node_changelog  in  changelog_collector .collect_changelogs ():
181-             node_event_class  =  get_node_event (MutationAction .from_diff_action (diff_action = action ))
182-             mutate_event  =  node_event_class (
183-                 kind = node_changelog .node_kind ,
184-                 node_id = node_changelog .node_id ,
185-                 changelog = node_changelog ,
186-                 fields = node_changelog .updated_fields ,
187-                 meta = EventMeta .from_parent (parent = rebase_event , branch = obj ),
188-             )
189-             events .append (mutate_event )
196+         events .append (mutate_event )
190197
191-         event_service  =  await  get_event_service ()
192-         for  event  in  events :
193-             await  event_service .send (event )
198+     event_service  =  await  get_event_service ()
199+     for  event  in  events :
200+         await  event_service .send (event )
201+ 
202+ 
203+ @flow (name = "migrate_branch" , flow_run_name = "Apply migrations to branch {branch}" ) 
204+ async  def  migrate_branch (branch_name : str , context : InfrahubContext ) ->  None :  # noqa: ARG001 
205+     db  =  await  get_database ()
206+     log  =  get_run_logger ()
207+ 
208+     branch  =  await  registry .get_branch (db = db , branch = branch_name )
209+ 
210+     if  branch .graph_version  ==  GRAPH_VERSION :
211+         log .info (f"Branch '{ branch .name }  ' is up-to-date" )
212+         return 
213+ 
214+     migration_runner  =  MigrationRunner (branch = branch )
215+     if  not  migration_runner .has_migrations ():
216+         log .info (f"No migrations detected for branch '{ branch .name }  '" )
217+         return 
218+ 
219+     # Branch status will remain as so if the migration process fails 
220+     # This will help user to know that a branch is in an invalid state to be used properly and that actions need to be taken 
221+     branch .status  =  BranchStatus .NEED_UPGRADE_REBASE 
222+     await  branch .save (db = db )
223+ 
224+     try :
225+         await  migration_runner .run (db = db )
226+     except  MigrationFailureError  as  exc :
227+         log .error (f"Failed to migrate branch '{ branch .name }  ': { exc .errors }  " )
228+         return 
229+ 
230+     if  branch .status  ==  BranchStatus .NEED_UPGRADE_REBASE :
231+         branch .status  =  BranchStatus .OPEN 
232+     branch .graph_version  =  GRAPH_VERSION 
233+     await  branch .save (db = db )
194234
195235
196236@flow (name = "branch-merge" , flow_run_name = "Merge branch {branch} into main" ) 
0 commit comments