Skip to content

Commit 112d2b6

Browse files
committed
Miscellaneous minor improvements in stream SAC coordinator
This commit handles edge cases in the stream SAC coordinator to make sure it does not crash during execution. Most of these edge cases consist in an inconsistent state, so there are very unlikely to happen. This commit also makes sure there is no duplicate in the consumer list of a group. Consumers are also now identified only by their connection PID and their subscription ID, as now the timestamp they contain in their state does not allow a field-by-field comparison.
1 parent 4bca14a commit 112d2b6

File tree

3 files changed

+303
-312
lines changed

3 files changed

+303
-312
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -735,14 +735,8 @@ apply(#{machine_version := Vsn} = Meta,
735735
{Ss#{Id => S}, E}
736736
end, {Streams0, Effects0}, Streams0),
737737

738-
{Sac1, Effects2} = case ?V5_OR_MORE(Vsn) of
739-
true ->
740-
SacMod = sac_module(Meta),
741-
SacMod:handle_node_reconnected(Node,
742-
Sac0, Effects1);
743-
false ->
744-
{Sac0, Effects1}
745-
end,
738+
739+
{Sac1, Effects2} = sac_handle_node_reconnected(Meta, Node, Sac0, Effects1),
746740
return(Meta, State#?MODULE{monitors = Monitors,
747741
streams = Streams,
748742
single_active_consumer = Sac1}, ok, Effects2);
@@ -2444,6 +2438,17 @@ sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) ->
24442438
sac_handle_connection_down(SacState, Pid, _Reason, _Vsn) ->
24452439
?SAC_V4:handle_connection_down(Pid, SacState).
24462440

2441+
sac_handle_node_reconnected(#{machine_version := Vsn} = Meta, Node,
2442+
Sac, Effects) ->
2443+
case ?V5_OR_MORE(Vsn) of
2444+
true ->
2445+
SacMod = sac_module(Meta),
2446+
SacMod:handle_node_reconnected(Node,
2447+
Sac, Effects);
2448+
false ->
2449+
{Sac, Effects}
2450+
end.
2451+
24472452
sac_make_purge_nodes(Nodes) ->
24482453
rabbit_stream_sac_coordinator:make_purge_nodes(Nodes).
24492454

0 commit comments

Comments
 (0)