@@ -1582,12 +1582,12 @@ transfer_leadership(Q, Destination) ->
15821582 {RaName , _ } = Pid = amqqueue :get_pid (Q ),
15831583 case ra :transfer_leadership (Pid , {RaName , Destination }) of
15841584 ok ->
1585- case ra :members (Pid ) of
1586- {_ , _ , {_ , NewNode }} ->
1587- {migrated , NewNode };
1588- {timeout , _ } ->
1589- {not_migrated , ra_members_timeout }
1590- end ;
1585+ case ra :members (Pid ) of
1586+ {_ , _ , {_ , NewNode }} ->
1587+ {migrated , NewNode };
1588+ {timeout , _ } ->
1589+ {not_migrated , ra_members_timeout }
1590+ end ;
15911591 already_leader ->
15921592 {not_migrated , already_leader };
15931593 {error , Reason } ->
@@ -2299,27 +2299,51 @@ drain(TransferCandidates) ->
22992299transfer_leadership ([]) ->
23002300 ? LOG_WARNING (" Skipping leadership transfer of quorum queues: no candidate "
23012301 " (online, not under maintenance) nodes to transfer to!" );
2302- transfer_leadership (_TransferCandidates ) ->
2302+ transfer_leadership (_CandidateNodes ) ->
23032303 % % we only transfer leadership for QQs that have local leaders
2304- Queues = rabbit_amqqueue :list_local_leaders (),
2304+ LocalLeaderQueues = rabbit_amqqueue :list_local_leaders (),
2305+ QueuesChunked = ra_lib :lists_chunk (16 , LocalLeaderQueues ),
23052306 ? LOG_INFO (" Will transfer leadership of ~b quorum queues with current leader on this node" ,
2306- [length (Queues )]),
2307- _ = [begin
2308- Name = amqqueue :get_name (Q ),
2309- ? LOG_DEBUG (" Will trigger a leader election for local quorum queue ~ts " ,
2310- [rabbit_misc :rs (Name )]),
2311- % % we trigger an election and exclude this node from the list of candidates
2312- % % by simply shutting its local QQ replica (Ra server)
2313- RaLeader = amqqueue :get_pid (Q ),
2314- ? LOG_DEBUG (" Will stop Ra server ~tp " , [RaLeader ]),
2315- case rabbit_quorum_queue :stop_server (RaLeader ) of
2316- ok ->
2317- ? LOG_DEBUG (" Successfully stopped Ra server ~tp " , [RaLeader ]);
2318- {error , nodedown } ->
2319- ? LOG_ERROR (" Failed to stop Ra server ~tp : target node was reported as down" )
2320- end
2321- end || Q <- Queues ],
2322- ? LOG_INFO (" Leadership transfer for quorum queues hosted on this node has been initiated" ).
2307+ [length (LocalLeaderQueues )]),
2308+ [begin
2309+ [begin
2310+ % ?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts",
2311+ % [rabbit_misc:rs(Name)]),
2312+ % % we trigger an election and exclude this node from the list of candidates
2313+ % % by simply shutting its local QQ replica (Ra server)
2314+ RaLeader = amqqueue :get_pid (Q ),
2315+ ? LOG_DEBUG (" Will stop Ra leader ~tp " , [RaLeader ]),
2316+ case rabbit_quorum_queue :stop_server (RaLeader ) of
2317+ ok ->
2318+ ? LOG_DEBUG (" Successfully stopped Ra server ~tp " , [RaLeader ]);
2319+ {error , nodedown } ->
2320+ ? LOG_ERROR (" Failed to stop Ra server ~tp : target node was reported as down" )
2321+ end ,
2322+ ok
2323+ end || Q <- Queues ],
2324+ % % wait for leader elections before processing next chunk of queues
2325+ [begin
2326+ {RaName , LeaderNode } = amqqueue :get_pid (Q ),
2327+ MemberNodes = lists :delete (LeaderNode , amqqueue :get_nodes (Q )),
2328+ % % we don't do any explicit error handling here
2329+ _ = lists :any (fun (N ) ->
2330+ case ra :members ({RaName , N }, 30000 ) of
2331+ {ok , _ , _ } ->
2332+ true ;
2333+ Err ->
2334+ Name = amqqueue :get_name (Q ),
2335+ ? LOG_DEBUG (" Failed to wait for leader election for queue ~ts on ~tp Err ~ts " ,
2336+ [Name , N , Err ]),
2337+ false
2338+ end
2339+ end , MemberNodes ),
2340+ ok
2341+
2342+ end || Q <- Queues ],
2343+ ok
2344+ end || Queues <- QueuesChunked ],
2345+ ? LOG_INFO (" Leadership transfer for quorum queues hosted on this node has been initiated" ),
2346+ ok .
23232347
23242348% % TODO: I just copied it over, it looks like was always called inside maintenance so...
23252349- spec stop_local_quorum_queue_followers () -> ok .
0 commit comments