|
152 | 152 | -define(MIN_CHECKPOINT_INTERVAL, 64). |
153 | 153 | -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). |
154 | 154 | -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). |
| 155 | +-define(RA_MEMBERS_TIMEOUT, 30_000). |
155 | 156 |
|
156 | 157 | %%----------- QQ policies --------------------------------------------------- |
157 | 158 |
|
@@ -1218,7 +1219,6 @@ policy_changed(Q) -> |
1218 | 1219 | end. |
1219 | 1220 |
|
1220 | 1221 | -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. |
1221 | | - |
1222 | 1222 | cluster_state(Name) -> |
1223 | 1223 | case whereis(Name) of |
1224 | 1224 | undefined -> down; |
@@ -1566,17 +1566,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> |
1566 | 1566 | is_match(amqqueue:get_vhost(Q), VhostSpec) andalso |
1567 | 1567 | is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. |
1568 | 1568 |
|
1569 | | --spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. |
| 1569 | +-spec transfer_leadership(amqqueue:amqqueue(), node()) -> |
| 1570 | + {migrated, node()} | {not_migrated, atom()}. |
1570 | 1571 | transfer_leadership(Q, Destination) -> |
1571 | | - {RaName, _} = Pid = amqqueue:get_pid(Q), |
1572 | | - case ra:transfer_leadership(Pid, {RaName, Destination}) of |
| 1572 | + {RaName, _} = Leader = amqqueue:get_pid(Q), |
| 1573 | + case ra:transfer_leadership(Leader, {RaName, Destination}) of |
1573 | 1574 | ok -> |
1574 | | - case ra:members(Pid) of |
1575 | | - {_, _, {_, NewNode}} -> |
1576 | | - {migrated, NewNode}; |
1577 | | - {timeout, _} -> |
1578 | | - {not_migrated, ra_members_timeout} |
1579 | | - end; |
| 1575 | + case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of |
| 1576 | + {_, _, {_, NewNode}} -> |
| 1577 | + {migrated, NewNode}; |
| 1578 | + {timeout, _} -> |
| 1579 | + {not_migrated, ra_members_timeout} |
| 1580 | + end; |
1580 | 1581 | already_leader -> |
1581 | 1582 | {not_migrated, already_leader}; |
1582 | 1583 | {error, Reason} -> |
@@ -1741,9 +1742,17 @@ i(memory, Q) when ?is_amqqueue(Q) -> |
1741 | 1742 | 0 |
1742 | 1743 | end; |
1743 | 1744 | i(state, Q) when ?is_amqqueue(Q) -> |
1744 | | - {Name, Node} = amqqueue:get_pid(Q), |
| 1745 | + {Name, Node} = case find_leader(Q) of |
| 1746 | + undefined -> |
| 1747 | + %% fall back to queue record |
| 1748 | + amqqueue:get_pid(Q); |
| 1749 | + Leader -> |
| 1750 | + Leader |
| 1751 | + end, |
1745 | 1752 | %% Check against the leader or last known leader |
1746 | 1753 | case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of |
| 1754 | + {error, {erpc, timeout}} -> |
| 1755 | + timeout; |
1747 | 1756 | {error, _} -> |
1748 | 1757 | down; |
1749 | 1758 | State -> |
@@ -1903,7 +1912,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> |
1903 | 1912 | rabbit_nodes:list_running() |
1904 | 1913 | end, |
1905 | 1914 | Online = [N || N <- Nodes, lists:member(N, Running)], |
1906 | | - {_, LeaderNode} = amqqueue:get_pid(Q), |
| 1915 | + {_, LeaderNode} = case find_leader(Q) of |
| 1916 | + undefined -> |
| 1917 | + amqqueue:get_pid(Q); |
| 1918 | + Leader -> |
| 1919 | + Leader |
| 1920 | + end, |
1907 | 1921 | State = case is_minority(Nodes, Online) of |
1908 | 1922 | true when length(Online) == 0 -> |
1909 | 1923 | down; |
@@ -2274,3 +2288,115 @@ maybe_log_leader_health_check_result([]) -> ok; |
2274 | 2288 | maybe_log_leader_health_check_result(Result) -> |
2275 | 2289 | Qs = lists:map(fun(R) -> catch maps:get(<<"readable_name">>, R) end, Result), |
2276 | 2290 | rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]). |
| 2291 | + |
%% Returns the fixed binary name <<"quorum_queues">>.
%% NOTE(review): presumably the value policies use in "apply-to" to target
%% quorum queues - confirm against the callers.
-spec policy_apply_to_name() -> binary().
policy_apply_to_name() ->
    ApplyTo = <<"quorum_queues">>,
    ApplyTo.
| 2294 | + |
%% Drains quorum queue replicas from this node in two steps:
%%   1. transfer_leadership/1 is invoked with the given candidate nodes;
%%   2. stop_local_quorum_queue_followers/0 is invoked afterwards.
%% The results of both steps are deliberately discarded; the function
%% always returns ok.
-spec drain([node()]) -> ok.
drain(TransferCandidates) ->
    %% leaders first ...
    _ = transfer_leadership(TransferCandidates),
    %% ... then the remaining local follower replicas
    _ = stop_local_quorum_queue_followers(),
    ok.
| 2300 | + |
%% Triggers leader elections for the quorum queues returned by
%% rabbit_amqqueue:list_local_leaders/0.
%%
%% With an empty candidate list nothing is transferred and only a warning
%% is logged. Otherwise the queues are processed in chunks of 256: for
%% every queue of a chunk the Ra server recorded in the queue record is
%% stopped, then the remaining member nodes are polled with ra:members/2
%% before the next chunk is started.
%%
%% The candidate list itself is not consulted beyond the emptiness check.
transfer_leadership([]) ->
    rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
                       "(online, not under maintenance) nodes to transfer to!");
transfer_leadership(_CandidateNodes) ->
    %% we only transfer leadership for QQs that have local leaders
    LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(),
    QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues),
    rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
                    [length(LocalLeaderQueues)]),
    lists:foreach(
      fun(Queues) ->
              lists:foreach(fun stop_leader_for_transfer/1, Queues),
              %% wait for leader elections before processing next chunk of queues
              lists:foreach(fun await_leader_after_transfer/1, Queues)
      end, QueuesChunked),
    rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated"),
    ok.

%% Stops the Ra server recorded as the leader of queue Q.
%% We trigger an election and exclude this node from the list of candidates
%% by simply shutting its local QQ replica (Ra server).
stop_leader_for_transfer(Q) ->
    RaLeader = amqqueue:get_pid(Q),
    rabbit_log:debug("Will stop Ra leader ~tp", [RaLeader]),
    case rabbit_quorum_queue:stop_server(RaLeader) of
        ok ->
            rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]);
        {error, nodedown} ->
            %% the format string carries a ~tp directive, so the server id
            %% must be passed as an argument
            rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down",
                             [RaLeader])
    end,
    ok.

%% Asks the members of queue Q, other than the recorded leader node, for
%% the cluster membership and stops at the first one that answers.
%% We don't do any explicit error handling here as it is more important to
%% make progress.
await_leader_after_transfer(Q) ->
    {RaName, LeaderNode} = amqqueue:get_pid(Q),
    MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
    _ = lists:any(
          fun(N) ->
                  case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
                      {ok, _, _} ->
                          true;
                      Err ->
                          Name = amqqueue:get_name(Q),
                          %% Err is an arbitrary term and Name is not chardata,
                          %% so print them with ~tp and rabbit_misc:rs/1
                          rabbit_log:debug("Failed to wait for leader election for queue ~ts on ~tp Err ~tp",
                                           [rabbit_misc:rs(Name), N, Err]),
                          false
                  end
          end, MemberNodes),
    ok.
| 2348 | + |
%% Stops the Ra server of this node for every queue returned by
%% rabbit_amqqueue:list_local_followers/0, so that these replicas are not
%% considered for leader election.
%%
%% TODO: copied over as-is; it appears this was only ever called while the
%% node is being put into maintenance - confirm.
-spec stop_local_quorum_queue_followers() -> ok.
stop_local_quorum_queue_followers() ->
    Queues = rabbit_amqqueue:list_local_followers(),
    rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
                    [length(Queues)]),
    lists:foreach(fun stop_local_follower_replica/1, Queues),
    rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").

%% Stops the replica of queue Q that runs on the local node.
stop_local_follower_replica(Q) ->
    Name = amqqueue:get_name(Q),
    rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts",
                     [rabbit_misc:rs(Name)]),
    %% shut down Ra nodes so that they are not considered for leader election
    {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
    %% the queue record points at the leader; the local replica uses the
    %% same registered name on this node
    RaNode = {RegisteredName, node()},
    rabbit_log:debug("Will stop Ra server ~tp", [RaNode]),
    case rabbit_quorum_queue:stop_server(RaNode) of
        ok ->
            rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]);
        {error, nodedown} ->
            %% the format string carries a ~tp directive, so the server id
            %% must be passed as an argument
            rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down",
                             [RaNode])
    end.
| 2371 | + |
%% Counterpart of drain/1: delegates to revive_local_queue_members/0 and
%% returns its result.
revive() ->
    Result = revive_local_queue_members(),
    Result.
| 2374 | + |
%% Recovers the queues returned by rabbit_amqqueue:list_local_followers/0
%% through rabbit_quorum_queue:recover/2 and logs how many replicas were
%% revived and how many failed. Always returns ok.
revive_local_queue_members() ->
    LocalFollowers = rabbit_amqqueue:list_local_followers(),
    %% NB: recover/2 ignores its first argument, so the empty binary is
    %% passed in place of a vhost name.
    {Revived, NotRevived} = rabbit_quorum_queue:recover(<<>>, LocalFollowers),
    ?LOG_DEBUG("Successfully revived ~b quorum queue replicas",
               [length(Revived)]),
    case NotRevived of
        [] ->
            ok;
        _ ->
            ?LOG_ERROR("Failed to revive ~b quorum queue replicas",
                       [length(NotRevived)])
    end,
    ?LOG_INFO("Restart of local quorum queue replicas is complete"),
    ok.
| 2392 | + |
%% Returns a pair of equally long lists: two names and, position by
%% position, the list of supervisor names that belongs to each of them.
%% NOTE(review): presumably consumed by the VM memory breakdown - confirm
%% against the caller.
queue_vm_stats_sups() ->
    Names = [quorum_queue_procs, quorum_queue_dlx_procs],
    Sups = [[ra_server_sup_sup], [rabbit_fifo_dlx_sup]],
    {Names, Sups}.
| 2398 | + |
%% Returns a pair of equally long lists: one name and, position by
%% position, the list of ETS table names that belongs to it.
%% NOTE(review): presumably consumed by the VM memory breakdown - confirm
%% against the caller.
queue_vm_ets() ->
    Names = [quorum_ets],
    Tables = [[ra_log_ets]],
    {Names, Tables}.
| 2402 | + |
0 commit comments