diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl index f6ee1f340287..023900a93c44 100644 --- a/deps/rabbit/src/rabbit_maintenance.erl +++ b/deps/rabbit/src/rabbit_maintenance.erl @@ -76,6 +76,11 @@ drain() -> }), TransferCandidates = primary_replica_transfer_candidate_nodes(), + + %% Transfer metadata store before queues as each queue needs to perform + %% a metadata update after an election + transfer_leadership_of_metadata_store(TransferCandidates), + %% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues %% in the cluster, unlike with CMQs. transfer_leadership_of_quorum_queues(TransferCandidates), @@ -86,8 +91,6 @@ drain() -> _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates) end, - transfer_leadership_of_metadata_store(TransferCandidates), - %% allow plugins to react rabbit_event:notify(maintenance_draining, #{ reason => <<"node is being put into maintenance">> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index c2758af70784..e3ebd78038f3 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -152,6 +152,7 @@ -define(MIN_CHECKPOINT_INTERVAL, 64). -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). +-define(RA_MEMBERS_TIMEOUT, 30_000). %%----------- QQ policies --------------------------------------------------- @@ -1218,7 +1219,6 @@ policy_changed(Q) -> end. -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. - cluster_state(Name) -> case whereis(Name) of undefined -> down; @@ -1566,17 +1566,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. --spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. +-spec transfer_leadership(amqqueue:amqqueue(), node()) -> + {migrated, node()} | {not_migrated, atom()}. transfer_leadership(Q, Destination) -> - {RaName, _} = Pid = amqqueue:get_pid(Q), - case ra:transfer_leadership(Pid, {RaName, Destination}) of + {RaName, _} = Leader = amqqueue:get_pid(Q), + case ra:transfer_leadership(Leader, {RaName, Destination}) of ok -> - case ra:members(Pid) of - {_, _, {_, NewNode}} -> - {migrated, NewNode}; - {timeout, _} -> - {not_migrated, ra_members_timeout} - end; + case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of + {_, _, {_, NewNode}} -> + {migrated, NewNode}; + {timeout, _} -> + {not_migrated, ra_members_timeout} + end; already_leader -> {not_migrated, already_leader}; {error, Reason} -> @@ -1741,9 +1742,17 @@ i(memory, Q) when ?is_amqqueue(Q) -> 0 end; i(state, Q) when ?is_amqqueue(Q) -> - {Name, Node} = amqqueue:get_pid(Q), + {Name, Node} = case find_leader(Q) of + undefined -> + %% fall back to queue record + amqqueue:get_pid(Q); + Leader -> + Leader + end, %% Check against the leader or last known leader case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of + {error, {erpc, timeout}} -> + timeout; {error, _} -> down; State -> @@ -1903,7 +1912,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> rabbit_nodes:list_running() end, Online = [N || N <- Nodes, lists:member(N, Running)], - {_, LeaderNode} = amqqueue:get_pid(Q), + {_, LeaderNode} = case find_leader(Q) of + undefined -> + amqqueue:get_pid(Q); + Leader -> + Leader + end, State = case is_minority(Nodes, Online) of true when length(Online) == 0 -> down; @@ -2274,3 +2288,115 @@ maybe_log_leader_health_check_result([]) -> ok; maybe_log_leader_health_check_result(Result) -> Qs = lists:map(fun(R) -> catch maps:get(<<"readable_name">>, R) end, Result), rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]). + +policy_apply_to_name() -> + <<"quorum_queues">>. + +-spec drain([node()]) -> ok. +drain(TransferCandidates) -> + _ = transfer_leadership(TransferCandidates), + _ = stop_local_quorum_queue_followers(), + ok. + +transfer_leadership([]) -> + rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate " + "(online, not under maintenance) nodes to transfer to!"); +transfer_leadership(_CandidateNodes) -> + %% we only transfer leadership for QQs that have local leaders + LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(), + QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues), + rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node", + [length(LocalLeaderQueues)]), + [begin + [begin + %% we trigger an election and exclude this node from the list of candidates + %% by simply shutting its local QQ replica (Ra server) + RaLeader = amqqueue:get_pid(Q), + rabbit_log:debug("Will stop Ra leader ~tp", [RaLeader]), + case rabbit_quorum_queue:stop_server(RaLeader) of + ok -> + rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]); + {error, nodedown} -> + rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down") + end, + ok + end || Q <- Queues], + %% wait for leader elections before processing next chunk of queues + [begin + {RaName, LeaderNode} = amqqueue:get_pid(Q), + MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)), + %% we don't do any explicit error handling here as it is more + %% important to make progress + _ = lists:any(fun (N) -> + case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of + {ok, _, _} -> + true; + Err -> + Name = amqqueue:get_name(Q), + rabbit_log:debug("Failed to wait for leader election for queue ~ts on ~tp Err ~ts", + [Name, N, Err]), + false + end + end, MemberNodes), + ok + + end || Q <- Queues], + ok + end || Queues <- QueuesChunked], + rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated"), + ok. + +%% TODO: I just copied it over, it looks like was always called inside maintenance so... +-spec stop_local_quorum_queue_followers() -> ok. +stop_local_quorum_queue_followers() -> + Queues = rabbit_amqqueue:list_local_followers(), + rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node", + [length(Queues)]), + _ = [begin + Name = amqqueue:get_name(Q), + rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts", + [rabbit_misc:rs(Name)]), + %% shut down Ra nodes so that they are not considered for leader election + {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q), + RaNode = {RegisteredName, node()}, + rabbit_log:debug("Will stop Ra server ~tp", [RaNode]), + case rabbit_quorum_queue:stop_server(RaNode) of + ok -> + rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]); + {error, nodedown} -> + rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down") + end + end || Q <- Queues], + rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). + +revive() -> + revive_local_queue_members(). + +revive_local_queue_members() -> + Queues = rabbit_amqqueue:list_local_followers(), + %% NB: this function ignores the first argument so we can just pass the + %% empty binary as the vhost name. + {Recovered, Failed} = rabbit_quorum_queue:recover(<<>>, Queues), + ?LOG_DEBUG("Successfully revived ~b quorum queue replicas", + [length(Recovered)]), + case length(Failed) of + 0 -> + ok; + NumFailed -> + ?LOG_ERROR("Failed to revive ~b quorum queue replicas", + [NumFailed]) + end, + + ?LOG_INFO("Restart of local quorum queue replicas is complete"), + ok. + +queue_vm_stats_sups() -> + {[quorum_queue_procs, + quorum_queue_dlx_procs], + [[ra_server_sup_sup], + [rabbit_fifo_dlx_sup]]}. + +queue_vm_ets() -> + {[quorum_ets], + [[ra_log_ets]]}. + diff --git a/deps/rabbitmq_management/priv/www/js/formatters.js b/deps/rabbitmq_management/priv/www/js/formatters.js index 556253697043..03fb0fe91527 100644 --- a/deps/rabbitmq_management/priv/www/js/formatters.js +++ b/deps/rabbitmq_management/priv/www/js/formatters.js @@ -601,6 +601,10 @@ function fmt_object_state(obj) { explanation = 'The queue does not have sufficient online members to ' + 'make progress' } + else if (obj.state == 'timeout') { + colour = 'yellow'; + explanation = 'The queue leader did not respond to its status request '; + } return fmt_state(colour, text, explanation); }