Skip to content

Commit 098ac57

Browse files
Merge pull request #2046 from rabbitmq/qq-sync-op-fixes
Pass all members to synchronous Ra operations
2 parents 33a7f97 + f1fe600 commit 098ac57

File tree

2 files changed

+66
-33
lines changed

2 files changed

+66
-33
lines changed

src/rabbit_fifo.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@
6464
make_credit/4,
6565
make_purge/0,
6666
make_purge_nodes/1,
67-
make_update_config/1
67+
make_update_config/1,
68+
69+
from_log/2
6870
]).
6971

7072
%% command records representing all the protocol actions that are supported
@@ -754,6 +756,24 @@ usage(Name) when is_atom(Name) ->
754756
[{_, Use}] -> Use
755757
end.
756758

759+
from_log(Log, State0) ->
760+
lists:foldl(
761+
fun ({Idx, Term, {'$usr', Meta0, Cmd, _}}, {S0, Effs}) ->
762+
Meta = Meta0#{index => Idx,
763+
term => Term},
764+
case apply(Meta, Cmd, S0) of
765+
{S, _, E} when is_list(E) ->
766+
{S, Effs ++ E};
767+
{S, _, E} ->
768+
{S, Effs ++ [E]};
769+
{S, _} ->
770+
{S, Effs}
771+
end;
772+
(_, Acc) ->
773+
Acc
774+
end, {State0, []}, Log).
775+
776+
757777
%%% Internal
758778

759779
messages_ready(#?MODULE{messages = M,

src/rabbit_quorum_queue.erl

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -120,24 +120,9 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
120120
NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes),
121121
case rabbit_amqqueue:internal_declare(NewQ1, false) of
122122
{created, NewQ} ->
123-
RaMachine = ra_machine(NewQ),
124-
ServerIds = [{RaName, Node} || Node <- Nodes],
125-
ClusterName = RaName,
126123
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
127-
RaConfs = [begin
128-
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
129-
FName = rabbit_misc:rs(QName),
130-
#{cluster_name => ClusterName,
131-
id => ServerId,
132-
uid => UId,
133-
friendly_name => FName,
134-
metrics_key => QName,
135-
initial_members => ServerIds,
136-
log_init_args => #{uid => UId},
137-
tick_timeout => TickTimeout,
138-
machine => RaMachine}
139-
end || ServerId <- ServerIds],
140-
124+
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
125+
|| ServerId <- members(NewQ)],
141126
case ra:start_cluster(RaConfs) of
142127
{ok, _, _} ->
143128
rabbit_event:notify(queue_created,
@@ -325,7 +310,6 @@ reductions(Name) ->
325310
recover(Queues) ->
326311
[begin
327312
{Name, _} = amqqueue:get_pid(Q0),
328-
Nodes = amqqueue:get_quorum_nodes(Q0),
329313
case ra:restart_server({Name, node()}) of
330314
ok ->
331315
% queue was restarted, good
@@ -335,10 +319,12 @@ recover(Queues) ->
335319
Err1 == name_not_registered ->
336320
% queue was never started on this node
337321
% so needs to be started from scratch.
338-
Machine = ra_machine(Q0),
339-
RaNodes = [{Name, Node} || Node <- Nodes],
340-
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
341-
ok -> ok;
322+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
323+
?TICK_TIMEOUT),
324+
Conf = make_ra_conf(Q0, {Name, node()}, TickTimeout),
325+
case ra:start_server(Conf) of
326+
ok ->
327+
ok;
342328
Err2 ->
343329
rabbit_log:warning("recover: quorum queue ~w could not"
344330
" be started ~w", [Name, Err2]),
@@ -724,15 +710,17 @@ add_member(VHost, Name, Node, Timeout) ->
724710
end.
725711

726712
add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
727-
{RaName, _} = ServerRef = amqqueue:get_pid(Q),
713+
{RaName, _} = amqqueue:get_pid(Q),
728714
QName = amqqueue:get_name(Q),
729-
QNodes = amqqueue:get_quorum_nodes(Q),
730715
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
731716
ServerId = {RaName, Node},
732-
case ra:start_server(RaName, ServerId, ra_machine(Q),
733-
[{RaName, N} || N <- QNodes]) of
717+
Members = members(Q),
718+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
719+
?TICK_TIMEOUT),
720+
Conf = make_ra_conf(Q, ServerId, TickTimeout),
721+
case ra:start_server(Conf) of
734722
ok ->
735-
case ra:add_member(ServerRef, ServerId, Timeout) of
723+
case ra:add_member(Members, ServerId, Timeout) of
736724
{ok, _, Leader} ->
737725
Fun = fun(Q1) ->
738726
Q2 = amqqueue:set_quorum_nodes(
@@ -745,7 +733,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
745733
ok;
746734
{timeout, _} ->
747735
_ = ra:force_delete_server(ServerId),
748-
_ = ra:remove_member(ServerRef, ServerId),
736+
_ = ra:remove_member(Members, ServerId),
749737
{error, timeout};
750738
E ->
751739
_ = ra:force_delete_server(ServerId),
@@ -773,16 +761,18 @@ delete_member(VHost, Name, Node) ->
773761
E
774762
end.
775763

764+
776765
delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
777766
QName = amqqueue:get_name(Q),
778767
{RaName, _} = amqqueue:get_pid(Q),
779768
ServerId = {RaName, Node},
780-
case amqqueue:get_quorum_nodes(Q) of
781-
[Node] ->
769+
case members(Q) of
770+
[{_, Node}] ->
771+
782772
%% deleting the last member is not allowed
783773
{error, last_node};
784-
_ ->
785-
case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of
774+
Members ->
775+
case ra:leave_and_delete_server(Members, ServerId) of
786776
ok ->
787777
Fun = fun(Q1) ->
788778
amqqueue:set_quorum_nodes(
@@ -1146,3 +1136,26 @@ select_quorum_nodes(0, _, Selected) ->
11461136
select_quorum_nodes(Size, Rest, Selected) ->
11471137
S = lists:nth(rand:uniform(length(Rest)), Rest),
11481138
select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
1139+
1140+
%% member with the current leader first
1141+
members(Q) when ?amqqueue_is_quorum(Q) ->
1142+
{RaName, LeaderNode} = amqqueue:get_pid(Q),
1143+
Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)),
1144+
[{RaName, N} || N <- [LeaderNode | Nodes]].
1145+
1146+
make_ra_conf(Q, ServerId, TickTimeout) ->
1147+
QName = amqqueue:get_name(Q),
1148+
RaMachine = ra_machine(Q),
1149+
[{ClusterName, _} | _] = Members = members(Q),
1150+
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
1151+
FName = rabbit_misc:rs(QName),
1152+
#{cluster_name => ClusterName,
1153+
id => ServerId,
1154+
uid => UId,
1155+
friendly_name => FName,
1156+
metrics_key => QName,
1157+
initial_members => Members,
1158+
log_init_args => #{uid => UId},
1159+
tick_timeout => TickTimeout,
1160+
machine => RaMachine}.
1161+

0 commit comments

Comments
 (0)