Skip to content

Commit b4f7d46

Browse files
committed
Miscellaneous minor improvements in stream SAC coordinator
This commit handles edge cases in the stream SAC coordinator to make sure it does not crash during execution. Most of these edge cases consist in an inconsistent state, so there are very unlikely to happen. This commit also makes sure there is no duplicate in the consumer list of a group. Consumers are also now identified only by their connection PID and their subscription ID, as now the timestamp they contain in their state does not allow a field-by-field comparison.
1 parent 4bca14a commit b4f7d46

File tree

3 files changed

+304
-314
lines changed

3 files changed

+304
-314
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -710,8 +710,7 @@ apply(#{machine_version := Vsn} = Meta,
710710
_ ->
711711
return(Meta, State0, stream_not_found, [])
712712
end;
713-
apply(#{machine_version := Vsn} = Meta,
714-
{nodeup, Node} = Cmd,
713+
apply(Meta, {nodeup, Node} = Cmd,
715714
#?MODULE{monitors = Monitors0,
716715
streams = Streams0,
717716
single_active_consumer = Sac0} = State) ->
@@ -735,14 +734,8 @@ apply(#{machine_version := Vsn} = Meta,
735734
{Ss#{Id => S}, E}
736735
end, {Streams0, Effects0}, Streams0),
737736

738-
{Sac1, Effects2} = case ?V5_OR_MORE(Vsn) of
739-
true ->
740-
SacMod = sac_module(Meta),
741-
SacMod:handle_node_reconnected(Node,
742-
Sac0, Effects1);
743-
false ->
744-
{Sac0, Effects1}
745-
end,
737+
738+
{Sac1, Effects2} = sac_handle_node_reconnected(Meta, Node, Sac0, Effects1),
746739
return(Meta, State#?MODULE{monitors = Monitors,
747740
streams = Streams,
748741
single_active_consumer = Sac1}, ok, Effects2);
@@ -2444,6 +2437,17 @@ sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) ->
24442437
sac_handle_connection_down(SacState, Pid, _Reason, _Vsn) ->
24452438
?SAC_V4:handle_connection_down(Pid, SacState).
24462439

2440+
sac_handle_node_reconnected(#{machine_version := Vsn} = Meta, Node,
2441+
Sac, Effects) ->
2442+
case ?V5_OR_MORE(Vsn) of
2443+
true ->
2444+
SacMod = sac_module(Meta),
2445+
SacMod:handle_node_reconnected(Node,
2446+
Sac, Effects);
2447+
false ->
2448+
{Sac, Effects}
2449+
end.
2450+
24472451
sac_make_purge_nodes(Nodes) ->
24482452
rabbit_stream_sac_coordinator:make_purge_nodes(Nodes).
24492453

0 commit comments

Comments
 (0)