Skip to content

Commit e9c4165

Browse files
Merge pull request #2040 from luos/fix_cleanup_of_exclusive_queues_2
fix cleanup of exclusive queues in case of restart or partition
2 parents 5316f76 + 8cc35f6 commit e9c4165

File tree

2 files changed

+71
-3
lines changed

2 files changed

+71
-3
lines changed

src/rabbit_amqqueue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,8 +1113,8 @@ is_mirrored(Q) ->
11131113

11141114
is_dead_exclusive(#amqqueue{exclusive_owner = none}) ->
11151115
false;
1116-
is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
1117-
not rabbit_mnesia:is_process_alive(Pid).
1116+
is_dead_exclusive(#amqqueue{exclusive_owner = OwnerPid, pid = QPid}) when is_pid(OwnerPid), is_pid(QPid) ->
1117+
not (rabbit_mnesia:is_process_alive(OwnerPid) andalso rabbit_mnesia:is_process_alive(QPid)).
11181118

11191119
on_node_up(Node) ->
11201120
ok = rabbit_misc:execute_mnesia_transaction(

test/simple_ha_SUITE.erl

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
-include_lib("common_test/include/ct.hrl").
2020
-include_lib("amqp_client/include/amqp_client.hrl").
21+
-include_lib("eunit/include/eunit.hrl").
2122

2223
-compile(export_all).
2324

@@ -34,7 +35,8 @@ groups() ->
3435
{cluster_size_2, [], [
3536
rapid_redeclare,
3637
declare_synchrony,
37-
clean_up_exclusive_queues
38+
clean_up_exclusive_queues,
39+
clean_up_and_redeclare_exclusive_queues_on_other_nodes
3840
]},
3941
{cluster_size_3, [], [
4042
consume_survives_stop,
@@ -148,6 +150,43 @@ clean_up_exclusive_queues(Config) ->
148150
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
149151
ok.
150152

153+
clean_up_and_redeclare_exclusive_queues_on_other_nodes(Config) ->
154+
QueueCount = 10,
155+
QueueNames = lists:map(fun(N) ->
156+
NBin = erlang:integer_to_binary(N),
157+
<<"exclusive-q-", NBin/binary>>
158+
end, lists:seq(1, QueueCount)),
159+
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
160+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
161+
{ok, Ch} = amqp_connection:open_channel(Conn),
162+
163+
LocationMinMasters = [
164+
{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}
165+
],
166+
lists:foreach(fun(QueueName) ->
167+
declare_exclusive(Ch, QueueName, LocationMinMasters),
168+
subscribe(Ch, QueueName)
169+
end, QueueNames),
170+
171+
ok = rabbit_ct_broker_helpers:kill_node(Config, B),
172+
173+
Cancels = receive_cancels([]),
174+
?assert(length(Cancels) > 0),
175+
176+
RemaniningQueues = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_amqqueue, list, []),
177+
178+
?assertEqual(length(RemaniningQueues), QueueCount - length(Cancels)),
179+
180+
lists:foreach(fun(QueueName) ->
181+
declare_exclusive(Ch, QueueName, LocationMinMasters),
182+
true = rabbit_ct_client_helpers:publish(Ch, QueueName, 1),
183+
subscribe(Ch, QueueName)
184+
end, QueueNames),
185+
Messages = receive_messages([]),
186+
?assertEqual(10, length(Messages)),
187+
ok = rabbit_ct_client_helpers:close_connection(Conn).
188+
189+
151190
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
152191
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
153192
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).
@@ -273,3 +312,32 @@ open_incapable_channel(NodePort) ->
273312
client_properties = Props}),
274313
{ok, Ch} = amqp_connection:open_channel(ConsConn),
275314
Ch.
315+
316+
declare_exclusive(Ch, QueueName, Args) ->
317+
Declare = #'queue.declare'{queue = QueueName,
318+
exclusive = true,
319+
arguments = Args
320+
},
321+
#'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).
322+
323+
subscribe(Ch, QueueName) ->
324+
ConsumeOk = amqp_channel:call(Ch, #'basic.consume'{queue = QueueName,
325+
no_ack = true}),
326+
#'basic.consume_ok'{} = ConsumeOk,
327+
receive ConsumeOk -> ok after ?DELAY -> throw(consume_ok_timeout) end.
328+
329+
receive_cancels(Cancels) ->
330+
receive
331+
#'basic.cancel'{} = C ->
332+
receive_cancels([C|Cancels])
333+
after ?DELAY ->
334+
Cancels
335+
end.
336+
337+
receive_messages(All) ->
338+
receive
339+
{#'basic.deliver'{}, Msg} ->
340+
receive_messages([Msg|All])
341+
after ?DELAY ->
342+
lists:reverse(All)
343+
end.

0 commit comments

Comments
 (0)