Skip to content

Commit 2170396

Browse files
committed
Pass all members to synchronous Ra operations
As this will make these more work reliably when leadership is in flux. Also ensure that servers started and added after the initial declare uses the same configuration.
1 parent 4e0211d commit 2170396

File tree

2 files changed

+65
-32
lines changed

2 files changed

+65
-32
lines changed

src/rabbit_fifo.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@
6464
make_credit/4,
6565
make_purge/0,
6666
make_purge_nodes/1,
67-
make_update_config/1
67+
make_update_config/1,
68+
69+
from_log/2
6870
]).
6971

7072
%% command records representing all the protocol actions that are supported
@@ -754,6 +756,24 @@ usage(Name) when is_atom(Name) ->
754756
[{_, Use}] -> Use
755757
end.
756758

759+
from_log(Log, State0) ->
760+
lists:foldl(
761+
fun ({Idx, Term, {'$usr', Meta0, Cmd, _}}, {S0, Effs}) ->
762+
Meta = Meta0#{index => Idx,
763+
term => Term},
764+
case apply(Meta, Cmd, S0) of
765+
{S, _, E} when is_list(E) ->
766+
{S, Effs ++ E};
767+
{S, _, E} ->
768+
{S, Effs ++ [E]};
769+
{S, _} ->
770+
{S, Effs}
771+
end;
772+
(_, Acc) ->
773+
Acc
774+
end, {State0, []}, Log).
775+
776+
757777
%%% Internal
758778

759779
messages_ready(#?MODULE{messages = M,

src/rabbit_quorum_queue.erl

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -119,24 +119,9 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
119119
NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes),
120120
case rabbit_amqqueue:internal_declare(NewQ1, false) of
121121
{created, NewQ} ->
122-
RaMachine = ra_machine(NewQ),
123-
ServerIds = [{RaName, Node} || Node <- Nodes],
124-
ClusterName = RaName,
125122
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
126-
RaConfs = [begin
127-
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
128-
FName = rabbit_misc:rs(QName),
129-
#{cluster_name => ClusterName,
130-
id => ServerId,
131-
uid => UId,
132-
friendly_name => FName,
133-
metrics_key => QName,
134-
initial_members => ServerIds,
135-
log_init_args => #{uid => UId},
136-
tick_timeout => TickTimeout,
137-
machine => RaMachine}
138-
end || ServerId <- ServerIds],
139-
123+
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
124+
|| ServerId <- members(NewQ)],
140125
case ra:start_cluster(RaConfs) of
141126
{ok, _, _} ->
142127
rabbit_event:notify(queue_created,
@@ -324,7 +309,6 @@ reductions(Name) ->
324309
recover(Queues) ->
325310
[begin
326311
{Name, _} = amqqueue:get_pid(Q0),
327-
Nodes = amqqueue:get_quorum_nodes(Q0),
328312
case ra:restart_server({Name, node()}) of
329313
ok ->
330314
% queue was restarted, good
@@ -334,10 +318,12 @@ recover(Queues) ->
334318
Err1 == name_not_registered ->
335319
% queue was never started on this node
336320
% so needs to be started from scratch.
337-
Machine = ra_machine(Q0),
338-
RaNodes = [{Name, Node} || Node <- Nodes],
339-
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
340-
ok -> ok;
321+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
322+
?TICK_TIMEOUT),
323+
Conf = make_ra_conf(Q0, {Name, node()}, TickTimeout),
324+
case ra:start_server(Conf) of
325+
ok ->
326+
ok;
341327
Err2 ->
342328
rabbit_log:warning("recover: quorum queue ~w could not"
343329
" be started ~w", [Name, Err2]),
@@ -723,15 +709,17 @@ add_member(VHost, Name, Node) ->
723709
end.
724710

725711
add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
726-
{RaName, _} = ServerRef = amqqueue:get_pid(Q),
712+
{RaName, _} = amqqueue:get_pid(Q),
727713
QName = amqqueue:get_name(Q),
728-
QNodes = amqqueue:get_quorum_nodes(Q),
729714
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
730715
ServerId = {RaName, Node},
731-
case ra:start_server(RaName, ServerId, ra_machine(Q),
732-
[{RaName, N} || N <- QNodes]) of
716+
Members = members(Q),
717+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
718+
?TICK_TIMEOUT),
719+
Conf = make_ra_conf(Q, ServerId, TickTimeout),
720+
case ra:start_server(Conf) of
733721
ok ->
734-
case ra:add_member(ServerRef, ServerId) of
722+
case ra:add_member(Members, ServerId) of
735723
{ok, _, Leader} ->
736724
Fun = fun(Q1) ->
737725
Q2 = amqqueue:set_quorum_nodes(
@@ -770,16 +758,18 @@ delete_member(VHost, Name, Node) ->
770758
E
771759
end.
772760

761+
773762
delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
774763
QName = amqqueue:get_name(Q),
775764
{RaName, _} = amqqueue:get_pid(Q),
776765
ServerId = {RaName, Node},
777-
case amqqueue:get_quorum_nodes(Q) of
778-
[Node] ->
766+
case members(Q) of
767+
[{_, Node}] ->
768+
779769
%% deleting the last member is not allowed
780770
{error, last_node};
781-
_ ->
782-
case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of
771+
Members ->
772+
case ra:leave_and_delete_server(Members, ServerId) of
783773
ok ->
784774
Fun = fun(Q1) ->
785775
amqqueue:set_quorum_nodes(
@@ -1143,3 +1133,26 @@ select_quorum_nodes(0, _, Selected) ->
11431133
select_quorum_nodes(Size, Rest, Selected) ->
11441134
S = lists:nth(rand:uniform(length(Rest)), Rest),
11451135
select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
1136+
1137+
%% member with the current leader first
1138+
members(Q) when ?amqqueue_is_quorum(Q) ->
1139+
{RaName, LeaderNode} = amqqueue:get_pid(Q),
1140+
Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)),
1141+
[{RaName, N} || N <- [LeaderNode | Nodes]].
1142+
1143+
make_ra_conf(Q, ServerId, TickTimeout) ->
1144+
QName = amqqueue:get_name(Q),
1145+
RaMachine = ra_machine(Q),
1146+
[{ClusterName, _} | _] = Members = members(Q),
1147+
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
1148+
FName = rabbit_misc:rs(QName),
1149+
#{cluster_name => ClusterName,
1150+
id => ServerId,
1151+
uid => UId,
1152+
friendly_name => FName,
1153+
metrics_key => QName,
1154+
initial_members => Members,
1155+
log_init_args => #{uid => UId},
1156+
tick_timeout => TickTimeout,
1157+
machine => RaMachine}.
1158+

0 commit comments

Comments
 (0)