Skip to content

Commit be32638

Browse files
committed
Use nodes from SAC state to detect stale nodes
1 parent 549573d commit be32638

File tree

3 files changed

+399
-337
lines changed

3 files changed

+399
-337
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ members() ->
837837
end
838838
end.
839839

840-
maybe_resize_coordinator_cluster(LeaderPid, MachineVersion) ->
840+
maybe_resize_coordinator_cluster(LeaderPid, SacNodes, MachineVersion) ->
841841
spawn(fun() ->
842842
RabbitIsRunning = rabbit:is_running(),
843843
case members() of
@@ -872,20 +872,20 @@ maybe_resize_coordinator_cluster(LeaderPid, MachineVersion) ->
872872
"deleting: ~w", [?MODULE, Old]),
873873
remove_member(Leader, Members, Old)
874874
end,
875-
maybe_handle_stale_nodes(MemberNodes, RabbitNodes,
875+
maybe_handle_stale_nodes(SacNodes, RabbitNodes,
876876
LeaderPid,
877877
MachineVersion);
878878
_ ->
879879
ok
880880
end
881881
end).
882882

883-
maybe_handle_stale_nodes(MemberNodes, ExpectedNodes,
883+
maybe_handle_stale_nodes(SacNodes, BrokerNodes,
884884
LeaderPid, MachineVersion) when MachineVersion > 4 ->
885-
case MemberNodes -- ExpectedNodes of
885+
case SacNodes -- BrokerNodes of
886886
[] ->
887887
ok;
888-
Stale when length(ExpectedNodes) > 0 ->
888+
Stale when length(BrokerNodes) > 0 ->
889889
rabbit_log:debug("Stale nodes detected in stream SAC "
890890
"coordinator: ~w. Purging state.",
891891
[Stale]),
@@ -981,7 +981,8 @@ handle_aux(leader, _, maybe_resize_coordinator_cluster,
981981
#aux{resizer = undefined} = Aux, RaAux) ->
982982
Leader = ra_aux:leader_id(RaAux),
983983
MachineVersion = ra_aux:effective_machine_version(RaAux),
984-
Pid = maybe_resize_coordinator_cluster(Leader, MachineVersion),
984+
SacNodes = sac_list_nodes(ra_aux:machine_state(RaAux), MachineVersion),
985+
Pid = maybe_resize_coordinator_cluster(Leader, SacNodes, MachineVersion),
985986
{no_reply, Aux#aux{resizer = Pid}, RaAux, [{monitor, process, aux, Pid}]};
986987
handle_aux(leader, _, maybe_resize_coordinator_cluster,
987988
AuxState, RaAux) ->
@@ -2451,3 +2452,8 @@ sac_make_update_conf(Conf) ->
24512452

24522453
sac_check_conf_change(SacState) ->
24532454
rabbit_stream_sac_coordinator:check_conf_change(SacState).
2455+
2456+
sac_list_nodes(State, MachineVersion) when MachineVersion > 4 ->
2457+
rabbit_stream_sac_coordinator:list_nodes(sac_state(State));
2458+
sac_list_nodes(_, _) ->
2459+
[].

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949
group_consumers/5,
5050
overview/1,
5151
import_state/2,
52-
check_conf_change/1]).
52+
check_conf_change/1,
53+
list_nodes/1]).
5354
-export([make_purge_nodes/1,
5455
make_update_conf/1]).
5556

@@ -187,7 +188,7 @@ group_consumers(VirtualHost, Stream, Reference, InfoKeys) ->
187188
{timeout, _} -> {error, timeout}
188189
end.
189190

190-
-spec overview(state()) -> map().
191+
-spec overview(state() | undefined) -> map() | undefined.
191192
overview(undefined) ->
192193
undefined;
193194
overview(#?MODULE{groups = Groups}) ->
@@ -862,7 +863,7 @@ import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
862863
conf = #{disconnected_timeout => ?DISCONNECTED_TIMEOUT_MS}}.
863864

864865
-spec check_conf_change(state() | term()) -> {new, conf()} | unchanged.
865-
check_conf_change(State) when is_record(State, ?MODULE)->
866+
check_conf_change(State) when is_record(State, ?MODULE) ->
866867
#?MODULE{conf = Conf} = State,
867868
DisconTimeout = lookup_disconnected_timeout(),
868869
case Conf of
@@ -877,7 +878,22 @@ check_conf_change(State) when is_record(State, ?MODULE)->
877878
check_conf_change(_) ->
878879
unchanged.
879880

880-
- spec make_purge_nodes([node()]) -> {sac, command()}.
881+
-spec list_nodes(state()) -> [node()].
882+
list_nodes(#?MODULE{groups = Groups}) ->
883+
Nodes = maps:fold(fun(_, G, Acc) ->
884+
GNodes = nodes_from_group(G),
885+
maps:merge(Acc, GNodes)
886+
end, #{}, Groups),
887+
maps:keys(Nodes).
888+
889+
nodes_from_group(#group{consumers = Cs}) when is_list(Cs) ->
890+
lists:foldl(fun(#consumer{pid = Pid}, Acc) ->
891+
Acc#{node(Pid) => true}
892+
end, #{}, Cs);
893+
nodes_from_group(_) ->
894+
#{}.
895+
896+
-spec make_purge_nodes([node()]) -> {sac, command()}.
881897
make_purge_nodes(Nodes) ->
882898
wrap_cmd(#command_purge_nodes{nodes = Nodes}).
883899

0 commit comments

Comments
 (0)