@@ -193,18 +193,20 @@ start_cluster(Q) ->
193193 {error , {too_long , N }} ->
194194 rabbit_data_coercion :to_atom (ra :new_uid (N ))
195195 end ,
196- {Leader , Followers } = rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
197- LeaderId = {RaName , Leader },
196+ {LeaderNode , FollowerNodes } =
197+ rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
198+ LeaderId = {RaName , LeaderNode },
198199 NewQ0 = amqqueue :set_pid (Q , LeaderId ),
199- NewQ1 = amqqueue :set_type_state (NewQ0 , #{nodes => [Leader | Followers ]}),
200+ NewQ1 = amqqueue :set_type_state (NewQ0 ,
201+ #{nodes => [LeaderNode | FollowerNodes ]}),
200202
201203 rabbit_log :debug (" Will start up to ~w replicas for quorum ~ts with leader on node '~ts '" ,
202- [QuorumSize , rabbit_misc :rs (QName ), Leader ]),
204+ [QuorumSize , rabbit_misc :rs (QName ), LeaderNode ]),
203205 case rabbit_amqqueue :internal_declare (NewQ1 , false ) of
204206 {created , NewQ } ->
205207 RaConfs = [make_ra_conf (NewQ , ServerId )
206208 || ServerId <- members (NewQ )],
207- try erpc_call (Leader , ra , start_cluster ,
209+ try erpc_call (LeaderNode , ra , start_cluster ,
208210 [? RA_SYSTEM , RaConfs , ? START_CLUSTER_TIMEOUT ],
209211 ? START_CLUSTER_RPC_TIMEOUT ) of
210212 {ok , _ , _ } ->
@@ -228,10 +230,10 @@ start_cluster(Q) ->
228230 ActingUser }]),
229231 {new , NewQ };
230232 {error , Error } ->
231- declare_queue_error (Error , NewQ , Leader , ActingUser )
233+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
232234 catch
233235 error :Error ->
234- declare_queue_error (Error , NewQ , Leader , ActingUser )
236+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
235237 end ;
236238 {existing , _ } = Ex ->
237239 Ex
@@ -321,26 +323,28 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
321323 end .
322324
323325become_leader (QName , Name ) ->
326+ % % as this function is called synchronously when a ra node becomes leader
327+ % % we need to ensure there is no chance of blocking as else the ra node
328+ % % may not be able to establish its leadership
329+ spawn (fun () -> become_leader0 (QName , Name ) end ).
330+
331+ become_leader0 (QName , Name ) ->
324332 Fun = fun (Q1 ) ->
325333 amqqueue :set_state (
326334 amqqueue :set_pid (Q1 , {Name , node ()}),
327335 live )
328336 end ,
329- % % as this function is called synchronously when a ra node becomes leader
330- % % we need to ensure there is no chance of blocking as else the ra node
331- % % may not be able to establish its leadership
332- spawn (fun () ->
333- _ = rabbit_amqqueue :update (QName , Fun ),
334- case rabbit_amqqueue :lookup (QName ) of
335- {ok , Q0 } when ? is_amqqueue (Q0 ) ->
336- Nodes = get_nodes (Q0 ),
337- [_ = erpc_call (Node , ? MODULE , rpc_delete_metrics ,
338- [QName ], ? RPC_TIMEOUT )
339- || Node <- Nodes , Node =/= node ()];
340- _ ->
341- ok
342- end
343- end ).
337+ _ = rabbit_amqqueue :update (QName , Fun ),
338+ case rabbit_amqqueue :lookup (QName ) of
339+ {ok , Q0 } when ? is_amqqueue (Q0 ) ->
340+ Nodes = get_nodes (Q0 ),
341+ _ = [_ = erpc_call (Node , ? MODULE , rpc_delete_metrics ,
342+ [QName ], ? RPC_TIMEOUT )
343+ || Node <- Nodes , Node =/= node ()],
344+ ok ;
345+ _ ->
346+ ok
347+ end .
344348
345349-spec all_replica_states () -> {node (), #{atom () => atom ()}}.
346350all_replica_states () ->
@@ -496,7 +500,7 @@ handle_tick(QName,
496500 catch
497501 _ :Err ->
498502 rabbit_log :debug (" ~ts : handle tick failed with ~p " ,
499- [rabbit_misc :rs (QName ), Err ]),
503+ [rabbit_misc :rs (QName ), Err ]),
500504 ok
501505 end
502506 end ).
@@ -512,7 +516,7 @@ repair_leader_record(QName, Self) ->
512516 rabbit_log :debug (" ~ts : repairing leader record" ,
513517 [rabbit_misc :rs (QName )]),
514518 {_ , Name } = erlang :process_info (Self , registered_name ),
515- become_leader (QName , Name ),
519+ ok = become_leader0 (QName , Name ),
516520 ok
517521 end ,
518522 ok .
@@ -579,7 +583,7 @@ recover(_Vhost, Queues) ->
579583 Err1 == name_not_registered ->
580584 rabbit_log :warning (" Quorum queue recovery: configured member of ~ts was not found on this node. Starting member as a new one. "
581585 " Context: ~s " ,
582- [rabbit_misc :rs (QName ), Err1 ]),
586+ [rabbit_misc :rs (QName ), Err1 ]),
583587 % queue was never started on this node
584588 % so needs to be started from scratch.
585589 case start_server (make_ra_conf (Q0 , ServerId )) of
0 commit comments