Skip to content

Commit c0c80fc

Browse files
committed
Extend QQ grow command to support target quorum cluster size
1 parent 3ea83ac commit c0c80fc

File tree

2 files changed

+70
-20
lines changed

2 files changed

+70
-20
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1543,29 +1543,20 @@ shrink_all(Node) ->
15431543
amqqueue:get_type(Q) == ?MODULE,
15441544
lists:member(Node, get_nodes(Q))].
15451545

1546-
1546+
-spec grow(node() | integer(), binary(), binary(), all | even) ->
1547+
[{rabbit_amqqueue:name(),
1548+
{ok, pos_integer()} | {error, pos_integer(), term()}}].
15471549
grow(Node, VhostSpec, QueueSpec, Strategy) ->
15481550
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).
15491551

1550-
-spec grow(node(), binary(), binary(), all | even, membership()) ->
1552+
-spec grow(node() | integer(), binary(), binary(), all | even, membership()) ->
15511553
[{rabbit_amqqueue:name(),
15521554
{ok, pos_integer()} | {error, pos_integer(), term()}}].
1553-
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
1555+
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) ->
15541556
Running = rabbit_nodes:list_running(),
15551557
[begin
15561558
Size = length(get_nodes(Q)),
1557-
QName = amqqueue:get_name(Q),
1558-
?LOG_INFO("~ts: adding a new member (replica) on node ~w",
1559-
[rabbit_misc:rs(QName), Node]),
1560-
case add_member(Q, Node, Membership) of
1561-
ok ->
1562-
{QName, {ok, Size + 1}};
1563-
{error, Err} ->
1564-
?LOG_WARNING(
1565-
"~ts: failed to add member (replica) on node ~w, error: ~w",
1566-
[rabbit_misc:rs(QName), Node, Err]),
1567-
{QName, {error, Size, Err}}
1568-
end
1559+
maybe_grow(Q, Node, Membership, Size)
15691560
end
15701561
|| Q <- rabbit_amqqueue:list(),
15711562
amqqueue:get_type(Q) == ?MODULE,
@@ -1575,7 +1566,53 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
15751566
lists:member(Node, Running),
15761567
matches_strategy(Strategy, get_nodes(Q)),
15771568
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
1578-
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
1569+
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ];
1570+
1571+
grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership)
1572+
when is_integer(QuorumClusterSize) ->
1573+
Running = rabbit_nodes:list_running(),
1574+
TotalRunning = length(Running),
1575+
1576+
TargetQuorumClusterSize =
1577+
if QuorumClusterSize > TotalRunning ->
1578+
%% we cant grow beyond total running nodes
1579+
TotalRunning;
1580+
true ->
1581+
QuorumClusterSize
1582+
end,
1583+
1584+
lists:flatten(
1585+
[begin
1586+
QNodes = get_nodes(Q),
1587+
case length(QNodes) of
1588+
Size when Size < TargetQuorumClusterSize ->
1589+
TargetAvailableNodes = Running -- QNodes,
1590+
Node = hd(TargetAvailableNodes),
1591+
maybe_grow(Q, Node, Membership, Size);
1592+
_ ->
1593+
[]
1594+
end
1595+
end
1596+
|| _ <- lists:seq(1, TargetQuorumClusterSize),
1597+
Q <- rabbit_amqqueue:list(),
1598+
amqqueue:get_type(Q) == ?MODULE,
1599+
matches_strategy(Strategy, get_nodes(Q)),
1600+
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
1601+
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]).
1602+
1603+
maybe_grow(Q, Node, Membership, Size) ->
1604+
QName = amqqueue:get_name(Q),
1605+
?LOG_INFO("~ts: adding a new member (replica) on node ~w",
1606+
[rabbit_misc:rs(QName), Node]),
1607+
case add_member(Q, Node, Membership) of
1608+
ok ->
1609+
{QName, {ok, Size + 1}};
1610+
{error, Err} ->
1611+
?LOG_WARNING(
1612+
"~ts: failed to add member (replica) on node ~w, error: ~w",
1613+
[rabbit_misc:rs(QName), Node, Err]),
1614+
{QName, {error, Size, Err}}
1615+
end.
15791616

15801617
-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
15811618
transfer_leadership(Q, Destination) ->

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
3939
{:validation_failure, "strategy '#{s}' is not recognised."}
4040
end
4141

42+
def validate([n, _], _)
43+
when (is_integer(n) and n <= 0) do
44+
{:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."}
45+
end
46+
4247
def validate(_, %{membership: m})
4348
when not (m == "promotable" or
4449
m == "non_voter" or
@@ -60,14 +65,22 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
6065
)
6166
end
6267

63-
def run([node, strategy], %{
68+
def run([node_or_quorum_cluster_size, strategy], %{
6469
node: node_name,
6570
vhost_pattern: vhost_pat,
6671
queue_pattern: queue_pat,
6772
membership: membership,
6873
errors_only: errors_only
6974
}) do
70-
args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)]
75+
76+
node_or_quorum_cluster_size =
77+
if is_integer(node_or_quorum_cluster_size) do
78+
node_or_quorum_cluster_size
79+
else
80+
to_atom(node_or_quorum_cluster_size)
81+
end
82+
83+
args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)]
7184

7285
args =
7386
case to_atom(membership) do
@@ -108,11 +121,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
108121

109122
def usage,
110123
do:
111-
"grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
124+
"grow <node | quorum_cluster_size> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
112125

113126
def usage_additional do
114127
[
115-
["<node>", "node name to place replicas on"],
128+
["<node | quorum_cluster_size>", "node name to place replicas on or desired quorum cluster size"],
116129
[
117130
"<all | even>",
118131
"add a member for all matching queues or just those whose membership count is an even number"

0 commit comments

Comments
 (0)