@@ -456,31 +456,53 @@ with(Name, F, E) ->
456456
%% with(Name, F, E, RetriesLeft): look up the queue Name via lookup/1 and
%% dispatch on the result.
%%
%% NOTE: this span is a diff hunk. Lines whose number prefix ends in '-' are
%% the removed (pre-change) side, lines ending in '+' are the added side, and
%% lines with two fused numbers are unchanged context.
%%
%%   Name        - key passed to lookup/1.
%%   F           - fun/1 applied to the queue record on the success path.
%%   E           - fun/1 applied to an error term: {absent, Q, Reason} with
%%                 Reason in timeout | stopped | crashed, or the result of
%%                 not_found_or_absent_dirty(Name).
%%   RetriesLeft - remaining retry budget; 0 makes the live/stopped clauses
%%                 give up instead of retrying.
%%
%% Dispatch on the added side:
%%   live,    RetriesLeft =:= 0 -> E({absent, Q, timeout})
%%   stopped, RetriesLeft =:= 0 -> E({absent, Q, stopped})
%%   crashed                    -> E({absent, Q, crashed}), never retried
%%   stopped                    -> F(Q), with retry_wait/4 as exit handler
%%   live                       -> F(Q), with retry_wait/4 as exit handler
%%   {error, not_found}         -> E(not_found_or_absent_dirty(Name))
%%
%% NOTE(review): presumably rabbit_misc:with_exit_handler(Handler, Thunk) runs
%% Thunk and calls Handler if Thunk exits -- confirm against rabbit_misc.
457457with (Name , F , E , RetriesLeft ) ->
458458 case lookup (Name ) of
459- {ok , Q = # amqqueue {}} when RetriesLeft =:= 0 ->
459+ {ok , Q = # amqqueue {state = live }} when RetriesLeft =:= 0 ->
460460 % % Something bad happened to that queue, we are bailing out
461461 % % on processing current request.
462462 E ({absent , Q , timeout });
463+ {ok , Q = # amqqueue {state = stopped }} when RetriesLeft =:= 0 ->
464+ % % The queue was stopped and not migrated
465+ E ({absent , Q , stopped });
466+ % % The queue process has crashed with unknown error
463467 {ok , Q = # amqqueue {state = crashed }} ->
464468 E ({absent , Q , crashed });
469+ % % The queue process has been stopped by a supervisor.
470+ % % In that case a synchronised slave can take over
471+ % % so we should retry.
465472 {ok , Q = # amqqueue {state = stopped }} ->
466473 % % The queue process was stopped by the supervisor
%% Removed side: a stopped queue was reported absent immediately, and the
%% following clause matched a record in any remaining state.
467- E ({absent , Q , stopped });
468- {ok , Q = # amqqueue {pid = QPid }} ->
474+ rabbit_misc :with_exit_handler (
475+ fun () -> retry_wait (Q , F , E , RetriesLeft ) end ,
476+ fun () -> F (Q ) end );
477+ % % The queue is supposed to be active.
478+ % % The master node can go away or queue can be killed
479+ % % so we retry, waiting for a slave to take over.
%% NOTE(review): the added clause matches only state = live, whereas the
%% removed clause matched any #amqqueue{}. A record in a state other than
%% live, stopped or crashed would match no clause here -- confirm that no
%% other states exist.
480+ {ok , Q = # amqqueue {state = live }} ->
469481 % % We check is_process_alive(QPid) in case we receive a
470482 % % nodedown (for example) in F() that has nothing to do
471483 % % with the QPid. F() should be written s.t. that this
472484 % % cannot happen, so we bail if it does since that
473485 % % indicates a code bug and we don't want to get stuck in
474486 % % the retry loop.
475487 rabbit_misc :with_exit_handler (
%% Removed side: the liveness assertion, the 30 ms sleep and the recursive
%% retry were inline here. On the added side they are in retry_wait/4.
476- fun () -> false = rabbit_mnesia :is_process_alive (QPid ),
477- timer :sleep (30 ),
478- with (Name , F , E , RetriesLeft - 1 )
479- end , fun () -> F (Q ) end );
488+ fun () -> retry_wait (Q , F , E , RetriesLeft ) end ,
489+ fun () -> F (Q ) end );
480490 {error , not_found } ->
481491 E (not_found_or_absent_dirty (Name ))
482492 end .
483493
%% retry_wait(Q, F, E, RetriesLeft): handler passed by with/4 to
%% rabbit_misc:with_exit_handler; it either gives up or retries the lookup.
%%
%%   * Queue state is stopped and is_mirrored(Q) returns false:
%%     call E({absent, Q, stopped}) without retrying.
%%   * Anything else: assert that rabbit_mnesia:is_process_alive(QPid) returns
%%     false (any other result fails the match and raises badmatch), sleep
%%     30 ms, then call with/4 again with RetriesLeft - 1.
%%
%% This function does not check RetriesLeft itself; the RetriesLeft =:= 0
%% clauses of with/4 end the retry chain.
494+ retry_wait (Q = # amqqueue {pid = QPid , name = Name , state = QState }, F , E , RetriesLeft ) ->
495+ case {QState , is_mirrored (Q )} of
496+ % % We don't want to repeat an operation if
497+ % % there are no slaves to migrate to
498+ {stopped , false } ->
499+ E ({absent , Q , stopped });
500+ _ ->
%% Deliberate assertive match: if the queue process is still reported
%% alive, crash here rather than loop on retries.
501+ false = rabbit_mnesia :is_process_alive (QPid ),
502+ timer :sleep (30 ),
503+ with (Name , F , E , RetriesLeft - 1 )
504+ end .
505+
%% with(Name, F): convenience wrapper around with/3 whose error handler
%% returns the error term wrapped as {error, E}.
484506with (Name , F ) -> with (Name , F , fun (E ) -> {error , E } end ).
485507
486508with_or_die (Name , F ) ->
0 commit comments