@@ -710,8 +710,7 @@ apply(#{machine_version := Vsn} = Meta,
710710 _ ->
711711 return (Meta , State0 , stream_not_found , [])
712712 end ;
713- apply (#{machine_version := Vsn } = Meta ,
714- {nodeup , Node } = Cmd ,
713+ apply (Meta , {nodeup , Node } = Cmd ,
715714 #? MODULE {monitors = Monitors0 ,
716715 streams = Streams0 ,
717716 single_active_consumer = Sac0 } = State ) ->
@@ -735,14 +734,8 @@ apply(#{machine_version := Vsn} = Meta,
735734 {Ss #{Id => S }, E }
736735 end , {Streams0 , Effects0 }, Streams0 ),
737736
738- {Sac1 , Effects2 } = case ? V5_OR_MORE (Vsn ) of
739- true ->
740- SacMod = sac_module (Meta ),
741- SacMod :handle_node_reconnected (Node ,
742- Sac0 , Effects1 );
743- false ->
744- {Sac0 , Effects1 }
745- end ,
737+
738+ {Sac1 , Effects2 } = sac_handle_node_reconnected (Meta , Node , Sac0 , Effects1 ),
746739 return (Meta , State #? MODULE {monitors = Monitors ,
747740 streams = Streams ,
748741 single_active_consumer = Sac1 }, ok , Effects2 );
@@ -2444,6 +2437,17 @@ sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) ->
24442437sac_handle_connection_down (SacState , Pid , _Reason , _Vsn ) ->
24452438 ? SAC_V4 :handle_connection_down (Pid , SacState ).
24462439
2440+ sac_handle_node_reconnected (#{machine_version := Vsn } = Meta , Node ,
2441+ Sac , Effects ) ->
2442+ case ? V5_OR_MORE (Vsn ) of
2443+ true ->
2444+ SacMod = sac_module (Meta ),
2445+ SacMod :handle_node_reconnected (Node ,
2446+ Sac , Effects );
2447+ false ->
2448+ {Sac , Effects }
2449+ end .
2450+
24472451sac_make_purge_nodes (Nodes ) ->
24482452 rabbit_stream_sac_coordinator :make_purge_nodes (Nodes ).
24492453
0 commit comments