Skip to content

Commit 87154f9

Browse files
committed
QQ: when invoking drain only shut down small batches at a time
Then wait for elections to complete before shutting further members down. This should help avoid election storms when enabling maintenance mode. Transfer khepri before queues to ensure meta data store is ready to accept pid updates. Some other state related tweaks.
1 parent 268a16c commit 87154f9

File tree

3 files changed

+77
-33
lines changed

3 files changed

+77
-33
lines changed

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,15 @@ drain() ->
7676
}),
7777

7878
TransferCandidates = primary_replica_transfer_candidate_nodes(),
79+
80+
%% Transfer metadata store before queues as each queue needs to perform
81+
%% a metadata update after an election
82+
transfer_leadership_of_metadata_store(TransferCandidates),
83+
7984
%% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues
8085
%% in the cluster, unlike with CMQs.
8186
rabbit_queue_type:drain(TransferCandidates),
8287

83-
transfer_leadership_of_metadata_store(TransferCandidates),
84-
8588
%% allow plugins to react
8689
rabbit_event:notify(maintenance_draining, #{
8790
reason => <<"node is being put into maintenance">>

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@
169169
-define(MIN_CHECKPOINT_INTERVAL, 64).
170170
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
171171
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
172+
-define(RA_MEMBERS_TIMEOUT, 30_000).
172173

173174
%%----------- QQ policies ---------------------------------------------------
174175

@@ -1229,7 +1230,6 @@ policy_changed(Q) ->
12291230
end.
12301231

12311232
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
1232-
12331233
cluster_state(Name) ->
12341234
case whereis(Name) of
12351235
undefined -> down;
@@ -1577,17 +1577,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
15771577
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
15781578
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
15791579

1580-
-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
1580+
-spec transfer_leadership(amqqueue:amqqueue(), node()) ->
1581+
{migrated, node()} | {not_migrated, atom()}.
15811582
transfer_leadership(Q, Destination) ->
1582-
{RaName, _} = Pid = amqqueue:get_pid(Q),
1583-
case ra:transfer_leadership(Pid, {RaName, Destination}) of
1583+
{RaName, _} = Leader = amqqueue:get_pid(Q),
1584+
case ra:transfer_leadership(Leader, {RaName, Destination}) of
15841585
ok ->
1585-
case ra:members(Pid) of
1586-
{_, _, {_, NewNode}} ->
1587-
{migrated, NewNode};
1588-
{timeout, _} ->
1589-
{not_migrated, ra_members_timeout}
1590-
end;
1586+
case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of
1587+
{_, _, {_, NewNode}} ->
1588+
{migrated, NewNode};
1589+
{timeout, _} ->
1590+
{not_migrated, ra_members_timeout}
1591+
end;
15911592
already_leader ->
15921593
{not_migrated, already_leader};
15931594
{error, Reason} ->
@@ -1750,9 +1751,17 @@ i(memory, Q) when ?is_amqqueue(Q) ->
17501751
0
17511752
end;
17521753
i(state, Q) when ?is_amqqueue(Q) ->
1753-
{Name, Node} = amqqueue:get_pid(Q),
1754+
{Name, Node} = case find_leader(Q) of
1755+
undefined ->
1756+
%% fall back to queue record
1757+
amqqueue:get_pid(Q);
1758+
Leader ->
1759+
Leader
1760+
end,
17541761
%% Check against the leader or last known leader
17551762
case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
1763+
{error, {erpc, timeout}} ->
1764+
timeout;
17561765
{error, _} ->
17571766
down;
17581767
State ->
@@ -1912,7 +1921,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
19121921
rabbit_nodes:list_running()
19131922
end,
19141923
Online = [N || N <- Nodes, lists:member(N, Running)],
1915-
{_, LeaderNode} = amqqueue:get_pid(Q),
1924+
{_, LeaderNode} = case find_leader(Q) of
1925+
undefined ->
1926+
amqqueue:get_pid(Q);
1927+
Leader ->
1928+
Leader
1929+
end,
19161930
State = case is_minority(Nodes, Online) of
19171931
true when length(Online) == 0 ->
19181932
down;
@@ -2299,27 +2313,50 @@ drain(TransferCandidates) ->
22992313
transfer_leadership([]) ->
23002314
?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate "
23012315
"(online, not under maintenance) nodes to transfer to!");
2302-
transfer_leadership(_TransferCandidates) ->
2316+
transfer_leadership(_CandidateNodes) ->
23032317
%% we only transfer leadership for QQs that have local leaders
2304-
Queues = rabbit_amqqueue:list_local_leaders(),
2318+
LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(),
2319+
QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues),
23052320
?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node",
2306-
[length(Queues)]),
2307-
_ = [begin
2308-
Name = amqqueue:get_name(Q),
2309-
?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts",
2310-
[rabbit_misc:rs(Name)]),
2311-
%% we trigger an election and exclude this node from the list of candidates
2312-
%% by simply shutting its local QQ replica (Ra server)
2313-
RaLeader = amqqueue:get_pid(Q),
2314-
?LOG_DEBUG("Will stop Ra server ~tp", [RaLeader]),
2315-
case rabbit_quorum_queue:stop_server(RaLeader) of
2316-
ok ->
2317-
?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
2318-
{error, nodedown} ->
2319-
?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down")
2320-
end
2321-
end || Q <- Queues],
2322-
?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated").
2321+
[length(LocalLeaderQueues)]),
2322+
[begin
2323+
[begin
2324+
%% we trigger an election and exclude this node from the list of candidates
2325+
%% by simply shutting its local QQ replica (Ra server)
2326+
RaLeader = amqqueue:get_pid(Q),
2327+
?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]),
2328+
case rabbit_quorum_queue:stop_server(RaLeader) of
2329+
ok ->
2330+
?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
2331+
{error, nodedown} ->
2332+
?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down")
2333+
end,
2334+
ok
2335+
end || Q <- Queues],
2336+
%% wait for leader elections before processing next chunk of queues
2337+
[begin
2338+
{RaName, LeaderNode} = amqqueue:get_pid(Q),
2339+
MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
2340+
%% we don't do any explicit error handling here as it is more
2341+
%% important to make progress
2342+
_ = lists:any(fun (N) ->
2343+
case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
2344+
{ok, _, _} ->
2345+
true;
2346+
Err ->
2347+
Name = amqqueue:get_name(Q),
2348+
?LOG_DEBUG("Failed to wait for leader election for queue ~ts on ~tp Err ~ts",
2349+
[Name, N, Err]),
2350+
false
2351+
end
2352+
end, MemberNodes),
2353+
ok
2354+
2355+
end || Q <- Queues],
2356+
ok
2357+
end || Queues <- QueuesChunked],
2358+
?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"),
2359+
ok.
23232360

23242361
%% TODO: I just copied it over, it looks like was always called inside maintenance so...
23252362
-spec stop_local_quorum_queue_followers() -> ok.

deps/rabbitmq_management/priv/www/js/formatters.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,10 @@ function fmt_object_state(obj) {
601601
explanation = 'The queue does not have sufficient online members to ' +
602602
'make progress'
603603
}
604+
else if (obj.state == 'timeout') {
605+
colour = 'yellow';
606+
explanation = 'The queue did not respond to it\'s status request ';
607+
}
604608

605609
return fmt_state(colour, text, explanation);
606610
}

0 commit comments

Comments
 (0)