Skip to content

Commit 486f3ad

Browse files
Merge pull request #2268 from rabbitmq/rabbitmq-management-782
Allow only one rebalance operation to happen at a time (cherry picked from commit f05114e)
1 parent d3ca25b commit 486f3ad

File tree

2 files changed

+55
-6
lines changed

2 files changed

+55
-6
lines changed

src/rabbit_amqqueue.erl

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) ->
514514
{ok, Q} -> {absent, Q, nodedown}
515515
end.
516516

517+
-spec get_rebalance_lock(pid()) ->
518+
{true, {rebalance_queues, pid()}} | false.
519+
get_rebalance_lock(Pid) when is_pid(Pid) ->
520+
Id = {rebalance_queues, Pid},
521+
Nodes = [node()|nodes()],
522+
%% Note that we're not re-trying. We want to immediately know
523+
%% if a re-balance is taking place and stop accordingly.
524+
case global:set_lock(Id, Nodes, 0) of
525+
true ->
526+
{true, Id};
527+
false ->
528+
false
529+
end.
530+
517531
-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
518-
{ok, [{node(), pos_integer()}]}.
532+
{ok, [{node(), pos_integer()}]} | {error, term()}.
519533
rebalance(Type, VhostSpec, QueueSpec) ->
534+
%% We have not yet acquired the rebalance_queues global lock.
535+
maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
536+
537+
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
538+
rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
539+
[Type, VhostSpec, QueueSpec]),
520540
Running = rabbit_mnesia:cluster_nodes(running),
521541
NumRunning = length(Running),
522542
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
@@ -527,11 +547,17 @@ rebalance(Type, VhostSpec, QueueSpec) ->
527547
NumToRebalance = length(ToRebalance),
528548
ByNode = group_by_node(ToRebalance),
529549
Rem = case (NumToRebalance rem NumRunning) of
530-
0 -> 0;
531-
_ -> 1
532-
end,
550+
0 -> 0;
551+
_ -> 1
552+
end,
533553
MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
534-
iterative_rebalance(ByNode, MaxQueuesDesired).
554+
Result = iterative_rebalance(ByNode, MaxQueuesDesired),
555+
global:del_lock(Id),
556+
rabbit_log:info("Finished queue rebalance operation"),
557+
Result;
558+
maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
559+
rabbit_log:warning("Queue rebalance operation is in progress, please wait."),
560+
{error, rebalance_in_progress}.
535561

536562
filter_per_type(all, _) ->
537563
true;

test/dynamic_ha_SUITE.erl

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ groups() ->
7474
queue_survive_adding_dead_vhost_mirror,
7575
rebalance_all,
7676
rebalance_exactly,
77-
rebalance_nodes
77+
rebalance_nodes,
78+
rebalance_multiple_blocked
7879
% FIXME: Re-enable those tests when the know issues are
7980
% fixed.
8081
% failing_random_policies,
@@ -666,6 +667,28 @@ rebalance_nodes(Config) ->
666667

667668
ok.
668669

670+
rebalance_multiple_blocked(Config) ->
671+
[A, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
672+
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
673+
Q1 = <<"q1">>,
674+
Q2 = <<"q2">>,
675+
Q3 = <<"q3">>,
676+
Q4 = <<"q4">>,
677+
Q5 = <<"q5">>,
678+
amqp_channel:call(ACh, #'queue.declare'{queue = Q1}),
679+
amqp_channel:call(ACh, #'queue.declare'{queue = Q2}),
680+
amqp_channel:call(ACh, #'queue.declare'{queue = Q3}),
681+
amqp_channel:call(ACh, #'queue.declare'{queue = Q4}),
682+
amqp_channel:call(ACh, #'queue.declare'{queue = Q5}),
683+
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))),
684+
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))),
685+
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))),
686+
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))),
687+
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))),
688+
true = rpc:cast(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),
689+
{error, rebalance_in_progress} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),
690+
ok.
691+
669692
%%----------------------------------------------------------------------------
670693

671694
assert_slaves(RPCNode, QName, Exp) ->

0 commit comments

Comments
 (0)