1515 apply /3 ,
1616 state_enter /2 ,
1717 init_aux /1 ,
18- handle_aux /6 ,
18+ handle_aux /5 ,
1919 tick /2 ,
2020 version /0 ,
2121 which_module /1 ,
@@ -629,11 +629,19 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
629629 return (Meta , State #? MODULE {streams = Streams0 ,
630630 monitors = Monitors1 }, ok , Effects0 )
631631 end ;
632- {sac , Monitors1 } ->
632+ {sac , Monitors1 } when MachineVersion < 5 orelse Reason =/= noconnection ->
633+ % % A connection went down, v5+ treats noconnection differently but
634+ % % v4- does not.
633635 Mod = sac_module (Meta ),
634636 {SacState1 , Effects } = Mod :handle_connection_down (Pid , SacState0 ),
635637 return (Meta , State #? MODULE {single_active_consumer = SacState1 ,
636- monitors = Monitors1 }, ok , Effects );
638+ monitors = Monitors1 }, ok , [Effects0 ++ Effects ]);
639+ {sac , Monitors1 } when Reason =:= noconnection ->
640+ % % the node of a connection got disconnected
641+ Mod = sac_module (Meta ),
642+ {SacState1 , Effects } = Mod :handle_connection_node_disconnected (Pid , SacState0 ),
643+ return (Meta , State #? MODULE {single_active_consumer = SacState1 ,
644+ monitors = Monitors1 }, ok , [Effects0 ++ Effects ]);
637645 error ->
638646 return (Meta , State , ok , Effects0 )
639647 end ;
@@ -687,9 +695,11 @@ apply(#{machine_version := MachineVersion} = Meta,
687695 _ ->
688696 return (Meta , State0 , stream_not_found , [])
689697 end ;
690- apply (Meta , {nodeup , Node } = Cmd ,
698+ apply (#{machine_version := MachineVersion } = Meta ,
699+ {nodeup , Node } = Cmd ,
691700 #? MODULE {monitors = Monitors0 ,
692- streams = Streams0 } = State ) ->
701+ streams = Streams0 ,
702+ single_active_consumer = Sac0 } = State ) ->
693703 % % reissue monitors for all disconnected members
694704 {Effects0 , Monitors } =
695705 maps :fold (
@@ -703,14 +713,23 @@ apply(Meta, {nodeup, Node} = Cmd,
703713 {Acc , Mon }
704714 end
705715 end , {[], Monitors0 }, Streams0 ),
706- {Streams , Effects } =
716+ {Streams , Effects1 } =
707717 maps :fold (fun (Id , S0 , {Ss , E0 }) ->
708718 S1 = update_stream (Meta , Cmd , S0 ),
709719 {S , E } = evaluate_stream (Meta , S1 , E0 ),
710720 {Ss #{Id => S }, E }
711721 end , {Streams0 , Effects0 }, Streams0 ),
722+
723+ {Sac1 , Effects2 } = case MachineVersion > 5 of
724+ true ->
725+ SacMod = sac_module (Meta ),
726+ SacMod :handle_node_reconnected (Sac0 , Effects1 );
727+ false ->
728+ {Sac0 , Effects1 }
729+ end ,
712730 return (Meta , State #? MODULE {monitors = Monitors ,
713- streams = Streams }, ok , Effects );
731+ streams = Streams ,
732+ single_active_consumer = Sac1 }, ok , Effects2 );
714733apply (Meta , {machine_version , From , To }, State0 ) ->
715734 rabbit_log :info (" Stream coordinator machine version changes from ~tp to ~tp , "
716735 ++ " applying incremental upgrade." , [From , To ]),
@@ -721,6 +740,12 @@ apply(Meta, {machine_version, From, To}, State0) ->
721740 {S1 , Eff0 ++ Eff1 }
722741 end , {State0 , []}, lists :seq (From , To - 1 )),
723742 return (Meta , State1 , ok , Effects );
%% Handles the `{timeout, {sac, node_disconnected, #{connection_pid := Pid}}}'
%% command: the connection pid is passed to the SAC module's
%% forget_connection/2 and the SAC state it returns replaces the one held in
%% the machine state. The effects returned by the SAC module are passed on
%% to return/4 unchanged.
apply(Meta, {timeout, {sac, node_disconnected, #{connection_pid := ConnPid}}},
      #?MODULE{single_active_consumer = Sac0} = State) ->
    SacMod = sac_module(Meta),
    {Sac, SacEffects} = SacMod:forget_connection(ConnPid, Sac0),
    NewState = State#?MODULE{single_active_consumer = Sac},
    return(Meta, NewState, ok, SacEffects);
724749apply (Meta , UnkCmd , State ) ->
725750 rabbit_log :debug (" ~ts : unknown command ~W " ,
726751 [? MODULE , UnkCmd , 10 ]),
@@ -787,7 +812,7 @@ members() ->
787812 end
788813 end .
789814
790- maybe_resize_coordinator_cluster () ->
815+ maybe_resize_coordinator_cluster (MachineVersion ) ->
791816 spawn (fun () ->
792817 RabbitIsRunning = rabbit :is_running (),
793818 case members () of
@@ -813,19 +838,38 @@ maybe_resize_coordinator_cluster() ->
813838 case MemberNodes -- RabbitNodes of
814839 [] ->
815840 ok ;
816- [Old | _ ] ->
841+ [Old | _ ] when length ( RabbitNodes ) > 0 ->
817842 % % this ought to be rather rare as the stream
818843 % % coordinator member is now removed as part
819844 % % of the forget_cluster_node command
820845 rabbit_log :info (" ~ts : Rabbit node(s) removed from the cluster, "
821846 " deleting: ~w " , [? MODULE , Old ]),
822847 remove_member (Leader , Members , Old )
823- end ;
824- _ ->
848+ end ,
849+ maybe_handle_stale_nodes (MemberNodes , RabbitNodes ,
850+ MachineVersion );
851+ _ ->
825852 ok
826853 end
827854 end ).
828855
%% Detects nodes present in `MemberNodes' but absent from `ExpectedNodes'.
%%
%% The check only runs when `MachineVersion' is greater than 4 and
%% `ExpectedNodes' is a non-empty list; in every other case the function
%% is a no-op. Any leftover ("stale") nodes are logged at debug level.
%% No state is purged yet, see the TODO below.
%%
%% Always returns `ok'.
maybe_handle_stale_nodes(MemberNodes, [_ | _] = ExpectedNodes,
                         MachineVersion) when MachineVersion > 4 ->
    %% The [_ | _] match in the head is the non-empty check; it avoids
    %% length/1, which would traverse the whole list just to compare with 0.
    case MemberNodes -- ExpectedNodes of
        [] ->
            ok;
        Stale ->
            rabbit_log:debug("Stale nodes detected in stream SAC "
                             "coordinator: ~w. Purging state.",
                             [Stale]),
            %% TODO SAC pipeline command to purge state from stale nodes
            ok
    end;
maybe_handle_stale_nodes(_MemberNodes, _ExpectedNodes, _MachineVersion) ->
    ok.
872+
829873add_member (Members , Node ) ->
830874 MinMacVersion = erpc :call (Node , ? MODULE , version , []),
831875 Conf = make_ra_conf (Node , [N || {_ , N } <- Members ], MinMacVersion ),
@@ -899,80 +943,78 @@ init_aux(_Name) ->
899943
900944% % TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
901945handle_aux (leader , _ , maybe_resize_coordinator_cluster ,
902- # aux {resizer = undefined } = Aux , LogState , _ ) ->
903- Pid = maybe_resize_coordinator_cluster (),
904- {no_reply , Aux # aux {resizer = Pid }, LogState , [{monitor , process , aux , Pid }]};
946+ # aux {resizer = undefined } = Aux , RaAux ) ->
947+ MachineVersion = ra_aux :effective_machine_version (RaAux ),
948+ Pid = maybe_resize_coordinator_cluster (MachineVersion ),
949+ {no_reply , Aux # aux {resizer = Pid }, RaAux , [{monitor , process , aux , Pid }]};
905950handle_aux (leader , _ , maybe_resize_coordinator_cluster ,
906- AuxState , LogState , _ ) ->
951+ AuxState , RaAux ) ->
907952 % % Coordinator resizing is still happening, let's ignore this tick event
908- {no_reply , AuxState , LogState };
953+ {no_reply , AuxState , RaAux };
909954handle_aux (leader , _ , {down , Pid , _ },
910- # aux {resizer = Pid } = Aux , LogState , _ ) ->
955+ # aux {resizer = Pid } = Aux , RaAux ) ->
911956 % % Coordinator resizing has finished
912- {no_reply , Aux # aux {resizer = undefined }, LogState };
957+ {no_reply , Aux # aux {resizer = undefined }, RaAux };
913958handle_aux (leader , _ , {start_writer , StreamId ,
914959 #{epoch := Epoch , node := Node } = Args , Conf },
915- Aux , LogState , _ ) ->
960+ Aux , RaAux ) ->
916961 rabbit_log :debug (" ~ts : running action: 'start_writer'"
917962 " for ~ts on node ~w in epoch ~b " ,
918963 [? MODULE , StreamId , Node , Epoch ]),
919964 ActionFun = phase_start_writer (StreamId , Args , Conf ),
920- run_action (starting , StreamId , Args , ActionFun , Aux , LogState );
965+ run_action (starting , StreamId , Args , ActionFun , Aux , RaAux );
921966handle_aux (leader , _ , {start_replica , StreamId ,
922967 #{epoch := Epoch , node := Node } = Args , Conf },
923- Aux , LogState , _ ) ->
968+ Aux , RaAux ) ->
924969 rabbit_log :debug (" ~ts : running action: 'start_replica'"
925970 " for ~ts on node ~w in epoch ~b " ,
926971 [? MODULE , StreamId , Node , Epoch ]),
927972 ActionFun = phase_start_replica (StreamId , Args , Conf ),
928- run_action (starting , StreamId , Args , ActionFun , Aux , LogState );
973+ run_action (starting , StreamId , Args , ActionFun , Aux , RaAux );
929974handle_aux (leader , _ , {stop , StreamId , #{node := Node ,
930975 epoch := Epoch } = Args , Conf },
931- Aux , LogState , _ ) ->
976+ Aux , RaAux ) ->
932977 rabbit_log :debug (" ~ts : running action: 'stop'"
933978 " for ~ts on node ~w in epoch ~b " ,
934979 [? MODULE , StreamId , Node , Epoch ]),
935980 ActionFun = phase_stop_member (StreamId , Args , Conf ),
936- run_action (stopping , StreamId , Args , ActionFun , Aux , LogState );
981+ run_action (stopping , StreamId , Args , ActionFun , Aux , RaAux );
937982handle_aux (leader , _ , {update_mnesia , StreamId , Args , Conf },
938- # aux {actions = _Monitors } = Aux , LogState ,
939- #? MODULE {streams = _Streams }) ->
983+ # aux {actions = _Monitors } = Aux , RaAux ) ->
940984 rabbit_log :debug (" ~ts : running action: 'update_mnesia'"
941985 " for ~ts " , [? MODULE , StreamId ]),
942986 ActionFun = phase_update_mnesia (StreamId , Args , Conf ),
943- run_action (updating_mnesia , StreamId , Args , ActionFun , Aux , LogState );
987+ run_action (updating_mnesia , StreamId , Args , ActionFun , Aux , RaAux );
944988handle_aux (leader , _ , {update_retention , StreamId , Args , _Conf },
945- # aux {actions = _Monitors } = Aux , LogState ,
946- #? MODULE {streams = _Streams }) ->
989+ # aux {actions = _Monitors } = Aux , RaAux ) ->
947990 rabbit_log :debug (" ~ts : running action: 'update_retention'"
948991 " for ~ts " , [? MODULE , StreamId ]),
949992 ActionFun = phase_update_retention (StreamId , Args ),
950- run_action (update_retention , StreamId , Args , ActionFun , Aux , LogState );
993+ run_action (update_retention , StreamId , Args , ActionFun , Aux , RaAux );
951994handle_aux (leader , _ , {delete_member , StreamId , #{node := Node } = Args , Conf },
952- # aux {actions = _Monitors } = Aux , LogState ,
953- #? MODULE {streams = _Streams }) ->
995+ # aux {actions = _Monitors } = Aux , RaAux ) ->
954996 rabbit_log :debug (" ~ts : running action: 'delete_member'"
955997 " for ~ts ~ts " , [? MODULE , StreamId , Node ]),
956998 ActionFun = phase_delete_member (StreamId , Args , Conf ),
957- run_action (delete_member , StreamId , Args , ActionFun , Aux , LogState );
999+ run_action (delete_member , StreamId , Args , ActionFun , Aux , RaAux );
9581000handle_aux (leader , _ , fail_active_actions ,
959- # aux {actions = Actions } = Aux , LogState ,
960- #? MODULE {streams = Streams }) ->
1001+ # aux {actions = Actions } = Aux , RaAux ) ->
9611002 % % this bit of code just creates an exclude map of currently running
9621003 % % tasks to avoid failing them, this could only really happen during
9631004 % % a leader flipflap
9641005 Exclude = maps :from_list ([{S , ok }
9651006 || {P , {S , _ , _ }} <- maps_to_list (Actions ),
9661007 is_process_alive (P )]),
9671008 rabbit_log :debug (" ~ts : failing actions: ~w " , [? MODULE , Exclude ]),
1009+ #? MODULE {streams = Streams } = ra_aux :machine_state (RaAux ),
9681010 fail_active_actions (Streams , Exclude ),
969- {no_reply , Aux , LogState , []};
1011+ {no_reply , Aux , RaAux , []};
9701012handle_aux (leader , _ , {down , Pid , normal },
971- # aux {actions = Monitors } = Aux , LogState , _ ) ->
1013+ # aux {actions = Monitors } = Aux , RaAux ) ->
9721014 % % action process finished normally, just remove from actions map
973- {no_reply , Aux # aux {actions = maps :remove (Pid , Monitors )}, LogState , []};
1015+ {no_reply , Aux # aux {actions = maps :remove (Pid , Monitors )}, RaAux , []};
9741016handle_aux (leader , _ , {down , Pid , Reason },
975- # aux {actions = Monitors0 } = Aux , LogState , _ ) ->
1017+ # aux {actions = Monitors0 } = Aux , RaAux ) ->
9761018 % % An action has failed - report back to the state machine
9771019 case maps :get (Pid , Monitors0 , undefined ) of
9781020 {StreamId , Action , #{node := Node , epoch := Epoch } = Args } ->
@@ -983,13 +1025,13 @@ handle_aux(leader, _, {down, Pid, Reason},
9831025 Cmd = {action_failed , StreamId , Args #{action => Action }},
9841026 send_self_command (Cmd ),
9851027 {no_reply , Aux # aux {actions = maps :remove (Pid , Monitors )},
986- LogState , []};
1028+ RaAux , []};
9871029 undefined ->
9881030 % % should this ever happen?
989- {no_reply , Aux , LogState , []}
1031+ {no_reply , Aux , RaAux , []}
9901032 end ;
991- handle_aux (_ , _ , _ , AuxState , LogState , _ ) ->
992- {no_reply , AuxState , LogState }.
1033+ handle_aux (_ , _ , _ , AuxState , RaAux ) ->
1034+ {no_reply , AuxState , RaAux }.
9931035
9941036overview (#? MODULE {streams = Streams ,
9951037 monitors = Monitors ,
@@ -1025,15 +1067,15 @@ stream_overview0(#stream{epoch = Epoch,
10251067
%% Runs `ActionFun' in a newly spawned process that is linked to the caller
%% and unlinks itself once `ActionFun' has returned.
%%
%% The new pid is recorded in the aux state's `actions' map as
%% `Pid => {StreamId, Action, Args}'; `Args' must contain the `node' and
%% `epoch' keys. Returns a `no_reply' tuple carrying the updated aux state,
%% the unchanged `RaAux' and a `{monitor, process, aux, Pid}' effect for
%% the spawned process.
run_action(Action, StreamId, #{node := _, epoch := _} = Args,
           ActionFun, #aux{actions = Running} = Aux, RaAux) ->
    Parent = self(),
    Worker = fun() ->
                     ActionFun(),
                     unlink(Parent)
             end,
    ActionPid = spawn_link(Worker),
    Entry = {StreamId, Action, Args},
    {no_reply,
     Aux#aux{actions = Running#{ActionPid => Entry}},
     RaAux,
     [{monitor, process, aux, ActionPid}]}.
10371079
%% Builds the single-element effect list that replies to `From' with
%% `Reply' wrapped in a `{wrap_reply, Reply}' tuple.
wrap_reply(From, Reply) ->
    Wrapped = {wrap_reply, Reply},
    [{reply, From, Wrapped}].
0 commit comments