@@ -215,7 +215,8 @@ recover() ->
215215 % % Clear out remnants of old incarnation, in case we restarted
216216 % % faster than other nodes handled DOWN messages from us.
217217 on_node_down (node ()),
218- DurableQueues = find_durable_queues (),
218+ DurableQueues = queues_to_recover (),
219+
219220 L = length (DurableQueues ),
220221
221222 % % if there are not enough file handles, the server might hang
@@ -257,6 +258,31 @@ start(Qs) ->
257258 [Pid ! {self (), go } || # amqqueue {pid = Pid } <- Qs ],
258259 ok .
259260
261+ queues_to_recover () ->
262+ DurableQueues = find_durable_queues (),
263+ VHosts = rabbit_vhost :list (),
264+
265+ {QueuesWithVhost , QueuesWithoutVhost } = lists :partition (
266+ fun (# amqqueue {name = # resource {virtual_host = VHost }}) ->
267+ lists :member (VHost , VHosts )
268+ end ,
269+ DurableQueues ),
270+
271+ {LocalQueuesWithoutVhost , _RemoteQueuesWithoutVhost } = lists :partition (
272+ fun (# amqqueue {pid = QPid }) -> node (QPid ) == node () end ,
273+ QueuesWithoutVhost ),
274+
275+ {atomic , ok } =
276+ mnesia :sync_transaction (
277+ fun () ->
278+ rabbit_log :error (" Deleting ~p~n " , [LocalQueuesWithoutVhost ]),
279+ [ internal_delete1 (Name , false )
280+ || # amqqueue {name = Name } <- LocalQueuesWithoutVhost ],
281+ ok
282+ end ),
283+
284+ QueuesWithVhost .
285+
260286find_durable_queues () ->
261287 Node = node (),
262288 mnesia :async_dirty (
@@ -576,7 +602,8 @@ list_local_names() ->
576602 State =/= crashed ,
577603 node () =:= node (QPid ) ].
578604
579- list (VHostPath ) -> list (VHostPath , rabbit_queue ).
605+ list (VHostPath ) ->
606+ list (VHostPath , rabbit_queue ).
580607
581608% % Not dirty_match_object since that would not be transactional when used in a
582609% % tx context
@@ -590,12 +617,16 @@ list(VHostPath, TableName) ->
590617 end ).
591618
592619list_down (VHostPath ) ->
593- Present = list (VHostPath ),
594- Durable = list (VHostPath , rabbit_durable_queue ),
595- PresentS = sets :from_list ([N || # amqqueue {name = N } <- Present ]),
596- sets :to_list (sets :filter (fun (# amqqueue {name = N }) ->
597- not sets :is_element (N , PresentS )
598- end , sets :from_list (Durable ))).
620+ case rabbit_vhost :exists (VHostPath ) of
621+ false -> [];
622+ true ->
623+ Present = list (VHostPath ),
624+ Durable = list (VHostPath , rabbit_durable_queue ),
625+ PresentS = sets :from_list ([N || # amqqueue {name = N } <- Present ]),
626+ sets :to_list (sets :filter (fun (# amqqueue {name = N }) ->
627+ not sets :is_element (N , PresentS )
628+ end , sets :from_list (Durable )))
629+ end .
599630
600631info_keys () -> rabbit_amqqueue_process :info_keys ().
601632
0 commit comments