Skip to content

Commit bd8f08f

Browse files
committed
Sort for better determinism
1 parent 6b96a22 commit bd8f08f

File tree

1 file changed

+30
-31
lines changed

1 file changed

+30
-31
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,9 @@ apply(#command_activate_consumer{vhost = VirtualHost,
327327
apply(#command_connection_reconnected{pid = Pid},
328328
#?MODULE{groups = Groups0} = State0) ->
329329
{State1, Eff} =
330-
maps:fold(fun(G, _, {St, Eff}) ->
331-
handle_group_connection_reconnected(Pid, St, Eff, G)
332-
end, {State0, []}, Groups0),
330+
maps:fold(fun(G, _, {St, Eff}) ->
331+
handle_group_connection_reconnected(Pid, St, Eff, G)
332+
end, {State0, []}, Groups0),
333333

334334
{State1, ok, Eff};
335335
apply(#command_purge_nodes{nodes = Nodes}, State0) ->
@@ -554,26 +554,26 @@ consumer_groups(VirtualHost, InfoKeys, #?MODULE{groups = Groups} = S)
554554
#group{consumers = Consumers,
555555
partition_index = PartitionIndex},
556556
Acc)
557-
when VH == VirtualHost ->
557+
when VH == VirtualHost ->
558558
Record =
559-
lists:foldr(fun (stream, RecAcc) ->
560-
[{stream, Stream} | RecAcc];
561-
(reference, RecAcc) ->
562-
[{reference, Reference}
563-
| RecAcc];
564-
(partition_index, RecAcc) ->
565-
[{partition_index,
566-
PartitionIndex}
567-
| RecAcc];
568-
(consumers, RecAcc) ->
569-
[{consumers,
570-
length(Consumers)}
571-
| RecAcc];
572-
(Unknown, RecAcc) ->
573-
[{Unknown, unknown_field}
574-
| RecAcc]
575-
end,
576-
[], InfoKeys),
559+
lists:foldr(fun (stream, RecAcc) ->
560+
[{stream, Stream} | RecAcc];
561+
(reference, RecAcc) ->
562+
[{reference, Reference}
563+
| RecAcc];
564+
(partition_index, RecAcc) ->
565+
[{partition_index,
566+
PartitionIndex}
567+
| RecAcc];
568+
(consumers, RecAcc) ->
569+
[{consumers,
570+
length(Consumers)}
571+
| RecAcc];
572+
(Unknown, RecAcc) ->
573+
[{Unknown, unknown_field}
574+
| RecAcc]
575+
end,
576+
[], InfoKeys),
577577
[Record | Acc];
578578
(_GroupId, _Group, Acc) ->
579579
Acc
@@ -745,10 +745,10 @@ handle_connection_node_disconnected(ConnPid,
745745
{Groups, PidsGroups1} ->
746746
State1 = State0#?MODULE{pids_groups = PidsGroups1},
747747
State2 =
748-
maps:fold(fun(G, _, Acc) ->
749-
handle_group_after_connection_node_disconnected(
750-
ConnPid, Acc, G)
751-
end, State1, Groups),
748+
maps:fold(fun(G, _, Acc) ->
749+
handle_group_after_connection_node_disconnected(
750+
ConnPid, Acc, G)
751+
end, State1, Groups),
752752
T = disconnected_timeout(State2),
753753
{State2, [node_disconnected_timer_effect(ConnPid, T)]}
754754
end.
@@ -897,7 +897,7 @@ list_nodes(#?MODULE{groups = Groups}) ->
897897
GNodes = nodes_from_group(G),
898898
maps:merge(Acc, GNodes)
899899
end, #{}, Groups),
900-
maps:keys(Nodes).
900+
lists:sort(maps:keys(Nodes)).
901901

902902
-spec state_enter(ra_server:ra_state(), state() | term()) ->
903903
ra_machine:effects().
@@ -923,7 +923,7 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
923923
DisTimeout = disconnected_timeout(State),
924924
%% monitor involved nodes
925925
%% reset a timer for disconnected connections
926-
[{monitor, node, N} || N <- maps:keys(Nodes)] ++
926+
[{monitor, node, N} || N <- lists:sort(maps:keys(Nodes))] ++
927927
[begin
928928
Time = case ts() - Ts of
929929
T when T < 10_000 ->
@@ -933,7 +933,7 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
933933
DisTimeout
934934
end,
935935
node_disconnected_timer_effect(P, Time)
936-
end || P := Ts <- DisConns];
936+
end || P := Ts <- maps:iterator(DisConns, ordered)];
937937
state_enter(_, _) ->
938938
[].
939939

@@ -1323,7 +1323,7 @@ compute_pid_group_dependencies(Groups) ->
13231323
PG1 = PG0#{K => true},
13241324
AccIn#{Pid => PG1}
13251325
end, Acc, Cs)
1326-
end, #{}, Groups).
1326+
end, #{}, Groups).
13271327

13281328
-spec compute_node_pid_group_dependencies(node(), groups()) -> pids_groups().
13291329
compute_node_pid_group_dependencies(Node, Groups) ->
@@ -1357,4 +1357,3 @@ node_disconnected_timer_effect(Pid, T) ->
13571357

13581358
ts() ->
13591359
erlang:system_time(millisecond).
1360-

0 commit comments

Comments
 (0)