Skip to content

Commit 662bbd5

Browse files
committed
Extend QQ grow command to support target quorum cluster size
1 parent f50a8d3 commit 662bbd5

File tree

2 files changed

+67
-19
lines changed

2 files changed

+67
-19
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,25 +1528,14 @@ shrink_all(Node) ->
15281528
grow(Node, VhostSpec, QueueSpec, Strategy) ->
15291529
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).
15301530

1531-
-spec grow(node(), binary(), binary(), all | even, membership()) ->
1531+
-spec grow(node() | integer(), binary(), binary(), all | even, membership()) ->
15321532
[{rabbit_amqqueue:name(),
15331533
{ok, pos_integer()} | {error, pos_integer(), term()}}].
1534-
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
1534+
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) ->
15351535
Running = rabbit_nodes:list_running(),
15361536
[begin
15371537
Size = length(get_nodes(Q)),
1538-
QName = amqqueue:get_name(Q),
1539-
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
1540-
[rabbit_misc:rs(QName), Node]),
1541-
case add_member(Q, Node, Membership) of
1542-
ok ->
1543-
{QName, {ok, Size + 1}};
1544-
{error, Err} ->
1545-
rabbit_log:warning(
1546-
"~ts: failed to add member (replica) on node ~w, error: ~w",
1547-
[rabbit_misc:rs(QName), Node, Err]),
1548-
{QName, {error, Size, Err}}
1549-
end
1538+
maybe_grow(Q, Node, Membership, Size)
15501539
end
15511540
|| Q <- rabbit_amqqueue:list(),
15521541
amqqueue:get_type(Q) == ?MODULE,
@@ -1556,7 +1545,53 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
15561545
lists:member(Node, Running),
15571546
matches_strategy(Strategy, get_nodes(Q)),
15581547
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
1559-
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
1548+
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ];
1549+
1550+
grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership)
1551+
when is_integer(QuorumClusterSize) ->
1552+
Running = rabbit_nodes:list_running(),
1553+
TotalRunning = length(Running),
1554+
1555+
TargetQuorumClusterSize =
1556+
if QuorumClusterSize > TotalRunning ->
1557+
%% we cant grow beyond total running nodes
1558+
TotalRunning;
1559+
true ->
1560+
QuorumClusterSize
1561+
end,
1562+
1563+
lists:flatten(
1564+
[begin
1565+
QNodes = get_nodes(Q),
1566+
case length(QNodes) of
1567+
Size when Size < TargetQuorumClusterSize ->
1568+
TargetAvailableNodes = Running -- QNodes,
1569+
Node = hd(TargetAvailableNodes),
1570+
maybe_grow(Q, Node, Membership, Size);
1571+
_ ->
1572+
[]
1573+
end
1574+
end
1575+
|| _ <- lists:seq(1, TargetQuorumClusterSize),
1576+
Q <- rabbit_amqqueue:list(),
1577+
amqqueue:get_type(Q) == ?MODULE,
1578+
matches_strategy(Strategy, get_nodes(Q)),
1579+
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
1580+
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]).
1581+
1582+
maybe_grow(Q, Node, Membership, Size) ->
1583+
QName = amqqueue:get_name(Q),
1584+
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
1585+
[rabbit_misc:rs(QName), Node]),
1586+
case add_member(Q, Node, Membership) of
1587+
ok ->
1588+
{QName, {ok, Size + 1}};
1589+
{error, Err} ->
1590+
rabbit_log:warning(
1591+
"~ts: failed to add member (replica) on node ~w, error: ~w",
1592+
[rabbit_misc:rs(QName), Node, Err]),
1593+
{QName, {error, Size, Err}}
1594+
end.
15601595

15611596
-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
15621597
transfer_leadership(Q, Destination) ->

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
3939
{:validation_failure, "strategy '#{s}' is not recognised."}
4040
end
4141

42+
def validate([n, _], _)
43+
when (is_integer(n) and n <= 0) do
44+
{:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."}
45+
end
46+
4247
def validate(_, %{membership: m})
4348
when not (m == "promotable" or
4449
m == "non_voter" or
@@ -60,14 +65,22 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
6065
)
6166
end
6267

63-
def run([node, strategy], %{
68+
def run([node_or_quorum_cluster_size, strategy], %{
6469
node: node_name,
6570
vhost_pattern: vhost_pat,
6671
queue_pattern: queue_pat,
6772
membership: membership,
6873
errors_only: errors_only
6974
}) do
70-
args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)]
75+
76+
node_or_quorum_cluster_size =
77+
if is_integer(node_or_quorum_cluster_size) do
78+
node_or_quorum_cluster_size
79+
else
80+
to_atom(node_or_quorum_cluster_size)
81+
end
82+
83+
args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)]
7184

7285
args =
7386
case to_atom(membership) do
@@ -108,11 +121,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
108121

109122
def usage,
110123
do:
111-
"grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
124+
"grow <node | quorum_cluster_size> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
112125

113126
def usage_additional do
114127
[
115-
["<node>", "node name to place replicas on"],
128+
["<node | quorum_cluster_size>", "node name to place replicas on or desired quorum cluster size"],
116129
[
117130
"<all | even>",
118131
"add a member for all matching queues or just those whose membership count is an even number"

0 commit comments

Comments
 (0)