Skip to content

Commit ec5ffe7

Browse files
Merge pull request #1325 from rabbitmq/rabbitmq-server-1323
Clean up orphaned exclusive queues
2 parents 2a01b6c + af397dd commit ec5ffe7

File tree

2 files changed

+30
-7
lines changed

2 files changed

+30
-7
lines changed

src/rabbit_amqqueue.erl

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
3535
-export([on_node_up/1, on_node_down/1]).
3636
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
37-
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
37+
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
38+
-export([is_mirrored/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
3839

3940
-export([pid_of/1, pid_of/2]).
4041

@@ -919,6 +920,11 @@ cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mi
919920
is_mirrored(Q) ->
920921
rabbit_mirror_queue_misc:is_mirrored(Q).
921922

923+
is_dead_exclusive(#amqqueue{exclusive_owner = none}) ->
924+
false;
925+
is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
926+
not rabbit_mnesia:is_process_alive(Pid).
927+
922928
on_node_up(Node) ->
923929
ok = rabbit_misc:execute_mnesia_transaction(
924930
fun () ->
@@ -963,11 +969,12 @@ on_node_down(Node) ->
963969
rabbit_misc:execute_mnesia_tx_with_tail(
964970
fun () -> QsDels =
965971
qlc:e(qlc:q([{QName, delete_queue(QName)} ||
966-
#amqqueue{name = QName, pid = Pid} = Q
967-
<- mnesia:table(rabbit_queue),
968-
not rabbit_amqqueue:is_mirrored(Q) andalso
969-
node(Pid) == Node andalso
970-
not rabbit_mnesia:is_process_alive(Pid)])),
972+
#amqqueue{name = QName, pid = Pid} =
973+
Q <- mnesia:table(rabbit_queue),
974+
node(Pid) == Node andalso
975+
not rabbit_mnesia:is_process_alive(Pid) andalso
976+
(not rabbit_amqqueue:is_mirrored(Q) orelse
977+
rabbit_amqqueue:is_dead_exclusive(Q))])),
971978
{Qs, Dels} = lists:unzip(QsDels),
972979
T = rabbit_binding:process_deletions(
973980
lists:foldl(fun rabbit_binding:combine_deletions/2,

test/simple_ha_SUITE.erl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ groups() ->
3131
[
3232
{cluster_size_2, [], [
3333
rapid_redeclare,
34-
declare_synchrony
34+
declare_synchrony,
35+
clean_up_exclusive_queues
3536
]},
3637
{cluster_size_3, [], [
3738
consume_survives_stop,
@@ -125,6 +126,21 @@ declare_synchrony(Config) ->
125126
declare(Ch, Name) ->
126127
amqp_channel:call(Ch, #'queue.declare'{durable = true, queue = Name}).
127128

129+
%% Ensure that exclusive queues are cleaned up when part of ha cluster
130+
%% and node is killed abruptly then restarted
131+
clean_up_exclusive_queues(Config) ->
132+
QName = <<"excl">>,
133+
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>),
134+
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
135+
ChA = rabbit_ct_client_helpers:open_channel(Config, A),
136+
amqp_channel:call(ChA, #'queue.declare'{queue = QName,
137+
exclusive = true}),
138+
ok = rabbit_ct_broker_helpers:kill_node(Config, A),
139+
[] = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_amqqueue, list, []),
140+
ok = rabbit_ct_broker_helpers:start_node(Config, A),
141+
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
142+
ok.
143+
128144
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
129145
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
130146
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).

0 commit comments

Comments
 (0)