Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions deps/rabbit/src/rabbit_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,15 @@ drain() ->
}),

TransferCandidates = primary_replica_transfer_candidate_nodes(),

%% Transfer metadata store before queues as each queue needs to perform
%% a metadata update after an election
transfer_leadership_of_metadata_store(TransferCandidates),

%% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues
%% in the cluster, unlike with CMQs.
rabbit_queue_type:drain(TransferCandidates),

transfer_leadership_of_metadata_store(TransferCandidates),

%% allow plugins to react
rabbit_event:notify(maintenance_draining, #{
reason => <<"node is being put into maintenance">>
Expand Down
99 changes: 68 additions & 31 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
-define(MIN_CHECKPOINT_INTERVAL, 64).
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
-define(RA_MEMBERS_TIMEOUT, 30_000).

%%----------- QQ policies ---------------------------------------------------

Expand Down Expand Up @@ -1229,7 +1230,6 @@ policy_changed(Q) ->
end.

-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.

cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
Expand Down Expand Up @@ -1577,17 +1577,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].

-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
-spec transfer_leadership(amqqueue:amqqueue(), node()) ->
{migrated, node()} | {not_migrated, atom()}.
transfer_leadership(Q, Destination) ->
{RaName, _} = Pid = amqqueue:get_pid(Q),
case ra:transfer_leadership(Pid, {RaName, Destination}) of
{RaName, _} = Leader = amqqueue:get_pid(Q),
case ra:transfer_leadership(Leader, {RaName, Destination}) of
ok ->
case ra:members(Pid) of
{_, _, {_, NewNode}} ->
{migrated, NewNode};
{timeout, _} ->
{not_migrated, ra_members_timeout}
end;
case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of
{_, _, {_, NewNode}} ->
{migrated, NewNode};
{timeout, _} ->
{not_migrated, ra_members_timeout}
end;
already_leader ->
{not_migrated, already_leader};
{error, Reason} ->
Expand Down Expand Up @@ -1750,9 +1751,17 @@ i(memory, Q) when ?is_amqqueue(Q) ->
0
end;
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
{Name, Node} = case find_leader(Q) of
undefined ->
%% fall back to queue record
amqqueue:get_pid(Q);
Leader ->
Leader
end,
%% Check against the leader or last known leader
case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{error, {erpc, timeout}} ->
timeout;
{error, _} ->
down;
State ->
Expand Down Expand Up @@ -1912,7 +1921,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
rabbit_nodes:list_running()
end,
Online = [N || N <- Nodes, lists:member(N, Running)],
{_, LeaderNode} = amqqueue:get_pid(Q),
{_, LeaderNode} = case find_leader(Q) of
undefined ->
amqqueue:get_pid(Q);
Leader ->
Leader
end,
State = case is_minority(Nodes, Online) of
true when length(Online) == 0 ->
down;
Expand Down Expand Up @@ -2299,27 +2313,50 @@ drain(TransferCandidates) ->
transfer_leadership([]) ->
?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate "
"(online, not under maintenance) nodes to transfer to!");
transfer_leadership(_TransferCandidates) ->
transfer_leadership(_CandidateNodes) ->
%% we only transfer leadership for QQs that have local leaders
Queues = rabbit_amqqueue:list_local_leaders(),
LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(),
QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues),
?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node",
[length(Queues)]),
_ = [begin
Name = amqqueue:get_name(Q),
?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts",
[rabbit_misc:rs(Name)]),
%% we trigger an election and exclude this node from the list of candidates
%% by simply shutting its local QQ replica (Ra server)
RaLeader = amqqueue:get_pid(Q),
?LOG_DEBUG("Will stop Ra server ~tp", [RaLeader]),
case rabbit_quorum_queue:stop_server(RaLeader) of
ok ->
?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
{error, nodedown} ->
?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down")
end
end || Q <- Queues],
?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated").
[length(LocalLeaderQueues)]),
[begin
[begin
%% we trigger an election and exclude this node from the list of candidates
%% by simply shutting its local QQ replica (Ra server)
RaLeader = amqqueue:get_pid(Q),
?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]),
case rabbit_quorum_queue:stop_server(RaLeader) of
ok ->
?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
{error, nodedown} ->
?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down")
end,
ok
end || Q <- Queues],
%% wait for leader elections before processing next chunk of queues
[begin
{RaName, LeaderNode} = amqqueue:get_pid(Q),
MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
%% we don't do any explicit error handling here as it is more
%% important to make progress
_ = lists:any(fun (N) ->
case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
{ok, _, _} ->
true;
Err ->
Name = amqqueue:get_name(Q),
?LOG_DEBUG("Failed to wait for leader election for queue ~ts on ~tp Err ~ts",
[Name, N, Err]),
false
end
end, MemberNodes),
ok

end || Q <- Queues],
ok
end || Queues <- QueuesChunked],
?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"),
ok.

%% TODO: I just copied it over, it looks like was always called inside maintenance so...
-spec stop_local_quorum_queue_followers() -> ok.
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_management/priv/www/js/formatters.js
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,10 @@ function fmt_object_state(obj) {
explanation = 'The queue does not have sufficient online members to ' +
'make progress'
}
else if (obj.state == 'timeout') {
colour = 'yellow';
explanation = 'The queue leader did not respond to its status request ';
}

return fmt_state(colour, text, explanation);
}
Expand Down
Loading