From 87154f9b03c85270ca950e1ac8bb7f6ae90bf47a Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 19 Aug 2025 08:14:57 +0100 Subject: [PATCH 1/2] QQ: when invoking drain only shut down small batches at a time Then wait for elections to complete before shutting further members down. This should help avoid election storms when enabling maintenance mode. Transfer khepri before queues to ensure metadata store is ready to accept pid updates. Some other state-related tweaks. --- deps/rabbit/src/rabbit_maintenance.erl | 7 +- deps/rabbit/src/rabbit_quorum_queue.erl | 99 +++++++++++++------ .../priv/www/js/formatters.js | 4 + 3 files changed, 77 insertions(+), 33 deletions(-) diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl index 172b115530c6..f2393d76c9e9 100644 --- a/deps/rabbit/src/rabbit_maintenance.erl +++ b/deps/rabbit/src/rabbit_maintenance.erl @@ -76,12 +76,15 @@ drain() -> }), TransferCandidates = primary_replica_transfer_candidate_nodes(), + + %% Transfer metadata store before queues as each queue needs to perform + %% a metadata update after an election + transfer_leadership_of_metadata_store(TransferCandidates), + %% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues %% in the cluster, unlike with CMQs. rabbit_queue_type:drain(TransferCandidates), - transfer_leadership_of_metadata_store(TransferCandidates), - %% allow plugins to react rabbit_event:notify(maintenance_draining, #{ reason => <<"node is being put into maintenance">> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index d068d51bb57d..c7876f3c55ce 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -169,6 +169,7 @@ -define(MIN_CHECKPOINT_INTERVAL, 64). -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). +-define(RA_MEMBERS_TIMEOUT, 30_000). 
%%----------- QQ policies --------------------------------------------------- @@ -1229,7 +1230,6 @@ policy_changed(Q) -> end. -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. - cluster_state(Name) -> case whereis(Name) of undefined -> down; @@ -1577,17 +1577,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. --spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. +-spec transfer_leadership(amqqueue:amqqueue(), node()) -> + {migrated, node()} | {not_migrated, atom()}. transfer_leadership(Q, Destination) -> - {RaName, _} = Pid = amqqueue:get_pid(Q), - case ra:transfer_leadership(Pid, {RaName, Destination}) of + {RaName, _} = Leader = amqqueue:get_pid(Q), + case ra:transfer_leadership(Leader, {RaName, Destination}) of ok -> - case ra:members(Pid) of - {_, _, {_, NewNode}} -> - {migrated, NewNode}; - {timeout, _} -> - {not_migrated, ra_members_timeout} - end; + case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of + {_, _, {_, NewNode}} -> + {migrated, NewNode}; + {timeout, _} -> + {not_migrated, ra_members_timeout} + end; already_leader -> {not_migrated, already_leader}; {error, Reason} -> @@ -1750,9 +1751,17 @@ i(memory, Q) when ?is_amqqueue(Q) -> 0 end; i(state, Q) when ?is_amqqueue(Q) -> - {Name, Node} = amqqueue:get_pid(Q), + {Name, Node} = case find_leader(Q) of + undefined -> + %% fall back to queue record + amqqueue:get_pid(Q); + Leader -> + Leader + end, %% Check against the leader or last known leader case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of + {error, {erpc, timeout}} -> + timeout; {error, _} -> down; State -> @@ -1912,7 +1921,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> rabbit_nodes:list_running() end, Online = [N || N <- Nodes, lists:member(N, Running)], - {_, LeaderNode} = amqqueue:get_pid(Q), + {_, LeaderNode} = case find_leader(Q) of 
+ undefined -> + amqqueue:get_pid(Q); + Leader -> + Leader + end, State = case is_minority(Nodes, Online) of true when length(Online) == 0 -> down; @@ -2299,27 +2313,50 @@ drain(TransferCandidates) -> transfer_leadership([]) -> ?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate " "(online, not under maintenance) nodes to transfer to!"); -transfer_leadership(_TransferCandidates) -> +transfer_leadership(_CandidateNodes) -> %% we only transfer leadership for QQs that have local leaders - Queues = rabbit_amqqueue:list_local_leaders(), + LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(), + QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues), ?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node", - [length(Queues)]), - _ = [begin - Name = amqqueue:get_name(Q), - ?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts", - [rabbit_misc:rs(Name)]), - %% we trigger an election and exclude this node from the list of candidates - %% by simply shutting its local QQ replica (Ra server) - RaLeader = amqqueue:get_pid(Q), - ?LOG_DEBUG("Will stop Ra server ~tp", [RaLeader]), - case rabbit_quorum_queue:stop_server(RaLeader) of - ok -> - ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]); - {error, nodedown} -> - ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down") - end - end || Q <- Queues], - ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"). 
+ [length(LocalLeaderQueues)]), + [begin + [begin + %% we trigger an election and exclude this node from the list of candidates + %% by simply shutting its local QQ replica (Ra server) + RaLeader = amqqueue:get_pid(Q), + ?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]), + case rabbit_quorum_queue:stop_server(RaLeader) of + ok -> + ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]); + {error, nodedown} -> + ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down", [RaLeader]) + end, + ok + end || Q <- Queues], + %% wait for leader elections before processing next chunk of queues + [begin + {RaName, LeaderNode} = amqqueue:get_pid(Q), + MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)), + %% we don't do any explicit error handling here as it is more + %% important to make progress + _ = lists:any(fun (N) -> + case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of + {ok, _, _} -> + true; + Err -> + Name = amqqueue:get_name(Q), + ?LOG_DEBUG("Failed to wait for leader election for queue ~ts on ~tp Err ~tp", + [rabbit_misc:rs(Name), N, Err]), + false + end + end, MemberNodes), + ok + + end || Q <- Queues], + ok + end || Queues <- QueuesChunked], + ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"), + ok. %% TODO: I just copied it over, it looks like was always called inside maintenance so... -spec stop_local_quorum_queue_followers() -> ok. 
diff --git a/deps/rabbitmq_management/priv/www/js/formatters.js b/deps/rabbitmq_management/priv/www/js/formatters.js index bb68af880d49..2131534a06d4 100644 --- a/deps/rabbitmq_management/priv/www/js/formatters.js +++ b/deps/rabbitmq_management/priv/www/js/formatters.js @@ -601,6 +601,10 @@ function fmt_object_state(obj) { explanation = 'The queue does not have sufficient online members to ' + 'make progress' } + else if (obj.state == 'timeout') { + colour = 'yellow'; + explanation = 'The queue did not respond to it\'s status request '; + } return fmt_state(colour, text, explanation); } From 42b3caa9640c077469dd1598a6c85dd1acd0992d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 21 Aug 2025 19:08:31 -0400 Subject: [PATCH 2/2] Cosmetics #14401 --- deps/rabbitmq_management/priv/www/js/formatters.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_management/priv/www/js/formatters.js b/deps/rabbitmq_management/priv/www/js/formatters.js index 2131534a06d4..0423a28b8af8 100644 --- a/deps/rabbitmq_management/priv/www/js/formatters.js +++ b/deps/rabbitmq_management/priv/www/js/formatters.js @@ -603,7 +603,7 @@ function fmt_object_state(obj) { } else if (obj.state == 'timeout') { colour = 'yellow'; - explanation = 'The queue did not respond to it\'s status request '; + explanation = 'The queue leader did not respond to its status request '; } return fmt_state(colour, text, explanation);