Skip to content

Commit 843199f

Browse files
Merge pull request #1309 from rabbitmq/rabbitmq-server-1303
Set queue state to 'stopped' when terminating.
2 parents 98566cd + 1df9b98 commit 843199f

File tree

6 files changed

+63
-41
lines changed

6 files changed

+63
-41
lines changed

src/rabbit_amqqueue.erl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
4141

4242
-export([pid_of/1, pid_of/2]).
43+
-export([mark_local_durable_queues_stopped/1]).
4344

4445
%% internal
4546
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -255,6 +256,15 @@ start(Qs) ->
255256
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
256257
ok.
257258

259+
mark_local_durable_queues_stopped(VHost) ->
260+
Qs = find_durable_queues(VHost),
261+
rabbit_misc:execute_mnesia_transaction(
262+
fun() ->
263+
[ store_queue(Q#amqqueue{ state = stopped })
264+
|| Q = #amqqueue{ state = State } <- Qs,
265+
State =/= stopped ]
266+
end).
267+
258268
find_durable_queues(VHost) ->
259269
Node = node(),
260270
mnesia:async_dirty(
@@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) ->
452462
E({absent, Q, timeout});
453463
{ok, Q = #amqqueue{state = crashed}} ->
454464
E({absent, Q, crashed});
465+
{ok, Q = #amqqueue{state = stopped}} ->
466+
%% The queue process was stopped by the supervisor
467+
E({absent, Q, stopped});
455468
{ok, Q = #amqqueue{pid = QPid}} ->
456469
%% We check is_process_alive(QPid) in case we receive a
457470
%% nodedown (for example) in F() that has nothing to do
@@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
642655
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
643656

644657
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
658+
info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
645659
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
646660

647661
info(Q = #amqqueue{ state = crashed }, Items) ->
648662
info_down(Q, Items, crashed);
663+
info(Q = #amqqueue{ state = stopped }, Items) ->
664+
info_down(Q, Items, stopped);
649665
info(#amqqueue{ pid = QPid }, Items) ->
650666
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
651667
{ok, Res} -> Res;

src/rabbit_amqqueue_process.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
265265
notify_decorators(startup, State3),
266266
State3.
267267

268-
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
268+
terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
269269
rabbit_core_metrics:queue_deleted(qname(State)),
270-
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
270+
terminate_shutdown(
271+
fun (BQS) ->
272+
rabbit_misc:execute_mnesia_transaction(
273+
fun() ->
274+
[Q] = mnesia:read({rabbit_queue, QName}),
275+
Q2 = Q#amqqueue{state = stopped},
276+
rabbit_amqqueue:store_queue(Q2)
277+
end),
278+
BQ:terminate(R, BQS)
279+
end, State);
271280
terminate({shutdown, missing_owner} = Reason, State) ->
272281
%% if the owner was missing then there will be no queue, so don't emit stats
273282
terminate_shutdown(terminate_delete(false, Reason, State), State);

src/rabbit_channel.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
21422142
fun (not_found) -> {ok, 0};
21432143
({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
21442144
{ok, 0};
2145+
({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
2146+
{ok, 0};
21452147
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
21462148
end) of
21472149
{error, in_use} ->

src/rabbit_vhost_process.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ init([VHost]) ->
5555
timer:send_interval(Interval, check_vhost),
5656
{ok, VHost}
5757
catch _:Reason ->
58+
rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
5859
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
5960
" Stacktrace ~p",
6061
[VHost, Reason, erlang:get_stacktrace()]),

test/rabbitmqctl_integration_SUITE.erl

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
-export([list_queues_local/1
3232
,list_queues_offline/1
3333
,list_queues_online/1
34+
,list_queues_stopped/1
3435
]).
3536

3637
all() ->
@@ -44,6 +45,7 @@ groups() ->
4445
[list_queues_local
4546
,list_queues_online
4647
,list_queues_offline
48+
,list_queues_stopped
4749
]}
4850
].
4951

@@ -96,13 +98,19 @@ end_per_group(list_queues, Config0) ->
9698
rabbit_ct_helpers:run_steps(Config1,
9799
rabbit_ct_client_helpers:teardown_steps() ++
98100
rabbit_ct_broker_helpers:teardown_steps());
99-
end_per_group(global_parameters, Config) ->
100-
rabbit_ct_helpers:run_teardown_steps(Config,
101-
rabbit_ct_client_helpers:teardown_steps() ++
102-
rabbit_ct_broker_helpers:teardown_steps());
103101
end_per_group(_, Config) ->
104102
Config.
105103

104+
init_per_testcase(list_queues_stopped, Config0) ->
105+
%% Start node 3 to crash it's queues
106+
rabbit_ct_broker_helpers:start_node(Config0, 2),
107+
%% Make vhost "down" on nodes 2 and 3
108+
rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>),
109+
rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>),
110+
111+
rabbit_ct_broker_helpers:stop_node(Config0, 2),
112+
rabbit_ct_helpers:testcase_started(Config0, list_queues_stopped);
113+
106114
init_per_testcase(Testcase, Config0) ->
107115
rabbit_ct_helpers:testcase_started(Config0, Testcase).
108116

@@ -134,6 +142,23 @@ list_queues_offline(Config) ->
134142
assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
135143
ok.
136144

145+
list_queues_stopped(Config) ->
146+
Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))),
147+
Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))),
148+
Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))),
149+
150+
%% All queues are listed
151+
ListedQueues =
152+
[ {Name, State}
153+
|| [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list(
154+
Config, 0, ["list_queues", "name", "state"]) ],
155+
156+
[ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ],
157+
%% Node is running. Vhost is down
158+
[ <<"stopped">> = proplists:get_value(Q, ListedQueues) || Q <- Node2Queues ],
159+
%% Node is not running. Vhost is down
160+
[ <<"down">> = proplists:get_value(Q, ListedQueues) || Q <- Node3Queues ].
161+
137162
%%----------------------------------------------------------------------------
138163
%% Helpers
139164
%%----------------------------------------------------------------------------

test/vhost_SUITE.erl

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ vhost_failure_forces_connection_closure(Config) ->
161161
[_Conn2] = open_connections(Config, [{0, VHost2}]),
162162
?assertEqual(1, count_connections_in(Config, VHost2)),
163163

164-
force_vhost_failure(Config, VHost2),
164+
rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
165165
timer:sleep(200),
166166
?assertEqual(0, count_connections_in(Config, VHost2)),
167167

@@ -181,7 +181,7 @@ dead_vhost_connection_refused(Config) ->
181181
?assertEqual(0, count_connections_in(Config, VHost1)),
182182
?assertEqual(0, count_connections_in(Config, VHost2)),
183183

184-
force_vhost_failure(Config, VHost2),
184+
rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
185185
timer:sleep(200),
186186

187187
[_Conn1] = open_connections(Config, [{0, VHost1}]),
@@ -213,7 +213,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) ->
213213
[_Conn21] = open_connections(Config, [{1, VHost2}]),
214214
?assertEqual(2, count_connections_in(Config, VHost2)),
215215

216-
force_vhost_failure(Config, 0, VHost2),
216+
rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
217217
timer:sleep(200),
218218
%% Vhost2 connection on node 1 is still alive
219219
?assertEqual(1, count_connections_in(Config, VHost2)),
@@ -236,7 +236,7 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
236236
?assertEqual(0, count_connections_in(Config, VHost1)),
237237
?assertEqual(0, count_connections_in(Config, VHost2)),
238238

239-
force_vhost_failure(Config, 0, VHost2),
239+
rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
240240
timer:sleep(200),
241241
%% Can open connections to vhost1 on node 0 and 1
242242
[_Conn10] = open_connections(Config, [{0, VHost1}]),
@@ -257,37 +257,6 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
257257
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
258258
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
259259

260-
force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost).
261-
262-
force_vhost_failure(Config, Node, VHost) ->
263-
force_vhost_failure(Config, Node, VHost, 10).
264-
265-
force_vhost_failure(_Config, _Node, VHost, 0) ->
266-
error({failed_to_force_vhost_failure, no_more_attempts_left, VHost});
267-
force_vhost_failure(Config, Node, VHost, Attempts) ->
268-
MessageStorePid = get_message_store_pid(Config, VHost),
269-
rabbit_ct_broker_helpers:rpc(Config, Node,
270-
erlang, exit,
271-
[MessageStorePid, force_vhost_failure]),
272-
%% Give it a time to fail
273-
timer:sleep(200),
274-
case rabbit_ct_broker_helpers:rpc(Config, 0,
275-
rabbit_vhost_sup_sup, is_vhost_alive,
276-
[VHost]) of
277-
true -> force_vhost_failure(Config, Node, VHost, Attempts - 1);
278-
false -> ok
279-
end.
280-
281-
get_message_store_pid(Config, VHost) ->
282-
{ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0,
283-
rabbit_vhost_sup_sup, vhost_sup, [VHost]),
284-
Children = rabbit_ct_broker_helpers:rpc(Config, 0,
285-
supervisor, which_children,
286-
[VHostSup]),
287-
[MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children,
288-
Name == msg_store_persistent],
289-
MsgStorePid.
290-
291260
cluster_vhost_deletion_forces_connection_closure(Config) ->
292261
VHost1 = <<"vhost1">>,
293262
VHost2 = <<"vhost2">>,

0 commit comments

Comments
 (0)