diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index f7d26d014ba6..f910a1880337 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -710,8 +710,7 @@ apply(#{machine_version := Vsn} = Meta, _ -> return(Meta, State0, stream_not_found, []) end; -apply(#{machine_version := Vsn} = Meta, - {nodeup, Node} = Cmd, +apply(Meta, {nodeup, Node} = Cmd, #?MODULE{monitors = Monitors0, streams = Streams0, single_active_consumer = Sac0} = State) -> @@ -735,14 +734,8 @@ apply(#{machine_version := Vsn} = Meta, {Ss#{Id => S}, E} end, {Streams0, Effects0}, Streams0), - {Sac1, Effects2} = case ?V5_OR_MORE(Vsn) of - true -> - SacMod = sac_module(Meta), - SacMod:handle_node_reconnected(Node, - Sac0, Effects1); - false -> - {Sac0, Effects1} - end, + + {Sac1, Effects2} = sac_handle_node_reconnected(Meta, Node, Sac0, Effects1), return(Meta, State#?MODULE{monitors = Monitors, streams = Streams, single_active_consumer = Sac1}, ok, Effects2); @@ -2444,6 +2437,17 @@ sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) -> sac_handle_connection_down(SacState, Pid, _Reason, _Vsn) -> ?SAC_V4:handle_connection_down(Pid, SacState). +sac_handle_node_reconnected(#{machine_version := Vsn} = Meta, Node, + Sac, Effects) -> + case ?V5_OR_MORE(Vsn) of + true -> + SacMod = sac_module(Meta), + SacMod:handle_node_reconnected(Node, + Sac, Effects); + false -> + {Sac, Effects} + end. + sac_make_purge_nodes(Nodes) -> rabbit_stream_sac_coordinator:make_purge_nodes(Nodes). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 00b7fb5dde3e..5f9ceec14449 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -83,6 +83,11 @@ -define(DISCONNECTED_TIMEOUT_MS, 60_000). -define(SAC_ERRORS, [partition_index_conflict, not_found]). -define(IS_STATE_REC(T), is_record(T, ?MODULE)). +-define(IS_GROUP_REC(T), is_record(T, group)). +-define(SAME_CSR(C1, C2), + (is_record(C1, consumer) andalso is_record(C2, consumer) andalso + C1#consumer.pid =:= C2#consumer.pid andalso + C1#consumer.subscription_id =:= C2#consumer.subscription_id)). %% Single Active Consumer API -spec register_consumer(binary(), @@ -132,6 +137,7 @@ activate_consumer(VH, Stream, Name) -> stream = Stream, consumer_name= Name}). +%% called by a stream connection to inform it is still alive -spec connection_reconnected(connection_pid()) -> ok | {error, sac_error() | term()}. connection_reconnected(Pid) -> @@ -228,10 +234,10 @@ apply(#command_register_consumer{vhost = VirtualHost, subscription_id = SubscriptionId}, #?MODULE{groups = StreamGroups0} = State) -> case maybe_create_group(VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - StreamGroups0) of + Stream, + PartitionIndex, + ConsumerName, + StreamGroups0) of {ok, StreamGroups1} -> do_register_consumer(VirtualHost, Stream, @@ -256,8 +262,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost, {State0, []}; Group0 -> {Group1, Effects} = - case lookup_consumer(ConnectionPid, SubscriptionId, Group0) - of + case lookup_consumer(ConnectionPid, SubscriptionId, Group0) of {value, Consumer} -> G1 = remove_from_group(Consumer, Group0), handle_consumer_removal( @@ -274,27 +279,24 @@ apply(#command_unregister_consumer{vhost = VirtualHost, {State0#?MODULE{groups = SGS}, Effects} end, {State1, ok, Effects1}; -apply(#command_activate_consumer{vhost = VirtualHost, - stream = Stream, - consumer_name = ConsumerName}, +apply(#command_activate_consumer{vhost = VH, stream = S, consumer_name = Name}, #?MODULE{groups = StreamGroups0} = State0) -> {G, Eff} = - case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + case lookup_group(VH, S, Name, StreamGroups0) of undefined -> rabbit_log:warning("Trying to activate consumer in group ~tp, but " "the group does not longer exist", - [{VirtualHost, Stream, ConsumerName}]), + [{VH, S, Name}]), {undefined, []}; G0 -> %% keep track of the former active, if any - {ActPid, ActSubId} = - case lookup_active_consumer(G0) of - {value, #consumer{pid = ActivePid, - subscription_id = ActiveSubId}} -> - {ActivePid, ActiveSubId}; - _ -> - {-1, -1} - end, + ActCsr = case lookup_active_consumer(G0) of + {value, Consumer} -> + Consumer; + _ -> + undefined + end, + %% connected consumers are set to waiting status G1 = update_connected_consumers(G0, ?CONN_WAIT), case evaluate_active_consumer(G1) of undefined -> @@ -302,26 +304,23 @@ apply(#command_activate_consumer{vhost = VirtualHost, #consumer{status = {?DISCONNECTED, _}} -> %% we keep it this way, the consumer may come back {G1, []}; - #consumer{pid = Pid, subscription_id = SubId} -> - G2 = update_consumer_state_in_group(G1, Pid, - SubId, - ?CONN_ACT), + Csr -> + G2 = update_consumer_state_in_group(G1, Csr, ?CONN_ACT), %% do we need effects or not? Effects = - case {Pid, SubId} of - {ActPid, ActSubId} -> - %% it is the same active consumer as before - %% no need to notify it - []; - _ -> - %% new active consumer, need to notify it - [notify_consumer_effect(Pid, SubId, Stream, - ConsumerName, true)] - end, + case Csr of + Csr when ?SAME_CSR(Csr, ActCsr) -> + %% it is the same active consumer as before + %% no need to notify it + []; + _ -> + %% new active consumer, need to notify it + [notify_csr_effect(Csr, S, Name, true)] + end, {G2, Effects} end end, - StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName, + StreamGroups1 = update_groups(VH, S, Name, G, StreamGroups0), R = case G of undefined -> @@ -363,28 +362,30 @@ handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0, undefined -> {S0, Eff0}; Group -> - case has_forgotten_active(Group, Pid) of + case has_pdown_active(Group, Pid) of true -> - %% a forgotten active is coming in the connection + %% a presumed-down active is coming back in the connection %% we need to reconcile the group, %% as there may have been 2 active consumers at a time - handle_forgotten_active_reconnected(Pid, S0, Eff0, K); + handle_pdown_active_reconnected(Pid, S0, Eff0, K); false -> do_handle_group_connection_reconnected(Pid, S0, Eff0, K) end end. do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0, - Eff0, {VH, S, Name} = K) -> + Eff0, {VH, S, Name} = K) + when is_map_key(K, Groups0) -> G0 = #group{consumers = Consumers0} = lookup_group(VH, S, Name, Groups0), + %% update the status of the consumers from the connection {Consumers1, Updated} = - lists:foldr( - fun(#consumer{pid = P, status = {_, St}} = C, {L, _}) - when P == Pid -> - {[csr_status(C, {?CONNECTED, St}) | L], true}; - (C, {L, UpdatedFlag}) -> - {[C | L], UpdatedFlag or false} - end, {[], false}, Consumers0), + lists:foldr( + fun(#consumer{pid = P, status = {_, St}} = C, {L, _}) + when P == Pid -> + {[csr_status(C, {?CONNECTED, St}) | L], true}; + (C, {L, UpdatedFlag}) -> + {[C | L], UpdatedFlag or false} + end, {[], false}, Consumers0), case Updated of true -> @@ -394,60 +395,59 @@ do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0, {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}; false -> {S0, Eff0} - end. + end; +do_handle_group_connection_reconnected(_, S0, Eff0, _) -> + {S0, Eff0}. -handle_forgotten_active_reconnected(Pid, - #?MODULE{groups = Groups0} = S0, - Eff0, {VH, S, Name}) -> +handle_pdown_active_reconnected(Pid, + #?MODULE{groups = Groups0} = S0, + Eff0, {VH, S, Name} = K) + when is_map_key(K, Groups0) -> G0 = #group{consumers = Consumers0} = lookup_group(VH, S, Name, Groups0), {Consumers1, Eff1} = case has_disconnected_active(G0) of true -> %% disconnected active consumer in the group, no rebalancing possible - %% we update the disconnected active consumers + %% we update the presumed-down active consumers %% and tell them to step down - lists:foldr(fun(#consumer{status = St, - pid = P, - subscription_id = SID} = C, {Cs, Eff}) + lists:foldr(fun(#consumer{status = St, pid = P} = C, {Cs, Eff}) when P =:= Pid andalso St =:= ?PDOWN_ACT -> {[csr_status(C, ?CONN_WAIT) | Cs], - [notify_consumer_effect(Pid, SID, S, - Name, false, true) | Eff]}; + [notify_csr_effect(C, S, + Name, false, true) | Eff]}; (C, {Cs, Eff}) -> {[C | Cs], Eff} end, {[], Eff0}, Consumers0); false -> - lists:foldr(fun(#consumer{status = St, - pid = P, - subscription_id = SID} = C, {Cs, Eff}) + lists:foldr(fun(#consumer{status = St, pid = P} = C, {Cs, Eff}) when P =:= Pid andalso St =:= ?PDOWN_ACT -> - %% update forgotten active + %% update presumed-down active %% tell it to step down {[csr_status(C, ?CONN_WAIT) | Cs], - [notify_consumer_effect(P, SID, S, - Name, false, true) | Eff]}; + [notify_csr_effect(C, S, + Name, false, true) | Eff]}; (#consumer{status = {?PDOWN, _}, pid = P} = C, {Cs, Eff}) when P =:= Pid -> - %% update forgotten + %% update presumed-down {[csr_status(C, ?CONN_WAIT) | Cs], Eff}; - (#consumer{status = ?CONN_ACT, - pid = P, - subscription_id = SID} = C, {Cs, Eff}) -> + (#consumer{status = ?CONN_ACT} = C, {Cs, Eff}) -> %% update connected active %% tell it to step down {[csr_status(C, ?CONN_WAIT) | Cs], - [notify_consumer_effect(P, SID, S, - Name, false, true) | Eff]}; + [notify_csr_effect(C, S, + Name, false, true) | Eff]}; (C, {Cs, Eff}) -> {[C | Cs], Eff} end, {[], Eff0}, Consumers0) end, G1 = G0#group{consumers = Consumers1}, Groups1 = update_groups(VH, S, Name, G1, Groups0), - {S0#?MODULE{groups = Groups1}, Eff1}. + {S0#?MODULE{groups = Groups1}, Eff1}; +handle_pdown_active_reconnected(_, S0, Eff0, _) -> + {S0, Eff0}. -has_forgotten_active(#group{consumers = Consumers}, Pid) -> +has_pdown_active(#group{consumers = Consumers}, Pid) -> case lists:search(fun(#consumer{status = ?PDOWN_ACT, pid = P}) when P =:= Pid -> true; @@ -473,24 +473,33 @@ has_consumer_with_status(#group{consumers = Consumers}, Status) -> true end. +maybe_rebalance_group(#group{partition_index = PI} = G0, _) when PI < -1 -> + %% should not happen + {G0, []}; +maybe_rebalance_group(#group{consumers = CS} = G0, _) when length(CS) == 0 -> + {G0, []}; maybe_rebalance_group(#group{partition_index = -1, consumers = Consumers0} = G0, {_VH, S, Name}) -> case lookup_active_consumer(G0) of - {value, ActiveConsumer} -> + {value, ActiveCsr} -> %% there is already an active consumer, we just re-arrange %% the group to make sure the active consumer is the first in the array - Consumers1 = lists:filter(fun(C) -> - not same_consumer(C, ActiveConsumer) + %% remove the active consumer from the list + Consumers1 = lists:filter(fun(C) when ?SAME_CSR(C, ActiveCsr) -> + false; + (_) -> + true end, Consumers0), - G1 = G0#group{consumers = [ActiveConsumer | Consumers1]}, + %% add it back to the front + G1 = G0#group{consumers = [ActiveCsr | Consumers1]}, {G1, []}; _ -> %% no active consumer G1 = compute_active_consumer(G0), case lookup_active_consumer(G1) of - {value, #consumer{pid = Pid, subscription_id = SubId}} -> + {value, Csr} -> %% creating the side effect to notify the new active consumer - {G1, [notify_consumer_effect(Pid, SubId, S, Name, true)]}; + {G1, [notify_csr_effect(Csr, S, Name, true)]}; _ -> %% no active consumer found in the group, nothing to do {G1, []} @@ -499,8 +508,7 @@ maybe_rebalance_group(#group{partition_index = -1, consumers = Consumers0} = G0, maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G, {_VH, S, Name}) -> case lookup_active_consumer(G) of - {value, #consumer{pid = ActPid, - subscription_id = ActSubId} = CurrentActive} -> + {value, CurrentActive} -> case evaluate_active_consumer(G) of undefined -> %% no-one to select @@ -510,19 +518,12 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G, {G, []}; _ -> %% there's a change, telling the active it's not longer active - {update_consumer_state_in_group(G, - ActPid, - ActSubId, + {update_consumer_state_in_group(G, CurrentActive, {?CONNECTED, ?DEACTIVATING}), - [notify_consumer_effect(ActPid, - ActSubId, - S, - Name, - false, - true)]} + [notify_csr_effect(CurrentActive, S, Name, false, true)]} end; false -> - %% no active consumer in the (non-empty) group, + %% no active consumer in the group, case lists:search(fun(#consumer{status = Status}) -> Status =:= {?CONNECTED, ?DEACTIVATING} end, Consumers) of @@ -532,22 +533,16 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G, {G, []}; _ -> %% nothing going on in the group - %% a {disconnected, active} may have become {forgotten, active} + %% a {disconnected, active} may have become {pdown, active} %% we must select a new active case evaluate_active_consumer(G) of undefined -> %% no-one to select {G, []}; - #consumer{pid = ActPid, subscription_id = ActSubId} -> - {update_consumer_state_in_group(G, - ActPid, - ActSubId, + Csr -> + {update_consumer_state_in_group(G, Csr, {?CONNECTED, ?ACTIVE}), - [notify_consumer_effect(ActPid, - ActSubId, - S, - Name, - true)]} + [notify_csr_effect(Csr, S, Name, true)]} end end end. @@ -640,14 +635,14 @@ connectivity_label(Cnty) -> map(), ra_machine:effects()) -> {state(), map(), ra_machine:effects()}. -ensure_monitors(#command_register_consumer{vhost = VirtualHost, - stream = Stream, - consumer_name = ConsumerName, +ensure_monitors(#command_register_consumer{vhost = VH, + stream = S, + consumer_name = Name, connection_pid = Pid}, #?MODULE{pids_groups = PidsGroups0} = State0, Monitors0, Effects) -> - GroupId = {VirtualHost, Stream, ConsumerName}, + GroupId = {VH, S, Name}, %% get the group IDs that depend on the PID Groups0 = maps:get(Pid, PidsGroups0, #{}), %% add the group ID @@ -656,7 +651,7 @@ ensure_monitors(#command_register_consumer{vhost = VirtualHost, PidsGroups1 = PidsGroups0#{Pid => Groups1}, {State0#?MODULE{pids_groups = PidsGroups1}, Monitors0#{Pid => sac}, [{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]}; -ensure_monitors(#command_unregister_consumer{vhost = VirtualHost, +ensure_monitors(#command_unregister_consumer{vhost = VH, stream = Stream, consumer_name = ConsumerName, connection_pid = Pid}, @@ -664,11 +659,11 @@ ensure_monitors(#command_unregister_consumer{vhost = VirtualHost, pids_groups = PidsGroups0} = State0, Monitors, Effects) - when is_map_key(Pid, PidsGroups0) -> - GroupId = {VirtualHost, Stream, ConsumerName}, + when is_map_key(Pid, PidsGroups0) -> + GroupId = {VH, Stream, ConsumerName}, #{Pid := PidGroup0} = PidsGroups0, PidGroup1 = - case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of + case lookup_group(VH, Stream, ConsumerName, StreamGroups0) of undefined -> %% group is gone, can be removed from the PID map maps:remove(GroupId, PidGroup0); @@ -785,95 +780,78 @@ presume_connection_down(Pid, #?MODULE{groups = Groups} = State0) -> {State1, Eff}. handle_group_connection_presumed_down(Pid, #?MODULE{groups = Groups0} = S0, - Eff0, {VH, S, Name} = K) -> - case lookup_group(VH, S, Name, Groups0) of - undefined -> - {S0, Eff0}; - #group{consumers = Consumers0} = G0 -> - {Consumers1, Updated} = - lists:foldr( - fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _}) - when P == Pid -> - {[csr_status(C, {?PDOWN, St}) | L], true}; - (C, {L, UpdatedFlag}) -> - {[C | L], UpdatedFlag or false} - end, {[], false}, Consumers0), - - case Updated of - true -> - G1 = G0#group{consumers = Consumers1}, - {G2, Eff} = maybe_rebalance_group(G1, K), - Groups1 = update_groups(VH, S, Name, G2, Groups0), - {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}; - false -> - {S0, Eff0} - end - end. + Eff0, {VH, S, Name} = K) + when is_map_key(K, Groups0) -> + #group{consumers = Consumers0} = G0 = lookup_group(VH, S, Name, Groups0), + {Consumers1, Updated} = + lists:foldr( + fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _}) + when P == Pid -> + {[csr_status(C, {?PDOWN, St}) | L], true}; + (C, {L, UpdatedFlag}) -> + {[C | L], UpdatedFlag or false} + end, {[], false}, Consumers0), + + case Updated of + true -> + G1 = G0#group{consumers = Consumers1}, + {G2, Eff} = maybe_rebalance_group(G1, K), + Groups1 = update_groups(VH, S, Name, G2, Groups0), + {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}; + false -> + {S0, Eff0} + end; +handle_group_connection_presumed_down(_, S0, Eff0, _) -> + {S0, Eff0}. handle_group_after_connection_down(Pid, {#?MODULE{groups = Groups0} = S0, Eff0}, - {VirtualHost, Stream, ConsumerName}) -> - case lookup_group(VirtualHost, - Stream, - ConsumerName, - Groups0) of - undefined -> - {S0, Eff0}; - #group{consumers = Consumers0} = G0 -> - %% remove the connection consumers from the group state - %% keep flags to know what happened - {Consumers1, ActiveRemoved, AnyRemoved} = - lists:foldl( - fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _}) - when P == Pid -> - {L, is_active(S) or ActiveFlag, true}; - (C, {L, ActiveFlag, AnyFlag}) -> - {L ++ [C], ActiveFlag, AnyFlag} - end, {[], false, false}, Consumers0), - - case AnyRemoved of - true -> - G1 = G0#group{consumers = Consumers1}, - {G2, Effects} = handle_consumer_removal(G1, Stream, - ConsumerName, - ActiveRemoved), - Groups1 = update_groups(VirtualHost, - Stream, - ConsumerName, - G2, - Groups0), - {S0#?MODULE{groups = Groups1}, Effects ++ Eff0}; - false -> - {S0, Eff0} - end - end. + {VH, St, Name} = K) + when is_map_key(K, Groups0) -> + #group{consumers = Consumers0} = G0 = lookup_group(VH, St, Name, Groups0), + %% remove the connection consumers from the group state + %% keep flags to know what happened + {Consumers1, ActiveRemoved, AnyRemoved} = + lists:foldl( + fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _}) + when P == Pid -> + {L, is_active(S) or ActiveFlag, true}; + (C, {L, ActiveFlag, AnyFlag}) -> + {L ++ [C], ActiveFlag, AnyFlag} + end, {[], false, false}, Consumers0), + + case AnyRemoved of + true -> + G1 = G0#group{consumers = Consumers1}, + {G2, Effects} = handle_consumer_removal(G1, St, + Name, + ActiveRemoved), + Groups1 = update_groups(VH, St, Name, G2, Groups0), + {S0#?MODULE{groups = Groups1}, Effects ++ Eff0}; + false -> + {S0, Eff0} + end; +handle_group_after_connection_down(_, {S0, Eff0}, _) -> + {S0, Eff0}. handle_group_after_connection_node_disconnected(ConnPid, #?MODULE{groups = Groups0} = S0, - {VirtualHost, Stream, ConsumerName}) -> - case lookup_group(VirtualHost, - Stream, - ConsumerName, - Groups0) of - undefined -> - S0; - #group{consumers = Cs0} = G0 -> - Cs1 = lists:foldr(fun(#consumer{status = {_, St}, - pid = Pid} = C0, - Acc) when Pid =:= ConnPid -> - C1 = csr_status(C0, {?DISCONNECTED, St}), - [C1 | Acc]; - (C, Acc) -> - [C | Acc] - end, [], Cs0), - G1 = G0#group{consumers = Cs1}, - Groups1 = update_groups(VirtualHost, - Stream, - ConsumerName, - G1, - Groups0), - S0#?MODULE{groups = Groups1} - end. + {VH, S, Name} = K) + when is_map_key(K, Groups0) -> + #group{consumers = Cs0} = G0 = lookup_group(VH, S, Name, Groups0), + Cs1 = lists:foldr(fun(#consumer{status = {_, St}, + pid = Pid} = C0, + Acc) when Pid =:= ConnPid -> + C1 = csr_status(C0, {?DISCONNECTED, St}), + [C1 | Acc]; + (C, Acc) -> + [C | Acc] + end, [], Cs0), + G1 = G0#group{consumers = Cs1}, + Groups1 = update_groups(VH, S, Name, G1, Groups0), + S0#?MODULE{groups = Groups1}; +handle_group_after_connection_node_disconnected(_, S0, _) -> + S0. -spec import_state(ra_machine:version(), map()) -> state(). import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) -> @@ -909,10 +887,13 @@ list_nodes(#?MODULE{groups = Groups}) -> ra_machine:effects(). state_enter(leader, #?MODULE{groups = Groups} = State) when ?IS_STATE_REC(State) -> + %% becoming leader, we re-issue monitors and timers for connections with + %% disconnected consumers + %% iterate over groups {Nodes, DisConns} = maps:fold(fun(_, #group{consumers = Cs}, Acc) -> - %% iterage over group consumers + %% iterate over group consumers lists:foldl(fun(#consumer{pid = P, status = {?DISCONNECTED, _}, ts = Ts}, @@ -922,7 +903,7 @@ state_enter(leader, #?MODULE{groups = Groups} = State) {Nodes#{node(P) => true}, DisConns#{P => Ts}}; (#consumer{pid = P}, {Nodes, DisConns}) -> - %% store connection node + %% store connection node only {Nodes#{node(P) => true}, DisConns} end, Acc, Cs) end, {#{}, #{}}, Groups), @@ -973,7 +954,12 @@ disconnected_timeout(_) -> map_to_groups(Groups) when is_map(Groups) -> maps:fold(fun(K, V, Acc) -> - Acc#{K => map_to_group(V)} + case map_to_group(V) of + G when ?IS_GROUP_REC(G) -> + Acc#{K => map_to_group(V)}; + _ -> + Acc + end end, #{}, Groups); map_to_groups(_) -> #{}. @@ -984,15 +970,26 @@ map_to_pids_groups(_) -> #{}. map_to_group(#{<<"consumers">> := Consumers, <<"partition_index">> := Index}) -> - C = lists:foldl(fun(V, Acc) -> - Acc ++ [map_to_consumer(V)] - end, [], Consumers), - #group{consumers = C, - partition_index = Index}. + {C, _} = + lists:foldl(fun(V, {Cs, Dedup}) -> + case map_to_consumer(V) of + #consumer{pid = P, subscription_id = SubId} = C + when not is_map_key({P, SubId}, Dedup) -> + {[C | Cs], Dedup#{{P, SubId} => true}}; + _ -> + {Cs, Dedup} + end + end, {[], #{}}, Consumers), + #group{consumers = lists:reverse(C), + partition_index = Index}; +map_to_group(_) -> + undefined. map_to_consumer(#{<<"pid">> := Pid, <<"subscription_id">> := SubId, <<"owner">> := Owner, <<"active">> := Active}) -> - csr(Pid, SubId, Owner, active_to_status(Active)). + csr(Pid, SubId, Owner, active_to_status(Active)); +map_to_consumer(_) -> + undefined. active_to_status(true) -> {?CONNECTED, ?ACTIVE}; @@ -1008,82 +1005,69 @@ is_active({_, ?DEACTIVATING}) -> is_active(_) -> false. -do_register_consumer(VirtualHost, - Stream, - -1 = _PartitionIndex, - ConsumerName, - ConnectionPid, - Owner, - SubscriptionId, - #?MODULE{groups = StreamGroups0} = State) -> - Group0 = lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), - - Consumer = - case lookup_active_consumer(Group0) of - {value, _} -> - csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT); - false -> - csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT) - end, +do_register_consumer(VH, S, -1 = _PI, Name, Pid, Owner, SubId, + #?MODULE{groups = StreamGroups0} = State) + when is_map_key({VH, S, Name}, StreamGroups0) -> + Group0 = lookup_group(VH, S, Name, StreamGroups0), + + Consumer = case lookup_active_consumer(Group0) of + {value, _} -> + csr(Pid, SubId, Owner, ?CONN_WAIT); + false -> + csr(Pid, SubId, Owner, ?CONN_ACT) + end, Group1 = add_to_group(Consumer, Group0), - StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName, + StreamGroups1 = update_groups(VH, S, Name, Group1, StreamGroups0), #consumer{status = Status} = Consumer, - Effects = - case Status of - {_, ?ACTIVE} -> - [notify_consumer_effect(ConnectionPid, SubscriptionId, - Stream, ConsumerName, is_active(Status))]; - _ -> - [] - end, + Effects = case Status of + {_, ?ACTIVE} -> + [notify_csr_effect(Consumer, S, Name, is_active(Status))]; + _ -> + [] + end, {State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}; -do_register_consumer(VirtualHost, - Stream, - _PartitionIndex, - ConsumerName, - ConnectionPid, - Owner, - SubscriptionId, - #?MODULE{groups = StreamGroups0} = State) -> - Group0 = lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0), +do_register_consumer(VH, S, _PI, Name, Pid, Owner, SubId, + #?MODULE{groups = StreamGroups0} = State) + when is_map_key({VH, S, Name}, StreamGroups0) -> + Group0 = lookup_group(VH, S, Name, StreamGroups0), {Group1, Effects} = case Group0 of #group{consumers = []} -> %% first consumer in the group, it's the active one - Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT), + Consumer0 = csr(Pid, SubId, Owner, ?CONN_ACT), G1 = add_to_group(Consumer0, Group0), {G1, - [notify_consumer_effect(ConnectionPid, SubscriptionId, - Stream, ConsumerName, true)]}; + [notify_csr_effect(Consumer0, S, Name, true)]}; _G -> - Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT), + Consumer0 = csr(Pid, SubId, Owner, ?CONN_WAIT), G1 = add_to_group(Consumer0, Group0), - maybe_rebalance_group(G1, {VirtualHost, Stream, ConsumerName}) + maybe_rebalance_group(G1, {VH, S, Name}) end, - StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName, + StreamGroups1 = update_groups(VH, S, Name, Group1, StreamGroups0), - {value, #consumer{status = Status}} = - lookup_consumer(ConnectionPid, SubscriptionId, Group1), - {State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}. + {value, #consumer{status = Status}} = lookup_consumer(Pid, SubId, Group1), + {State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}; +do_register_consumer(_, _, _, _, _, _, _, State) -> + {State, {ok, false}, []}. handle_consumer_removal(#group{consumers = []} = G, _, _, _) -> {G, []}; handle_consumer_removal(#group{partition_index = -1} = Group0, - Stream, ConsumerName, ActiveRemoved) -> + S, Name, ActiveRemoved) -> case ActiveRemoved of true -> %% this is the active consumer we remove, computing the new one Group1 = compute_active_consumer(Group0), case lookup_active_consumer(Group1) of - {value, #consumer{pid = Pid, subscription_id = SubId}} -> + {value, Csr} -> %% creating the side effect to notify the new active consumer - {Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}; + {Group1, [notify_csr_effect(Csr, S, Name, true)]}; _ -> %% no active consumer found in the group, nothing to do {Group1, []} @@ -1094,8 +1078,7 @@ handle_consumer_removal(#group{partition_index = -1} = Group0, end; handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> case lookup_active_consumer(Group0) of - {value, #consumer{pid = ActPid, - subscription_id = ActSubId} = CurrentActive} -> + {value, CurrentActive} -> case evaluate_active_consumer(Group0) of undefined -> {Group0, []}; @@ -1104,12 +1087,10 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> {Group0, []}; _ -> %% there's a change, telling the active it's not longer active - {update_consumer_state_in_group(Group0, - ActPid, - ActSubId, + {update_consumer_state_in_group(Group0, CurrentActive, {?CONNECTED, ?DEACTIVATING}), - [notify_consumer_effect(ActPid, ActSubId, - Stream, ConsumerName, false, true)]} + [notify_csr_effect(CurrentActive, + Stream, ConsumerName, false, true)]} end; false -> case ActiveRemoved of @@ -1118,11 +1099,10 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> case evaluate_active_consumer(Group0) of undefined -> {Group0, []}; - #consumer{pid = P, subscription_id = SID} -> - {update_consumer_state_in_group(Group0, P, SID, + Csr -> + {update_consumer_state_in_group(Group0, Csr, {?CONNECTED, ?ACTIVE}), - [notify_consumer_effect(P, SID, - Stream, ConsumerName, true)]} + [notify_csr_effect(Csr, Stream, ConsumerName, true)]} end; false -> %% no active consumer in the (non-empty) group, @@ -1134,17 +1114,19 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) -> notify_connection_effect(Pid) -> mod_call_effect(Pid, {sac, check_connection, #{}}). -notify_consumer_effect(Pid, SubId, Stream, Name, Active) -> - notify_consumer_effect(Pid, SubId, Stream, Name, Active, false). +notify_csr_effect(Csr, S, Name, Active) -> + notify_csr_effect(Csr, S, Name, Active, false). -notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown) -> - mod_call_effect(Pid, +notify_csr_effect(#consumer{pid = P, subscription_id = SubId}, + Stream, Name, Active, false = _SteppingDown) -> + mod_call_effect(P, {sac, #{subscription_id => SubId, stream => Stream, consumer_name => Name, active => Active}}); -notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = SteppingDown) -> - mod_call_effect(Pid, +notify_csr_effect(#consumer{pid = P, subscription_id = SubId}, + Stream, Name, Active, true = SteppingDown) -> + mod_call_effect(P, {sac, #{subscription_id => SubId, stream => Stream, consumer_name => Name, @@ -1171,11 +1153,23 @@ lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) -> maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups, undefined). -add_to_group(Consumer, #group{consumers = Consumers} = Group) -> - Group#group{consumers = Consumers ++ [Consumer]}. +add_to_group(#consumer{pid = Pid, subscription_id = SubId} = Consumer, + #group{consumers = Consumers} = Group) -> + case lookup_consumer(Pid, SubId, Group) of + {value, _} -> + %% the consumer is already in the group, nothing to do + Group; + false -> + Group#group{consumers = Consumers ++ [Consumer]} + end. -remove_from_group(Consumer, #group{consumers = Consumers} = Group) -> - Group#group{consumers = lists:delete(Consumer, Consumers)}. +remove_from_group(Csr, #group{consumers = Consumers} = Group) -> + CS = lists:filter(fun(C) when ?SAME_CSR(C, Csr) -> + false; + (_) -> + true + end, Consumers), + Group#group{consumers = CS}. has_consumers_from_pid(#group{consumers = Consumers}, Pid) -> lists:any(fun (#consumer{pid = P}) when P == Pid -> @@ -1192,19 +1186,19 @@ compute_active_consumer(#group{partition_index = -1, compute_active_consumer(#group{partition_index = -1, consumers = Consumers} = G) -> case lists:search(fun(#consumer{status = S}) -> - S =:= {?DISCONNECTED, ?ACTIVE} + S =:= ?DISCONN_ACT end, Consumers) of {value, _DisconnectedActive} -> + %% no rebalancing if there is a disconnected active G; false -> case evaluate_active_consumer(G) of undefined -> G; - #consumer{pid = Pid, subscription_id = SubId} -> + AC -> Consumers1 = lists:foldr( - fun(#consumer{pid = P, subscription_id = SID} = C, L) - when P =:= Pid andalso SID =:= SubId -> + fun(C, L) when ?SAME_CSR(AC, C) -> %% change status of new active [csr_status(C, ?CONN_ACT) | L]; (#consumer{status = {?CONNECTED, _}} = C, L) -> @@ -1226,11 +1220,15 @@ evaluate_active_consumer(#group{consumers = Consumers} = G) -> S =:= ?DISCONN_ACT end, Consumers) of {value, C} -> + %% no rebalancing if there is a disconnected active C; _ -> do_evaluate_active_consumer(G#group{consumers = eligible(Consumers)}) end. +do_evaluate_active_consumer(#group{partition_index = PI}) when PI < -1 -> + %% should not happen + undefined; do_evaluate_active_consumer(#group{consumers = Consumers}) when length(Consumers) == 0 -> undefined; @@ -1264,36 +1262,25 @@ lookup_active_consumer(#group{consumers = Consumers}) -> lists:search(fun(#consumer{status = Status}) -> is_active(Status) end, Consumers). -update_groups(_VirtualHost, - _Stream, - _ConsumerName, - undefined, - StreamGroups) -> - StreamGroups; -update_groups(VirtualHost, - Stream, - ConsumerName, - #group{consumers = []}, - StreamGroups) -> +update_groups(_VH, _S, _Name, undefined, Groups) -> + Groups; +update_groups(VH, S, Name, #group{consumers = []}, Groups) + when is_map_key({VH, S, Name}, Groups) -> %% the group is now empty, removing the key - maps:remove({VirtualHost, Stream, ConsumerName}, StreamGroups); -update_groups(VirtualHost, - Stream, - ConsumerName, - Group, - StreamGroups) -> - StreamGroups#{{VirtualHost, Stream, ConsumerName} => Group}. - -update_consumer_state_in_group(#group{consumers = Consumers0} = G, - Pid, - SubId, + maps:remove({VH, S, Name}, Groups); +update_groups(_VH, _S, _Name, #group{consumers = []}, Groups) -> + %% the group is now empty, but not in the group map + %% just returning the map + Groups; +update_groups(VH, S, Name, G, Groups) -> + Groups#{{VH, S, Name} => G}. + +update_consumer_state_in_group(#group{consumers = Consumers0} = G, Csr, NewStatus) -> - CS1 = lists:map(fun(C0) -> - case C0 of - #consumer{pid = Pid, subscription_id = SubId} -> + CS1 = lists:map(fun(C0) when ?SAME_CSR(C0, Csr) -> csr_status(C0, NewStatus); - C -> C - end + (C) -> + C end, Consumers0), G#group{consumers = CS1}. @@ -1314,12 +1301,6 @@ send_message(ConnectionPid, Msg) -> ConnectionPid ! Msg, ok. -same_consumer(#consumer{pid = Pid, subscription_id = SubId}, - #consumer{pid = Pid, subscription_id = SubId}) -> - true; -same_consumer(_, _) -> - false. - -spec compute_pid_group_dependencies(groups()) -> pids_groups(). compute_pid_group_dependencies(Groups) -> maps:fold(fun(K, #group{consumers = Cs}, Acc) -> diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index 800ddb656ab6..f7c6add833fa 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -562,10 +562,15 @@ import_state_v4_test(_) -> OldState5 = apply_ensure_monitors(OldMod, Cmd4, OldState4), Cmd5 = register_consumer_command(P, 1, App1, Pid2, 2), OldState6 = apply_ensure_monitors(OldMod, Cmd5, OldState5), - Cmd6 = activate_consumer_command(P, App1), + %% a duplicate consumer sneaks in + %% this should not happen in real life, but it tests the dedup + %% logic in the import function + Cmd6 = register_consumer_command(P, 1, App1, Pid0, 0), OldState7 = apply_ensure_monitors(OldMod, Cmd6, OldState6), + Cmd7 = activate_consumer_command(P, App1), + OldState8 = apply_ensure_monitors(OldMod, Cmd7, OldState7), - Export = OldMod:state_to_map(OldState7), + Export = OldMod:state_to_map(OldState8), #?STATE{groups = Groups, pids_groups = PidsGroups} = ?MOD:import_state(4, Export), assertHasGroup({<<"/">>, S, App0}, grp(-1, [csr(Pid0, 0, active),