@@ -410,11 +410,11 @@ async fn maybe_update_resource_setup<
410410async fn apply_changes_for_flow (
411411 write : & mut ( dyn std:: io:: Write + Send ) ,
412412 flow_ctx : & FlowContext ,
413- flow_status : & FlowSetupChange ,
413+ flow_setup_change : & FlowSetupChange ,
414414 existing_setup_state : & mut Option < setup:: FlowSetupState < setup:: ExistingMode > > ,
415415 pool : & PgPool ,
416416) -> Result < ( ) > {
417- let Some ( status) = flow_status . status else {
417+ let Some ( status) = flow_setup_change . status else {
418418 return Ok ( ( ) ) ;
419419 } ;
420420 let verb = match status {
@@ -428,7 +428,7 @@ async fn apply_changes_for_flow(
428428 let mut update_info =
429429 HashMap :: < db_metadata:: ResourceTypeKey , db_metadata:: StateUpdateInfo > :: new ( ) ;
430430
431- if let Some ( metadata_change) = & flow_status . metadata_change {
431+ if let Some ( metadata_change) = & flow_setup_change . metadata_change {
432432 update_info. insert (
433433 db_metadata:: ResourceTypeKey :: new (
434434 MetadataRecordType :: FlowMetadata . to_string ( ) ,
@@ -437,7 +437,7 @@ async fn apply_changes_for_flow(
437437 db_metadata:: StateUpdateInfo :: new ( metadata_change. desired_state ( ) , None ) ?,
438438 ) ;
439439 }
440- if let Some ( tracking_table) = & flow_status . tracking_table {
440+ if let Some ( tracking_table) = & flow_setup_change . tracking_table {
441441 if tracking_table
442442 . setup_change
443443 . as_ref ( )
@@ -454,7 +454,7 @@ async fn apply_changes_for_flow(
454454 }
455455 }
456456
457- for target_resource in & flow_status . target_resources {
457+ for target_resource in & flow_setup_change . target_resources {
458458 update_info. insert (
459459 db_metadata:: ResourceTypeKey :: new (
460460 MetadataRecordType :: Target ( target_resource. key . target_kind . clone ( ) ) . to_string ( ) ,
@@ -474,13 +474,13 @@ async fn apply_changes_for_flow(
474474
475475 let new_version_id = db_metadata:: stage_changes_for_flow (
476476 flow_ctx. flow_name ( ) ,
477- flow_status . seen_flow_metadata_version ,
477+ flow_setup_change . seen_flow_metadata_version ,
478478 & update_info,
479479 pool,
480480 )
481481 . await ?;
482482
483- if let Some ( tracking_table) = & flow_status . tracking_table {
483+ if let Some ( tracking_table) = & flow_setup_change . tracking_table {
484484 maybe_update_resource_setup (
485485 "tracking table" ,
486486 write,
@@ -491,7 +491,7 @@ async fn apply_changes_for_flow(
491491 }
492492
493493 let mut setup_change_by_target_kind = IndexMap :: < & str , Vec < _ > > :: new ( ) ;
494- for target_resource in & flow_status . target_resources {
494+ for target_resource in & flow_setup_change . target_resources {
495495 setup_change_by_target_kind
496496 . entry ( target_resource. key . target_kind . as_str ( ) )
497497 . or_default ( )
@@ -543,21 +543,21 @@ async fn apply_changes_for_flow(
543543 } ;
544544 let metadata = CombinedState :: from_change (
545545 existing_metadata,
546- flow_status
546+ flow_setup_change
547547 . metadata_change
548548 . as_ref ( )
549549 . map ( |v| v. desired_state ( ) ) ,
550550 ) ;
551551 let tracking_table = CombinedState :: from_change (
552552 existing_tracking_table,
553- flow_status . tracking_table . as_ref ( ) . map ( |c| {
553+ flow_setup_change . tracking_table . as_ref ( ) . map ( |c| {
554554 c. setup_change
555555 . as_ref ( )
556556 . and_then ( |c| c. desired_state . as_ref ( ) )
557557 } ) ,
558558 ) ;
559559 let mut targets = existing_targets;
560- for target_resource in & flow_status . target_resources {
560+ for target_resource in & flow_setup_change . target_resources {
561561 match & target_resource. state {
562562 Some ( state) => {
563563 targets. insert (
@@ -618,26 +618,6 @@ pub struct SetupChangeBundle {
618618}
619619
620620impl SetupChangeBundle {
621- async fn get_flow_setup_change < ' a > (
622- setup_ctx : & LibSetupContext ,
623- flow_ctx : & ' a FlowContext ,
624- flow_exec_ctx : & ' a FlowExecutionContext ,
625- action : & FlowSetupChangeAction ,
626- buffer : & ' a mut Option < FlowSetupChange > ,
627- ) -> Result < & ' a FlowSetupChange > {
628- let result = match action {
629- FlowSetupChangeAction :: Setup => & flow_exec_ctx. setup_change ,
630- FlowSetupChangeAction :: Drop => {
631- let existing_state = setup_ctx. all_setup_states . flows . get ( flow_ctx. flow_name ( ) ) ;
632- buffer. insert (
633- diff_flow_setup_states ( None , existing_state, & flow_ctx. flow . flow_instance_ctx )
634- . await ?,
635- )
636- }
637- } ;
638- Ok ( result)
639- }
640-
641621 pub async fn describe ( & self , lib_context : & LibContext ) -> Result < ( String , bool ) > {
642622 let mut text = String :: new ( ) ;
643623 let mut is_up_to_date = true ;
@@ -665,7 +645,7 @@ impl SetupChangeBundle {
665645 let flow_exec_ctx = flow_ctx. get_execution_ctx_for_setup ( ) . read ( ) . await ;
666646
667647 let mut setup_change_buffer = None ;
668- let setup_change = Self :: get_flow_setup_change (
648+ let setup_change = get_flow_setup_change (
669649 setup_ctx,
670650 & flow_ctx,
671651 & flow_exec_ctx,
@@ -715,40 +695,75 @@ impl SetupChangeBundle {
715695 . clone ( )
716696 } ;
717697 let mut flow_exec_ctx = flow_ctx. get_execution_ctx_for_setup ( ) . write ( ) . await ;
718-
719- let mut setup_change_buffer = None ;
720- let setup_change = Self :: get_flow_setup_change (
721- setup_ctx,
698+ apply_changes_for_flow_ctx (
699+ self . action ,
722700 & flow_ctx,
723- & flow_exec_ctx,
724- & self . action ,
725- & mut setup_change_buffer,
726- )
727- . await ?;
728- if setup_change. is_up_to_date ( ) {
729- continue ;
730- }
731-
732- let mut flow_states = setup_ctx. all_setup_states . flows . remove ( flow_name) ;
733- apply_changes_for_flow (
734- write,
735- & flow_ctx,
736- setup_change,
737- & mut flow_states,
701+ & mut flow_exec_ctx,
702+ setup_ctx,
738703 & persistence_ctx. builtin_db_pool ,
704+ write,
739705 )
740706 . await ?;
741-
742- flow_exec_ctx
743- . update_setup_state ( & flow_ctx. flow , flow_states. as_ref ( ) )
744- . await ?;
745- if let Some ( flow_states) = flow_states {
746- setup_ctx
747- . all_setup_states
748- . flows
749- . insert ( flow_name. to_string ( ) , flow_states) ;
750- }
751707 }
752708 Ok ( ( ) )
753709 }
754710}
711+
712+ async fn get_flow_setup_change < ' a > (
713+ setup_ctx : & LibSetupContext ,
714+ flow_ctx : & ' a FlowContext ,
715+ flow_exec_ctx : & ' a FlowExecutionContext ,
716+ action : & FlowSetupChangeAction ,
717+ buffer : & ' a mut Option < FlowSetupChange > ,
718+ ) -> Result < & ' a FlowSetupChange > {
719+ let result = match action {
720+ FlowSetupChangeAction :: Setup => & flow_exec_ctx. setup_change ,
721+ FlowSetupChangeAction :: Drop => {
722+ let existing_state = setup_ctx. all_setup_states . flows . get ( flow_ctx. flow_name ( ) ) ;
723+ buffer. insert (
724+ diff_flow_setup_states ( None , existing_state, & flow_ctx. flow . flow_instance_ctx )
725+ . await ?,
726+ )
727+ }
728+ } ;
729+ Ok ( result)
730+ }
731+
732+ pub ( crate ) async fn apply_changes_for_flow_ctx (
733+ action : FlowSetupChangeAction ,
734+ flow_ctx : & FlowContext ,
735+ flow_exec_ctx : & mut FlowExecutionContext ,
736+ setup_ctx : & mut LibSetupContext ,
737+ db_pool : & PgPool ,
738+ write : & mut ( dyn std:: io:: Write + Send ) ,
739+ ) -> Result < ( ) > {
740+ let mut setup_change_buffer = None ;
741+ let setup_change = get_flow_setup_change (
742+ setup_ctx,
743+ & flow_ctx,
744+ & flow_exec_ctx,
745+ & action,
746+ & mut setup_change_buffer,
747+ )
748+ . await ?;
749+ if setup_change. is_up_to_date ( ) {
750+ return Ok ( ( ) ) ;
751+ }
752+
753+ let mut flow_states = setup_ctx
754+ . all_setup_states
755+ . flows
756+ . remove ( flow_ctx. flow_name ( ) ) ;
757+ apply_changes_for_flow ( write, & flow_ctx, setup_change, & mut flow_states, db_pool) . await ?;
758+
759+ flow_exec_ctx
760+ . update_setup_state ( & flow_ctx. flow , flow_states. as_ref ( ) )
761+ . await ?;
762+ if let Some ( flow_states) = flow_states {
763+ setup_ctx
764+ . all_setup_states
765+ . flows
766+ . insert ( flow_ctx. flow_name ( ) . to_string ( ) , flow_states) ;
767+ }
768+ Ok ( ( ) )
769+ }
0 commit comments