From b4f7d468425f7cc8ff7bb89d0fcf88d6bd32f8fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 23 Jun 2025 10:16:37 +0200 Subject: [PATCH] Miscellaneous minor improvements in stream SAC coordinator This commit handles edge cases in the stream SAC coordinator to make sure it does not crash during execution. Most of these edge cases consist in an inconsistent state, so they are very unlikely to happen. This commit also makes sure there is no duplicate in the consumer list of a group. Consumers are also now identified only by their connection PID and their subscription ID, as now the timestamp they contain in their state does not allow a field-by-field comparison. --- deps/rabbit/src/rabbit_stream_coordinator.erl | 24 +- .../src/rabbit_stream_sac_coordinator.erl | 585 +++++++++--------- .../rabbit_stream_sac_coordinator_SUITE.erl | 9 +- 3 files changed, 304 insertions(+), 314 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index f7d26d014ba6..f910a1880337 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -710,8 +710,7 @@ apply(#{machine_version := Vsn} = Meta, _ -> return(Meta, State0, stream_not_found, []) end; -apply(#{machine_version := Vsn} = Meta, - {nodeup, Node} = Cmd, +apply(Meta, {nodeup, Node} = Cmd, #?MODULE{monitors = Monitors0, streams = Streams0, single_active_consumer = Sac0} = State) -> @@ -735,14 +734,8 @@ apply(#{machine_version := Vsn} = Meta, {Ss#{Id => S}, E} end, {Streams0, Effects0}, Streams0), - {Sac1, Effects2} = case ?V5_OR_MORE(Vsn) of - true -> - SacMod = sac_module(Meta), - SacMod:handle_node_reconnected(Node, - Sac0, Effects1); - false -> - {Sac0, Effects1} - end, + + {Sac1, Effects2} = sac_handle_node_reconnected(Meta, Node, Sac0, Effects1), return(Meta, State#?MODULE{monitors = Monitors, streams = Streams, 
single_active_consumer = Sac1}, ok, Effects2); @@ -2444,6 +2437,17 @@ sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) -> sac_handle_connection_down(SacState, Pid, _Reason, _Vsn) -> ?SAC_V4:handle_connection_down(Pid, SacState). +sac_handle_node_reconnected(#{machine_version := Vsn} = Meta, Node, + Sac, Effects) -> + case ?V5_OR_MORE(Vsn) of + true -> + SacMod = sac_module(Meta), + SacMod:handle_node_reconnected(Node, + Sac, Effects); + false -> + {Sac, Effects} + end. + sac_make_purge_nodes(Nodes) -> rabbit_stream_sac_coordinator:make_purge_nodes(Nodes). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 00b7fb5dde3e..5f9ceec14449 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -83,6 +83,11 @@ -define(DISCONNECTED_TIMEOUT_MS, 60_000). -define(SAC_ERRORS, [partition_index_conflict, not_found]). -define(IS_STATE_REC(T), is_record(T, ?MODULE)). +-define(IS_GROUP_REC(T), is_record(T, group)). +-define(SAME_CSR(C1, C2), + (is_record(C1, consumer) andalso is_record(C2, consumer) andalso + C1#consumer.pid =:= C2#consumer.pid andalso + C1#consumer.subscription_id =:= C2#consumer.subscription_id)). %% Single Active Consumer API -spec register_consumer(binary(), @@ -132,6 +137,7 @@ activate_consumer(VH, Stream, Name) -> stream = Stream, consumer_name= Name}). +%% called by a stream connection to inform it is still alive -spec connection_reconnected(connection_pid()) -> ok | {error, sac_error() | term()}. 
connection_reconnected(Pid) -> @@ -228,10 +234,10 @@ apply(#command_register_consumer{vhost = VirtualHost, subscription_id = SubscriptionId}, #?MODULE{groups = StreamGroups0} = State) -> case maybe_create_group(VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - StreamGroups0) of + Stream, + PartitionIndex, + ConsumerName, + StreamGroups0) of {ok, StreamGroups1} -> do_register_consumer(VirtualHost, Stream, @@ -256,8 +262,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost, {State0, []}; Group0 -> {Group1, Effects} = - case lookup_consumer(ConnectionPid, SubscriptionId, Group0) - of + case lookup_consumer(ConnectionPid, SubscriptionId, Group0) of {value, Consumer} -> G1 = remove_from_group(Consumer, Group0), handle_consumer_removal( @@ -274,27 +279,24 @@ apply(#command_unregister_consumer{vhost = VirtualHost, {State0#?MODULE{groups = SGS}, Effects} end, {State1, ok, Effects1}; -apply(#command_activate_consumer{vhost = VirtualHost, - stream = Stream, - consumer_name = ConsumerName}, +apply(#command_activate_consumer{vhost = VH, stream = S, consumer_name = Name}, #?MODULE{groups = StreamGroups0} = State0) -> {G, Eff} = - case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + case lookup_group(VH, S, Name, StreamGroups0) of undefined -> rabbit_log:warning("Trying to activate consumer in group ~tp, but " "the group does not longer exist", - [{VirtualHost, Stream, ConsumerName}]), + [{VH, S, Name}]), {undefined, []}; G0 -> %% keep track of the former active, if any - {ActPid, ActSubId} = - case lookup_active_consumer(G0) of - {value, #consumer{pid = ActivePid, - subscription_id = ActiveSubId}} -> - {ActivePid, ActiveSubId}; - _ -> - {-1, -1} - end, + ActCsr = case lookup_active_consumer(G0) of + {value, Consumer} -> + Consumer; + _ -> + undefined + end, + %% connected consumers are set to waiting status G1 = update_connected_consumers(G0, ?CONN_WAIT), case evaluate_active_consumer(G1) of undefined -> @@ -302,26 +304,23 @@ 
apply(#command_activate_consumer{vhost = VirtualHost, #consumer{status = {?DISCONNECTED, _}} -> %% we keep it this way, the consumer may come back {G1, []}; - #consumer{pid = Pid, subscription_id = SubId} -> - G2 = update_consumer_state_in_group(G1, Pid, - SubId, - ?CONN_ACT), + Csr -> + G2 = update_consumer_state_in_group(G1, Csr, ?CONN_ACT), %% do we need effects or not? Effects = - case {Pid, SubId} of - {ActPid, ActSubId} -> - %% it is the same active consumer as before - %% no need to notify it - []; - _ -> - %% new active consumer, need to notify it - [notify_consumer_effect(Pid, SubId, Stream, - ConsumerName, true)] - end, + case Csr of + Csr when ?SAME_CSR(Csr, ActCsr) -> + %% it is the same active consumer as before + %% no need to notify it + []; + _ -> + %% new active consumer, need to notify it + [notify_csr_effect(Csr, S, Name, true)] + end, {G2, Effects} end end, - StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName, + StreamGroups1 = update_groups(VH, S, Name, G, StreamGroups0), R = case G of undefined -> @@ -363,28 +362,30 @@ handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0, undefined -> {S0, Eff0}; Group -> - case has_forgotten_active(Group, Pid) of + case has_pdown_active(Group, Pid) of true -> - %% a forgotten active is coming in the connection + %% a presumed-down active is coming back in the connection %% we need to reconcile the group, %% as there may have been 2 active consumers at a time - handle_forgotten_active_reconnected(Pid, S0, Eff0, K); + handle_pdown_active_reconnected(Pid, S0, Eff0, K); false -> do_handle_group_connection_reconnected(Pid, S0, Eff0, K) end end. 
do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0, - Eff0, {VH, S, Name} = K) -> + Eff0, {VH, S, Name} = K) + when is_map_key(K, Groups0) -> G0 = #group{consumers = Consumers0} = lookup_group(VH, S, Name, Groups0), + %% update the status of the consumers from the connection {Consumers1, Updated} = - lists:foldr( - fun(#consumer{pid = P, status = {_, St}} = C, {L, _}) - when P == Pid -> - {[csr_status(C, {?CONNECTED, St}) | L], true}; - (C, {L, UpdatedFlag}) -> - {[C | L], UpdatedFlag or false} - end, {[], false}, Consumers0), + lists:foldr( + fun(#consumer{pid = P, status = {_, St}} = C, {L, _}) + when P == Pid -> + {[csr_status(C, {?CONNECTED, St}) | L], true}; + (C, {L, UpdatedFlag}) -> + {[C | L], UpdatedFlag or false} + end, {[], false}, Consumers0), case Updated of true -> @@ -394,60 +395,59 @@ do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0, {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}; false -> {S0, Eff0} - end. + end; +do_handle_group_connection_reconnected(_, S0, Eff0, _) -> + {S0, Eff0}. 
-handle_forgotten_active_reconnected(Pid, - #?MODULE{groups = Groups0} = S0, - Eff0, {VH, S, Name}) -> +handle_pdown_active_reconnected(Pid, + #?MODULE{groups = Groups0} = S0, + Eff0, {VH, S, Name} = K) + when is_map_key(K, Groups0) -> G0 = #group{consumers = Consumers0} = lookup_group(VH, S, Name, Groups0), {Consumers1, Eff1} = case has_disconnected_active(G0) of true -> %% disconnected active consumer in the group, no rebalancing possible - %% we update the disconnected active consumers + %% we update the presumed-down active consumers %% and tell them to step down - lists:foldr(fun(#consumer{status = St, - pid = P, - subscription_id = SID} = C, {Cs, Eff}) + lists:foldr(fun(#consumer{status = St, pid = P} = C, {Cs, Eff}) when P =:= Pid andalso St =:= ?PDOWN_ACT -> {[csr_status(C, ?CONN_WAIT) | Cs], - [notify_consumer_effect(Pid, SID, S, - Name, false, true) | Eff]}; + [notify_csr_effect(C, S, + Name, false, true) | Eff]}; (C, {Cs, Eff}) -> {[C | Cs], Eff} end, {[], Eff0}, Consumers0); false -> - lists:foldr(fun(#consumer{status = St, - pid = P, - subscription_id = SID} = C, {Cs, Eff}) + lists:foldr(fun(#consumer{status = St, pid = P} = C, {Cs, Eff}) when P =:= Pid andalso St =:= ?PDOWN_ACT -> - %% update forgotten active + %% update presumed-down active %% tell it to step down {[csr_status(C, ?CONN_WAIT) | Cs], - [notify_consumer_effect(P, SID, S, - Name, false, true) | Eff]}; + [notify_csr_effect(C, S, + Name, false, true) | Eff]}; (#consumer{status = {?PDOWN, _}, pid = P} = C, {Cs, Eff}) when P =:= Pid -> - %% update forgotten + %% update presumed-down {[csr_status(C, ?CONN_WAIT) | Cs], Eff}; - (#consumer{status = ?CONN_ACT, - pid = P, - subscription_id = SID} = C, {Cs, Eff}) -> + (#consumer{status = ?CONN_ACT} = C, {Cs, Eff}) -> %% update connected active %% tell it to step down {[csr_status(C, ?CONN_WAIT) | Cs], - [notify_consumer_effect(P, SID, S, - Name, false, true) | Eff]}; + [notify_csr_effect(C, S, + Name, false, true) | Eff]}; (C, {Cs, Eff}) -> {[C | 
Cs], Eff} end, {[], Eff0}, Consumers0) end, G1 = G0#group{consumers = Consumers1}, Groups1 = update_groups(VH, S, Name, G1, Groups0), - {S0#?MODULE{groups = Groups1}, Eff1}. + {S0#?MODULE{groups = Groups1}, Eff1}; +handle_pdown_active_reconnected(_, S0, Eff0, _) -> + {S0, Eff0}. -has_forgotten_active(#group{consumers = Consumers}, Pid) -> +has_pdown_active(#group{consumers = Consumers}, Pid) -> case lists:search(fun(#consumer{status = ?PDOWN_ACT, pid = P}) when P =:= Pid -> true; @@ -473,24 +473,33 @@ has_consumer_with_status(#group{consumers = Consumers}, Status) -> true end. +maybe_rebalance_group(#group{partition_index = PI} = G0, _) when PI < -1 -> + %% should not happen + {G0, []}; +maybe_rebalance_group(#group{consumers = CS} = G0, _) when length(CS) == 0 -> + {G0, []}; maybe_rebalance_group(#group{partition_index = -1, consumers = Consumers0} = G0, {_VH, S, Name}) -> case lookup_active_consumer(G0) of - {value, ActiveConsumer} -> + {value, ActiveCsr} -> %% there is already an active consumer, we just re-arrange %% the group to make sure the active consumer is the first in the array - Consumers1 = lists:filter(fun(C) -> - not same_consumer(C, ActiveConsumer) + %% remove the active consumer from the list + Consumers1 = lists:filter(fun(C) when ?SAME_CSR(C, ActiveCsr) -> + false; + (_) -> + true end, Consumers0), - G1 = G0#group{consumers = [ActiveConsumer | Consumers1]}, + %% add it back to the front + G1 = G0#group{consumers = [ActiveCsr | Consumers1]}, {G1, []}; _ -> %% no active consumer G1 = compute_active_consumer(G0), case lookup_active_consumer(G1) of - {value, #consumer{pid = Pid, subscription_id = SubId}} -> + {value, Csr} -> %% creating the side effect to notify the new active consumer - {G1, [notify_consumer_effect(Pid, SubId, S, Name, true)]}; + {G1, [notify_csr_effect(Csr, S, Name, true)]}; _ -> %% no active consumer found in the group, nothing to do {G1, []} @@ -499,8 +508,7 @@ maybe_rebalance_group(#group{partition_index = -1, consumers = 
Consumers0} = G0, maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G, {_VH, S, Name}) -> case lookup_active_consumer(G) of - {value, #consumer{pid = ActPid, - subscription_id = ActSubId} = CurrentActive} -> + {value, CurrentActive} -> case evaluate_active_consumer(G) of undefined -> %% no-one to select @@ -510,19 +518,12 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G, {G, []}; _ -> %% there's a change, telling the active it's not longer active - {update_consumer_state_in_group(G, - ActPid, - ActSubId, + {update_consumer_state_in_group(G, CurrentActive, {?CONNECTED, ?DEACTIVATING}), - [notify_consumer_effect(ActPid, - ActSubId, - S, - Name, - false, - true)]} + [notify_csr_effect(CurrentActive, S, Name, false, true)]} end; false -> - %% no active consumer in the (non-empty) group, + %% no active consumer in the group, case lists:search(fun(#consumer{status = Status}) -> Status =:= {?CONNECTED, ?DEACTIVATING} end, Consumers) of @@ -532,22 +533,16 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G, {G, []}; _ -> %% nothing going on in the group - %% a {disconnected, active} may have become {forgotten, active} + %% a {disconnected, active} may have become {pdown, active} %% we must select a new active case evaluate_active_consumer(G) of undefined -> %% no-one to select {G, []}; - #consumer{pid = ActPid, subscription_id = ActSubId} -> - {update_consumer_state_in_group(G, - ActPid, - ActSubId, + Csr -> + {update_consumer_state_in_group(G, Csr, {?CONNECTED, ?ACTIVE}), - [notify_consumer_effect(ActPid, - ActSubId, - S, - Name, - true)]} + [notify_csr_effect(Csr, S, Name, true)]} end end end. @@ -640,14 +635,14 @@ connectivity_label(Cnty) -> map(), ra_machine:effects()) -> {state(), map(), ra_machine:effects()}. 
-ensure_monitors(#command_register_consumer{vhost = VirtualHost, - stream = Stream, - consumer_name = ConsumerName, +ensure_monitors(#command_register_consumer{vhost = VH, + stream = S, + consumer_name = Name, connection_pid = Pid}, #?MODULE{pids_groups = PidsGroups0} = State0, Monitors0, Effects) -> - GroupId = {VirtualHost, Stream, ConsumerName}, + GroupId = {VH, S, Name}, %% get the group IDs that depend on the PID Groups0 = maps:get(Pid, PidsGroups0, #{}), %% add the group ID @@ -656,7 +651,7 @@ ensure_monitors(#command_register_consumer{vhost = VirtualHost, PidsGroups1 = PidsGroups0#{Pid => Groups1}, {State0#?MODULE{pids_groups = PidsGroups1}, Monitors0#{Pid => sac}, [{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]}; -ensure_monitors(#command_unregister_consumer{vhost = VirtualHost, +ensure_monitors(#command_unregister_consumer{vhost = VH, stream = Stream, consumer_name = ConsumerName, connection_pid = Pid}, @@ -664,11 +659,11 @@ ensure_monitors(#command_unregister_consumer{vhost = VirtualHost, pids_groups = PidsGroups0} = State0, Monitors, Effects) - when is_map_key(Pid, PidsGroups0) -> - GroupId = {VirtualHost, Stream, ConsumerName}, + when is_map_key(Pid, PidsGroups0) -> + GroupId = {VH, Stream, ConsumerName}, #{Pid := PidGroup0} = PidsGroups0, PidGroup1 = - case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + case lookup_group(VH, Stream, ConsumerName, StreamGroups0) of undefined -> %% group is gone, can be removed from the PID map maps:remove(GroupId, PidGroup0); @@ -785,95 +780,78 @@ presume_connection_down(Pid, #?MODULE{groups = Groups} = State0) -> {State1, Eff}. 
handle_group_connection_presumed_down(Pid, #?MODULE{groups = Groups0} = S0, - Eff0, {VH, S, Name} = K) -> - case lookup_group(VH, S, Name, Groups0) of - undefined -> - {S0, Eff0}; - #group{consumers = Consumers0} = G0 -> - {Consumers1, Updated} = - lists:foldr( - fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _}) - when P == Pid -> - {[csr_status(C, {?PDOWN, St}) | L], true}; - (C, {L, UpdatedFlag}) -> - {[C | L], UpdatedFlag or false} - end, {[], false}, Consumers0), - - case Updated of - true -> - G1 = G0#group{consumers = Consumers1}, - {G2, Eff} = maybe_rebalance_group(G1, K), - Groups1 = update_groups(VH, S, Name, G2, Groups0), - {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}; - false -> - {S0, Eff0} - end - end. + Eff0, {VH, S, Name} = K) + when is_map_key(K, Groups0) -> + #group{consumers = Consumers0} = G0 = lookup_group(VH, S, Name, Groups0), + {Consumers1, Updated} = + lists:foldr( + fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _}) + when P == Pid -> + {[csr_status(C, {?PDOWN, St}) | L], true}; + (C, {L, UpdatedFlag}) -> + {[C | L], UpdatedFlag or false} + end, {[], false}, Consumers0), + + case Updated of + true -> + G1 = G0#group{consumers = Consumers1}, + {G2, Eff} = maybe_rebalance_group(G1, K), + Groups1 = update_groups(VH, S, Name, G2, Groups0), + {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}; + false -> + {S0, Eff0} + end; +handle_group_connection_presumed_down(_, S0, Eff0, _) -> + {S0, Eff0}. 
handle_group_after_connection_down(Pid, {#?MODULE{groups = Groups0} = S0, Eff0}, - {VirtualHost, Stream, ConsumerName}) -> - case lookup_group(VirtualHost, - Stream, - ConsumerName, - Groups0) of - undefined -> - {S0, Eff0}; - #group{consumers = Consumers0} = G0 -> - %% remove the connection consumers from the group state - %% keep flags to know what happened - {Consumers1, ActiveRemoved, AnyRemoved} = - lists:foldl( - fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _}) - when P == Pid -> - {L, is_active(S) or ActiveFlag, true}; - (C, {L, ActiveFlag, AnyFlag}) -> - {L ++ [C], ActiveFlag, AnyFlag} - end, {[], false, false}, Consumers0), - - case AnyRemoved of - true -> - G1 = G0#group{consumers = Consumers1}, - {G2, Effects} = handle_consumer_removal(G1, Stream, - ConsumerName, - ActiveRemoved), - Groups1 = update_groups(VirtualHost, - Stream, - ConsumerName, - G2, - Groups0), - {S0#?MODULE{groups = Groups1}, Effects ++ Eff0}; - false -> - {S0, Eff0} - end - end. + {VH, St, Name} = K) + when is_map_key(K, Groups0) -> + #group{consumers = Consumers0} = G0 = lookup_group(VH, St, Name, Groups0), + %% remove the connection consumers from the group state + %% keep flags to know what happened + {Consumers1, ActiveRemoved, AnyRemoved} = + lists:foldl( + fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _}) + when P == Pid -> + {L, is_active(S) or ActiveFlag, true}; + (C, {L, ActiveFlag, AnyFlag}) -> + {L ++ [C], ActiveFlag, AnyFlag} + end, {[], false, false}, Consumers0), + + case AnyRemoved of + true -> + G1 = G0#group{consumers = Consumers1}, + {G2, Effects} = handle_consumer_removal(G1, St, + Name, + ActiveRemoved), + Groups1 = update_groups(VH, St, Name, G2, Groups0), + {S0#?MODULE{groups = Groups1}, Effects ++ Eff0}; + false -> + {S0, Eff0} + end; +handle_group_after_connection_down(_, {S0, Eff0}, _) -> + {S0, Eff0}. 
handle_group_after_connection_node_disconnected(ConnPid, #?MODULE{groups = Groups0} = S0, - {VirtualHost, Stream, ConsumerName}) -> - case lookup_group(VirtualHost, - Stream, - ConsumerName, - Groups0) of - undefined -> - S0; - #group{consumers = Cs0} = G0 -> - Cs1 = lists:foldr(fun(#consumer{status = {_, St}, - pid = Pid} = C0, - Acc) when Pid =:= ConnPid -> - C1 = csr_status(C0, {?DISCONNECTED, St}), - [C1 | Acc]; - (C, Acc) -> - [C | Acc] - end, [], Cs0), - G1 = G0#group{consumers = Cs1}, - Groups1 = update_groups(VirtualHost, - Stream, - ConsumerName, - G1, - Groups0), - S0#?MODULE{groups = Groups1} - end. + {VH, S, Name} = K) + when is_map_key(K, Groups0) -> + #group{consumers = Cs0} = G0 = lookup_group(VH, S, Name, Groups0), + Cs1 = lists:foldr(fun(#consumer{status = {_, St}, + pid = Pid} = C0, + Acc) when Pid =:= ConnPid -> + C1 = csr_status(C0, {?DISCONNECTED, St}), + [C1 | Acc]; + (C, Acc) -> + [C | Acc] + end, [], Cs0), + G1 = G0#group{consumers = Cs1}, + Groups1 = update_groups(VH, S, Name, G1, Groups0), + S0#?MODULE{groups = Groups1}; +handle_group_after_connection_node_disconnected(_, S0, _) -> + S0. -spec import_state(ra_machine:version(), map()) -> state(). import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) -> @@ -909,10 +887,13 @@ list_nodes(#?MODULE{groups = Groups}) -> ra_machine:effects(). 
state_enter(leader, #?MODULE{groups = Groups} = State) when ?IS_STATE_REC(State) -> + %% becoming leader, we re-issue monitors and timers for connections with + %% disconnected consumers + %% iterate over groups {Nodes, DisConns} = maps:fold(fun(_, #group{consumers = Cs}, Acc) -> - %% iterage over group consumers + %% iterate over group consumers lists:foldl(fun(#consumer{pid = P, status = {?DISCONNECTED, _}, ts = Ts}, @@ -922,7 +903,7 @@ state_enter(leader, #?MODULE{groups = Groups} = State) {Nodes#{node(P) => true}, DisConns#{P => Ts}}; (#consumer{pid = P}, {Nodes, DisConns}) -> - %% store connection node + %% store connection node only {Nodes#{node(P) => true}, DisConns} end, Acc, Cs) end, {#{}, #{}}, Groups), @@ -973,7 +954,12 @@ disconnected_timeout(_) -> map_to_groups(Groups) when is_map(Groups) -> maps:fold(fun(K, V, Acc) -> - Acc#{K => map_to_group(V)} + case map_to_group(V) of + G when ?IS_GROUP_REC(G) -> + Acc#{K => map_to_group(V)}; + _ -> + Acc + end end, #{}, Groups); map_to_groups(_) -> #{}. @@ -984,15 +970,26 @@ map_to_pids_groups(_) -> #{}. map_to_group(#{<<"consumers">> := Consumers, <<"partition_index">> := Index}) -> - C = lists:foldl(fun(V, Acc) -> - Acc ++ [map_to_consumer(V)] - end, [], Consumers), - #group{consumers = C, - partition_index = Index}. + {C, _} = + lists:foldl(fun(V, {Cs, Dedup}) -> + case map_to_consumer(V) of + #consumer{pid = P, subscription_id = SubId} = C + when not is_map_key({P, SubId}, Dedup) -> + {[C | Cs], Dedup#{{P, SubId} => true}}; + _ -> + {Cs, Dedup} + end + end, {[], #{}}, Consumers), + #group{consumers = lists:reverse(C), + partition_index = Index}; +map_to_group(_) -> + undefined. map_to_consumer(#{<<"pid">> := Pid, <<"subscription_id">> := SubId, <<"owner">> := Owner, <<"active">> := Active}) -> - csr(Pid, SubId, Owner, active_to_status(Active)). + csr(Pid, SubId, Owner, active_to_status(Active)); +map_to_consumer(_) -> + undefined. 
active_to_status(true) -> {?CONNECTED, ?ACTIVE}; @@ -1008,82 +1005,69 @@ is_active({_, ?DEACTIVATING}) -> is_active(_) -> false. -do_register_consumer(VirtualHost, - Stream, - -1 = _PartitionIndex, - ConsumerName, - ConnectionPid, - Owner, - SubscriptionId, - #?MODULE{groups = StreamGroups0} = State) -> - Group0 = lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), - - Consumer = - case lookup_active_consumer(Group0) of - {value, _} -> - csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT); - false -> - csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT) - end, +do_register_consumer(VH, S, -1 = _PI, Name, Pid, Owner, SubId, + #?MODULE{groups = StreamGroups0} = State) + when is_map_key({VH, S, Name}, StreamGroups0) -> + Group0 = lookup_group(VH, S, Name, StreamGroups0), + + Consumer = case lookup_active_consumer(Group0) of + {value, _} -> + csr(Pid, SubId, Owner, ?CONN_WAIT); + false -> + csr(Pid, SubId, Owner, ?CONN_ACT) + end, Group1 = add_to_group(Consumer, Group0), - StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName, + StreamGroups1 = update_groups(VH, S, Name, Group1, StreamGroups0), #consumer{status = Status} = Consumer, - Effects = - case Status of - {_, ?ACTIVE} -> - [notify_consumer_effect(ConnectionPid, SubscriptionId, - Stream, ConsumerName, is_active(Status))]; - _ -> - [] - end, + Effects = case Status of + {_, ?ACTIVE} -> + [notify_csr_effect(Consumer, S, Name, is_active(Status))]; + _ -> + [] + end, {State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}; -do_register_consumer(VirtualHost, - Stream, - _PartitionIndex, - ConsumerName, - ConnectionPid, - Owner, - SubscriptionId, - #?MODULE{groups = StreamGroups0} = State) -> - Group0 = lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), +do_register_consumer(VH, S, _PI, Name, Pid, Owner, SubId, + #?MODULE{groups = StreamGroups0} = State) + when is_map_key({VH, S, Name}, StreamGroups0) -> + Group0 = lookup_group(VH, S, Name, StreamGroups0), 
{Group1, Effects} = case Group0 of #group{consumers = []} -> %% first consumer in the group, it's the active one - Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT), + Consumer0 = csr(Pid, SubId, Owner, ?CONN_ACT), G1 = add_to_group(Consumer0, Group0), {G1, - [notify_consumer_effect(ConnectionPid, SubscriptionId, - Stream, ConsumerName, true)]}; + [notify_csr_effect(Consumer0, S, Name, true)]}; _G -> - Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT), + Consumer0 = csr(Pid, SubId, Owner, ?CONN_WAIT), G1 = add_to_group(Consumer0, Group0), - maybe_rebalance_group(G1, {VirtualHost, Stream, ConsumerName}) + maybe_rebalance_group(G1, {VH, S, Name}) end, - StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName, + StreamGroups1 = update_groups(VH, S, Name, Group1, StreamGroups0), - {value, #consumer{status = Status}} = - lookup_consumer(ConnectionPid, SubscriptionId, Group1), - {State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}. + {value, #consumer{status = Status}} = lookup_consumer(Pid, SubId, Group1), + {State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}; +do_register_consumer(_, _, _, _, _, _, _, State) -> + {State, {ok, false}, []}. 
handle_consumer_removal(#group{consumers = []} = G, _, _, _) -> {G, []}; handle_consumer_removal(#group{partition_index = -1} = Group0, - Stream, ConsumerName, ActiveRemoved) -> + S, Name, ActiveRemoved) -> case ActiveRemoved of true -> %% this is the active consumer we remove, computing the new one Group1 = compute_active_consumer(Group0), case lookup_active_consumer(Group1) of - {value, #consumer{pid = Pid, subscription_id = SubId}} -> + {value, Csr} -> %% creating the side effect to notify the new active consumer - {Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}; + {Group1, [notify_csr_effect(Csr, S, Name, true)]}; _ -> %% no active consumer found in the group, nothing to do {Group1, []} @@ -1094,8 +1078,7 @@ handle_consumer_removal(#group{partition_index = -1} = Group0, end; handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> case lookup_active_consumer(Group0) of - {value, #consumer{pid = ActPid, - subscription_id = ActSubId} = CurrentActive} -> + {value, CurrentActive} -> case evaluate_active_consumer(Group0) of undefined -> {Group0, []}; @@ -1104,12 +1087,10 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> {Group0, []}; _ -> %% there's a change, telling the active it's not longer active - {update_consumer_state_in_group(Group0, - ActPid, - ActSubId, + {update_consumer_state_in_group(Group0, CurrentActive, {?CONNECTED, ?DEACTIVATING}), - [notify_consumer_effect(ActPid, ActSubId, - Stream, ConsumerName, false, true)]} + [notify_csr_effect(CurrentActive, + Stream, ConsumerName, false, true)]} end; false -> case ActiveRemoved of @@ -1118,11 +1099,10 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> case evaluate_active_consumer(Group0) of undefined -> {Group0, []}; - #consumer{pid = P, subscription_id = SID} -> - {update_consumer_state_in_group(Group0, P, SID, + Csr -> + {update_consumer_state_in_group(Group0, Csr, {?CONNECTED, ?ACTIVE}), - 
[notify_consumer_effect(P, SID, - Stream, ConsumerName, true)]} + [notify_csr_effect(Csr, Stream, ConsumerName, true)]} end; false -> %% no active consumer in the (non-empty) group, @@ -1134,17 +1114,19 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> notify_connection_effect(Pid) -> mod_call_effect(Pid, {sac, check_connection, #{}}). -notify_consumer_effect(Pid, SubId, Stream, Name, Active) -> - notify_consumer_effect(Pid, SubId, Stream, Name, Active, false). +notify_csr_effect(Csr, S, Name, Active) -> + notify_csr_effect(Csr, S, Name, Active, false). -notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown) -> - mod_call_effect(Pid, +notify_csr_effect(#consumer{pid = P, subscription_id = SubId}, + Stream, Name, Active, false = _SteppingDown) -> + mod_call_effect(P, {sac, #{subscription_id => SubId, stream => Stream, consumer_name => Name, active => Active}}); -notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = SteppingDown) -> - mod_call_effect(Pid, +notify_csr_effect(#consumer{pid = P, subscription_id = SubId}, + Stream, Name, Active, true = SteppingDown) -> + mod_call_effect(P, {sac, #{subscription_id => SubId, stream => Stream, consumer_name => Name, @@ -1171,11 +1153,23 @@ lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) -> maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups, undefined). -add_to_group(Consumer, #group{consumers = Consumers} = Group) -> - Group#group{consumers = Consumers ++ [Consumer]}. +add_to_group(#consumer{pid = Pid, subscription_id = SubId} = Consumer, + #group{consumers = Consumers} = Group) -> + case lookup_consumer(Pid, SubId, Group) of + {value, _} -> + %% the consumer is already in the group, nothing to do + Group; + false -> + Group#group{consumers = Consumers ++ [Consumer]} + end. -remove_from_group(Consumer, #group{consumers = Consumers} = Group) -> - Group#group{consumers = lists:delete(Consumer, Consumers)}. 
+remove_from_group(Csr, #group{consumers = Consumers} = Group) -> + CS = lists:filter(fun(C) when ?SAME_CSR(C, Csr) -> + false; + (_) -> + true + end, Consumers), + Group#group{consumers = CS}. has_consumers_from_pid(#group{consumers = Consumers}, Pid) -> lists:any(fun (#consumer{pid = P}) when P == Pid -> @@ -1192,19 +1186,19 @@ compute_active_consumer(#group{partition_index = -1, compute_active_consumer(#group{partition_index = -1, consumers = Consumers} = G) -> case lists:search(fun(#consumer{status = S}) -> - S =:= {?DISCONNECTED, ?ACTIVE} + S =:= ?DISCONN_ACT end, Consumers) of {value, _DisconnectedActive} -> + %% no rebalancing if there is a disconnected active G; false -> case evaluate_active_consumer(G) of undefined -> G; - #consumer{pid = Pid, subscription_id = SubId} -> + AC -> Consumers1 = lists:foldr( - fun(#consumer{pid = P, subscription_id = SID} = C, L) - when P =:= Pid andalso SID =:= SubId -> + fun(C, L) when ?SAME_CSR(AC, C) -> %% change status of new active [csr_status(C, ?CONN_ACT) | L]; (#consumer{status = {?CONNECTED, _}} = C, L) -> @@ -1226,11 +1220,15 @@ evaluate_active_consumer(#group{consumers = Consumers} = G) -> S =:= ?DISCONN_ACT end, Consumers) of {value, C} -> + %% no rebalancing if there is a disconnected active C; _ -> do_evaluate_active_consumer(G#group{consumers = eligible(Consumers)}) end. +do_evaluate_active_consumer(#group{partition_index = PI}) when PI < -1 -> + %% should not happen + undefined; do_evaluate_active_consumer(#group{consumers = Consumers}) when length(Consumers) == 0 -> undefined; @@ -1264,36 +1262,25 @@ lookup_active_consumer(#group{consumers = Consumers}) -> lists:search(fun(#consumer{status = Status}) -> is_active(Status) end, Consumers). 
-update_groups(_VirtualHost, - _Stream, - _ConsumerName, - undefined, - StreamGroups) -> - StreamGroups; -update_groups(VirtualHost, - Stream, - ConsumerName, - #group{consumers = []}, - StreamGroups) -> +update_groups(_VH, _S, _Name, undefined, Groups) -> + Groups; +update_groups(VH, S, Name, #group{consumers = []}, Groups) + when is_map_key({VH, S, Name}, Groups) -> %% the group is now empty, removing the key - maps:remove({VirtualHost, Stream, ConsumerName}, StreamGroups); -update_groups(VirtualHost, - Stream, - ConsumerName, - Group, - StreamGroups) -> - StreamGroups#{{VirtualHost, Stream, ConsumerName} => Group}. - -update_consumer_state_in_group(#group{consumers = Consumers0} = G, - Pid, - SubId, + maps:remove({VH, S, Name}, Groups); +update_groups(_VH, _S, _Name, #group{consumers = []}, Groups) -> + %% the group is now empty, but not in the group map + %% just returning the map + Groups; +update_groups(VH, S, Name, G, Groups) -> + Groups#{{VH, S, Name} => G}. + +update_consumer_state_in_group(#group{consumers = Consumers0} = G, Csr, NewStatus) -> - CS1 = lists:map(fun(C0) -> - case C0 of - #consumer{pid = Pid, subscription_id = SubId} -> + CS1 = lists:map(fun(C0) when ?SAME_CSR(C0, Csr) -> csr_status(C0, NewStatus); - C -> C - end + (C) -> + C end, Consumers0), G#group{consumers = CS1}. @@ -1314,12 +1301,6 @@ send_message(ConnectionPid, Msg) -> ConnectionPid ! Msg, ok. -same_consumer(#consumer{pid = Pid, subscription_id = SubId}, - #consumer{pid = Pid, subscription_id = SubId}) -> - true; -same_consumer(_, _) -> - false. - -spec compute_pid_group_dependencies(groups()) -> pids_groups(). 
compute_pid_group_dependencies(Groups) -> maps:fold(fun(K, #group{consumers = Cs}, Acc) -> diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index 800ddb656ab6..f7c6add833fa 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -562,10 +562,15 @@ import_state_v4_test(_) -> OldState5 = apply_ensure_monitors(OldMod, Cmd4, OldState4), Cmd5 = register_consumer_command(P, 1, App1, Pid2, 2), OldState6 = apply_ensure_monitors(OldMod, Cmd5, OldState5), - Cmd6 = activate_consumer_command(P, App1), + %% a duplicate consumer sneaks in + %% this should not happen in real life, but it tests the dedup + %% logic in the import function + Cmd6 = register_consumer_command(P, 1, App1, Pid0, 0), OldState7 = apply_ensure_monitors(OldMod, Cmd6, OldState6), + Cmd7 = activate_consumer_command(P, App1), + OldState8 = apply_ensure_monitors(OldMod, Cmd7, OldState7), - Export = OldMod:state_to_map(OldState7), + Export = OldMod:state_to_map(OldState8), #?STATE{groups = Groups, pids_groups = PidsGroups} = ?MOD:import_state(4, Export), assertHasGroup({<<"/">>, S, App0}, grp(-1, [csr(Pid0, 0, active),