Skip to content

Commit 38eaaeb

Browse files
committed
Prevent blocked groups in stream SAC with fine-grained status
A boolean status in the stream SAC coordinator is not enough to follow the evolution of a consumer. For example a former active consumer that is stepping down can go down before another consumer in the group is activated, letting the coordinator expect an activation request that will never arrive, leaving the group without any active consumer. This commit introduces 3 status: active (formerly "true"), waiting (formerly "false"), and deactivating. The coordinator will now know when a deactivating consumer goes down and will trigger a rebalancing to avoid a stuck group. This commit also introduces a status related to the connectivity state of a consumer. The possible values are: connected, disconnected, and presumed_down. Consumers are by default connected, they can become disconnected if the coordinator receives a down event with a noconnection reason, meaning the node of the consumer has been disconnected from the other nodes. Consumers can become connected again when their node joins the other nodes again. Disconnected consumers are still considered part of a group, as they are expected to come back at some point. For example there is no rebalancing in a group if the active consumer got disconnected. The coordinator sets a timer when a disconnection occurs. When the timer expires, corresponding disconnected consumers pass into the "presumed down" state. At this point they are no longer considered part of their respective group and are excluded from rebalancing decision. They are expected to get removed from the group by the appropriate down event of a monitor. So the consumer status is now a tuple, e.g. {connected, active}. Note this is an implementation: only the stream SAC coordinator deals with the status of stream SAC consumers. 2 new configuration entries are introduced: * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the disconnected-to-forgotten timer. * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands in the coordinator. It used to be a fixed value of 30 seconds. The default value is still the same. The setting has been introduced to make integration tests faster.
1 parent e05fa94 commit 38eaaeb

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
-define(DISCONNECTED_TIMEOUT_APP_KEY, stream_sac_disconnected_timeout).
7979
-define(DISCONNECTED_TIMEOUT_CONF_KEY, disconnected_timeout).
8080
-define(DISCONNECTED_TIMEOUT_MS, 60_000).
81+
-define(SAC_ERRORS, [partition_index_conflict, not_found]).
82+
-define(IS_STATE_REC(T), is_record(T, ?MODULE)).
8183

8284
%% Single Active Consumer API
8385
-spec register_consumer(binary(),
@@ -538,7 +540,8 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G,
538540
end.
539541

540542
-spec consumer_groups(binary(), [atom()], state()) -> {ok, [term()]}.
541-
consumer_groups(VirtualHost, InfoKeys, #?MODULE{groups = Groups}) ->
543+
consumer_groups(VirtualHost, InfoKeys, #?MODULE{groups = Groups} = S)
544+
when ?IS_STATE_REC(S) ->
542545
Res = maps:fold(fun ({VH, Stream, Reference},
543546
#group{consumers = Consumers,
544547
partition_index = PartitionIndex},
@@ -568,7 +571,9 @@ consumer_groups(VirtualHost, InfoKeys, #?MODULE{groups = Groups}) ->
568571
Acc
569572
end,
570573
[], Groups),
571-
{ok, lists:reverse(Res)}.
574+
{ok, lists:reverse(Res)};
575+
consumer_groups(VirtualHost, InfoKeys, S) ->
576+
rabbit_stream_sac_coordinator_v4:consumer_groups(VirtualHost, InfoKeys, S).
572577

573578
-spec group_consumers(binary(),
574579
binary(),
@@ -580,7 +585,8 @@ group_consumers(VirtualHost,
580585
Stream,
581586
Reference,
582587
InfoKeys,
583-
#?MODULE{groups = Groups}) ->
588+
#?MODULE{groups = Groups} = S)
589+
when ?IS_STATE_REC(S) ->
584590
GroupId = {VirtualHost, Stream, Reference},
585591
case Groups of
586592
#{GroupId := #group{consumers = Consumers}} ->
@@ -612,7 +618,10 @@ group_consumers(VirtualHost,
612618
{ok, Cs};
613619
_ ->
614620
{error, not_found}
615-
end.
621+
end;
622+
group_consumers(VirtualHost, Stream, Reference, InfoKeys, S) ->
623+
rabbit_stream_sac_coordinator_v4:group_consumers(VirtualHost, Stream,
624+
Reference, InfoKeys, S).
616625

617626
cli_consumer_status_label({?PDOWN, _}) ->
618627
inactive;
@@ -868,7 +877,7 @@ import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
868877
conf = #{disconnected_timeout => ?DISCONNECTED_TIMEOUT_MS}}.
869878

870879
-spec check_conf_change(state() | term()) -> {new, conf()} | unchanged.
871-
check_conf_change(State) when is_record(State, ?MODULE) ->
880+
check_conf_change(State) when ?IS_STATE_REC(State) ->
872881
#?MODULE{conf = Conf} = State,
873882
DisconTimeout = lookup_disconnected_timeout(),
874883
case Conf of
@@ -894,7 +903,7 @@ list_nodes(#?MODULE{groups = Groups}) ->
894903
-spec state_enter(ra_server:ra_state(), state() | term()) ->
895904
ra_machine:effects().
896905
state_enter(leader, #?MODULE{groups = Groups} = State)
897-
when is_record(State, ?MODULE)->
906+
when ?IS_STATE_REC(State) ->
898907
%% iterate over groups
899908
{Nodes, DisConns} =
900909
maps:fold(fun(_, #group{consumers = Cs}, Acc) ->

0 commit comments

Comments
 (0)