Skip to content

Commit 64ba80f

Browse files
committed
Ensure to only grow QQs when all existing members are in 'voter' status
1 parent 5d67171 commit 64ba80f

File tree

4 files changed

+86
-18
lines changed

4 files changed

+86
-18
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1588,19 +1588,43 @@ grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership)
15881588
{error, bad_quorum_cluster_size}.
15891589

15901590
maybe_grow(Q, Node, Membership, Size) ->
1591+
QNodes = get_nodes(Q),
1592+
maybe_grow(Q, Node, Membership, Size, QNodes).
1593+
1594+
maybe_grow(Q, Node, Membership, Size, QNodes) ->
15911595
QName = amqqueue:get_name(Q),
1592-
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
1593-
[rabbit_misc:rs(QName), Node]),
1594-
case add_member(Q, Node, Membership) of
1595-
ok ->
1596-
{QName, {ok, Size + 1}};
1597-
{error, Err} ->
1596+
{ok, RaName} = qname_to_internal_name(QName),
1597+
case check_all_memberships(RaName, QNodes, voter) of
1598+
true ->
1599+
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
1600+
[rabbit_misc:rs(QName), Node]),
1601+
case add_member(Q, Node, Membership) of
1602+
ok ->
1603+
{QName, {ok, Size + 1}};
1604+
{error, Err} ->
1605+
rabbit_log:warning(
1606+
"~ts: failed to add member (replica) on node ~w, error: ~w",
1607+
[rabbit_misc:rs(QName), Node, Err]),
1608+
{QName, {error, Size, Err}}
1609+
end;
1610+
false ->
1611+
Err = {error, non_voters_found},
15981612
rabbit_log:warning(
1599-
"~ts: failed to add member (replica) on node ~w, error: ~w",
1600-
[rabbit_misc:rs(QName), Node, Err]),
1613+
"~ts: failed to add member (replica) on node ~w, error: ~w",
1614+
[rabbit_misc:rs(QName), Node, Err]),
16011615
{QName, {error, Size, Err}}
16021616
end.
16031617

1618+
check_all_memberships(RaName, QNodes, CompareMembership) ->
1619+
case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of
1620+
{Result, []} ->
1621+
lists:all(
1622+
fun(M) -> M == CompareMembership end,
1623+
[Membership || [{_RaName, _RaState, Membership}] <- Result]);
1624+
_ ->
1625+
false
1626+
end.
1627+
16041628
-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
16051629
transfer_leadership(Q, Destination) ->
16061630
{RaName, _} = Pid = amqqueue:get_pid(Q),

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1538,7 +1538,7 @@ subscribe_from_each(Config) ->
15381538
ok.
15391539

15401540
grow_queue(Config) ->
1541-
[Server0, Server1, _Server2, _Server3, _Server4] =
1541+
[Server0, Server1, Server2, _Server3, _Server4] =
15421542
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
15431543

15441544
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
@@ -1571,42 +1571,72 @@ grow_queue(Config) ->
15711571

15721572
%% grow queues to node 'Server1'
15731573
TargetClusterSize_2 = 2,
1574-
rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]),
1574+
Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]),
1575+
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}},
1576+
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...]
1577+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)),
15751578
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
15761579

15771580
%% grow queues to quorum cluster size '2' has no effect
1578-
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]),
1581+
Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]),
1582+
?assertEqual([], Result2),
15791583
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
15801584

15811585
%% grow queues to quorum cluster size '3'
15821586
TargetClusterSize_3 = 3,
1583-
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all]),
1587+
Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]),
1588+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)),
15841589
assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount),
15851590

15861591
%% grow queues to quorum cluster size '5'
15871592
TargetClusterSize_5 = 5,
1588-
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]),
1593+
Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]),
1594+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)),
15891595
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),
15901596

1591-
%% shrink all queues again
1597+
%% shrink all queues again down to 1 member
15921598
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
15931599
force_all_queues_shrink_member_to_current_member, []),
15941600
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
15951601

15961602
%% grow queues to quorum cluster size > '5' (limit = 5).
15971603
TargetClusterSize_10 = 10,
1598-
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]),
1604+
Result5 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]),
1605+
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result5)),
15991606
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),
16001607

1601-
%% shrink all queues again
1608+
%% shrink all queues again down to 1 member
16021609
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
16031610
force_all_queues_shrink_member_to_current_member, []),
16041611
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
16051612

16061613
%% attempt to grow queues to quorum cluster size < '0'.
16071614
BadTargetClusterSize = -5,
16081615
?assertEqual({error, bad_quorum_cluster_size},
1609-
rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])).
1616+
rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])),
1617+
1618+
%% shrink all queues again down to 1 member
1619+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
1620+
force_all_queues_shrink_member_to_current_member, []),
1621+
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),
1622+
1623+
%% grow queues to node 'Server1': non_voter
1624+
rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]),
1625+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1626+
1627+
%% grow queues to node 'Server2': fail, non_voters found
1628+
Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]),
1629+
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}},
1630+
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...]
1631+
?assert(lists:all(
1632+
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)),
1633+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),
1634+
1635+
%% grow queues to target quorum cluster size '5': fail, non_voters found
1636+
Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]),
1637+
?assert(lists:all(
1638+
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)),
1639+
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount).
16101640

16111641
assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) ->
16121642
[begin

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
4444
{:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."}
4545
end
4646

47+
def validate([n, _], %{membership: m})
48+
when (is_integer(n) and not (m == "voter" or m == "promotable")) do
49+
{:validation_failure, "voter status '#{m}' must be 'voter' or 'promotable' to grow to target quorum cluster size '#{n}'."}
50+
end
51+
4752
def validate(_, %{membership: m})
4853
when not (m == "promotable" or
4954
m == "non_voter" or

deps/rabbitmq_cli/test/queues/grow_command_test.exs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do
8282
{:validation_failure, "voter status 'banana' is not recognised."}
8383
end
8484

85-
test "validate: when target quorum cluster size greater than zero, returns a success" do
85+
test "validate: when target quorum cluster size greater than zero and membership is voter, returns a success" do
8686
assert @command.validate([7, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok
8787
end
8888

89+
test "validate: when target quorum cluster size greater than zero and membership is promotable, returns a success" do
90+
assert @command.validate([5, "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok
91+
end
92+
8993
test "validate: when target quorum cluster size is zero, returns failure" do
9094
assert @command.validate([0, "all"], %{membership: "voter", queue_pattern: "qq.*"}) ==
9195
{:validation_failure, "target quorum cluster size '0' must be greater than 0."}
@@ -96,6 +100,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do
96100
{:validation_failure, "target quorum cluster size '-1' must be greater than 0."}
97101
end
98102

103+
test "validate: when target quorum cluster size is provided and membership is not voter, returns failure" do
104+
assert @command.validate([5, "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) ==
105+
{:validation_failure, "voter status 'non_voter' must be 'voter' or 'promotable' to grow to target quorum cluster size '5'."}
106+
end
107+
99108
@tag test_timeout: 3000
100109
test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do
101110
assert match?(

0 commit comments

Comments
 (0)