Skip to content

Commit 8cc35f6

Browse files
committed
fix cleanup of exclusive queues in case of restart or partition
When using different than client-local queues exclusive ones can be placed on a different host than the client. When there is a partition or restart these queues are stuck because they are not cleaned up in the nodes as the connection is still alive when the node goes offline, however the queue is dead in the partition. These queues can not be redeclared or consumed. This change fixes the test to test if the queue process is alive. Fixes: #2039
1 parent 9a57252 commit 8cc35f6

File tree

2 files changed

+71
-3
lines changed

2 files changed

+71
-3
lines changed

src/rabbit_amqqueue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,8 +1114,8 @@ is_mirrored(Q) ->
11141114

11151115
is_dead_exclusive(#amqqueue{exclusive_owner = none}) ->
11161116
false;
1117-
is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
1118-
not rabbit_mnesia:is_process_alive(Pid).
1117+
is_dead_exclusive(#amqqueue{exclusive_owner = OwnerPid, pid = QPid}) when is_pid(OwnerPid), is_pid(QPid) ->
1118+
not (rabbit_mnesia:is_process_alive(OwnerPid) andalso rabbit_mnesia:is_process_alive(QPid)).
11191119

11201120
on_node_up(Node) ->
11211121
ok = rabbit_misc:execute_mnesia_transaction(

test/simple_ha_SUITE.erl

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
-include_lib("common_test/include/ct.hrl").
2020
-include_lib("amqp_client/include/amqp_client.hrl").
21+
-include_lib("eunit/include/eunit.hrl").
2122

2223
-compile(export_all).
2324

@@ -34,7 +35,8 @@ groups() ->
3435
{cluster_size_2, [], [
3536
rapid_redeclare,
3637
declare_synchrony,
37-
clean_up_exclusive_queues
38+
clean_up_exclusive_queues,
39+
clean_up_and_redeclare_exclusive_queues_on_other_nodes
3840
]},
3941
{cluster_size_3, [], [
4042
consume_survives_stop,
@@ -148,6 +150,43 @@ clean_up_exclusive_queues(Config) ->
148150
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
149151
ok.
150152

153+
clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) ->
154+
QueueCount = 10,
155+
QueueNames = lists:map(fun(N) ->
156+
NBin = erlang:integer_to_binary(N),
157+
<<"exclusive-q-", NBin/binary>>
158+
end, lists:seq(1, QueueCount)),
159+
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
160+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
161+
{ok, Ch} = amqp_connection:open_channel(Conn),
162+
163+
LocationMinMasters = [
164+
{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}
165+
],
166+
lists:foreach(fun(QueueName) ->
167+
declare_exclusive(Ch, QueueName, LocationMinMasters),
168+
subscribe(Ch, QueueName)
169+
end, QueueNames),
170+
171+
ok = rabbit_ct_broker_helpers:kill_node(Config, B),
172+
173+
Cancels = receive_cancels([]),
174+
?assert(length(Cancels) > 0),
175+
176+
RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []),
177+
178+
?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)),
179+
180+
lists:foreach(fun(QueueName) ->
181+
declare_exclusive(Ch, QueueName, LocationMinMasters),
182+
true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1),
183+
subscribe(Ch, QueueName)
184+
end, QueueNames),
185+
Messages = receive_messages([]),
186+
?assertEqual(10, length(Messages)),
187+
ok = rabbit_ct_client_helpers:close_connection(Conn).
188+
189+
151190
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
152191
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
153192
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).
@@ -273,3 +312,32 @@ open_incapable_channel(NodePort) ->
273312
client_properties = Props}),
274313
{ok, Ch} = amqp_connection:open_channel(ConsConn),
275314
Ch.
315+
316+
declare_exclusive(Ch, QueueName, Args) ->
317+
Declare = #'queue.declare'{queue = QueueName,
318+
exclusive = true,
319+
arguments = Args
320+
},
321+
#'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).
322+
323+
subscribe(Ch, QueueName) ->
324+
ConsumeOk = amqp_channel:call(Ch, #'basic.consume'{queue = QueueName,
325+
no_ack = true}),
326+
#'basic.consume_ok'{} = ConsumeOk,
327+
receive ConsumeOk -> ok after ?DELAY -> throw(consume_ok_timeout) end.
328+
329+
receive_cancels(Cancels) ->
330+
receive
331+
#'basic.cancel'{} = C ->
332+
receive_cancels([C|Cancels])
333+
after ?DELAY ->
334+
Cancels
335+
end.
336+
337+
receive_messages(All) ->
338+
receive
339+
{#'basic.deliver'{}, Msg} ->
340+
receive_messages([Msg|All])
341+
after ?DELAY ->
342+
lists:reverse(All)
343+
end.

0 commit comments

Comments
 (0)