Skip to content

Commit c58a15e

Browse files
Merge pull request #1315 from rabbitmq/rabbitmq-server-1310
Check if vhost supervisor is running when starting mirrors
2 parents 843199f + a65b6d7 commit c58a15e

10 files changed

+210
-67
lines changed

src/rabbit_amqqueue_sup_sup.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ find_for_vhost(VHost) ->
5757

5858
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
5959
find_for_vhost(VHost, Node) ->
60-
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
60+
{ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node),
6161
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
6262
[QSup] -> {ok, QSup};
6363
Result -> {error, {queue_supervisor_not_found, Result}}
6464
end.
6565

6666
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
6767
start_for_vhost(VHost) ->
68-
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
68+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
6969
{ok, VHostSup} ->
7070
supervisor2:start_child(
7171
VHostSup,
@@ -82,7 +82,7 @@ start_for_vhost(VHost) ->
8282

8383
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
8484
stop_for_vhost(VHost) ->
85-
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
85+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
8686
{ok, VHostSup} ->
8787
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
8888
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);

src/rabbit_connection_tracking_handler.erl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
8282
close_connections(rabbit_connection_tracking:list(VHost),
8383
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
8484
{ok, State};
85+
%% Note: under normal circumstances this will be called immediately
86+
%% after the vhost_deleted above. Therefore we should be careful about
87+
%% what we log and be more defensive.
8588
handle_event(#event{type = vhost_down, props = Details}, State) ->
8689
VHost = pget(name, Details),
8790
Node = pget(node, Details),
88-
rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
89-
" because the vhost database has stopped working",
91+
rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
92+
" because the vhost is stopping",
9093
[VHost, Node]),
9194
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
9295
rabbit_misc:format("vhost '~s' is down", [VHost])),
@@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) ->
131134
ok.
132135

133136
close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
134-
rabbit_networking:close_connection(Pid, Message);
137+
try
138+
rabbit_networking:close_connection(Pid, Message)
139+
catch error:{not_a_connection, _} ->
140+
%% could has been closed concurrently, or the input
141+
%% is bogus. In any case, we should not terminate
142+
ok;
143+
_:Err ->
144+
%% ignore, don't terminate
145+
rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
146+
ok
147+
end;
135148
close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
136149
%% Do an RPC call to the node running the direct client.
137150
Node = node(Pid),

src/rabbit_mirror_queue_misc.erl

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,21 @@ add_mirror(QName, MirrorNode, SyncMode) ->
228228
rabbit_misc:with_exit_handler(
229229
rabbit_misc:const(ok),
230230
fun () ->
231-
SPid = rabbit_amqqueue_sup_sup:start_queue_process(
232-
MirrorNode, Q, slave),
233-
log_info(QName, "Adding mirror on node ~p: ~p~n",
234-
[MirrorNode, SPid]),
235-
rabbit_mirror_queue_slave:go(SPid, SyncMode)
231+
#amqqueue{name = #resource{virtual_host = VHost}} = Q,
232+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
233+
{ok, _} ->
234+
SPid = rabbit_amqqueue_sup_sup:start_queue_process(
235+
MirrorNode, Q, slave),
236+
log_info(QName, "Adding mirror on node ~p: ~p~n",
237+
[MirrorNode, SPid]),
238+
rabbit_mirror_queue_slave:go(SPid, SyncMode);
239+
{error, Error} ->
240+
log_warning(QName,
241+
"Unable to start queue mirror on node '~p'. "
242+
"Target virtual host is not running: ~p~n",
243+
[MirrorNode, Error]),
244+
ok
245+
end
236246
end);
237247
{error, not_found} = E ->
238248
E

src/rabbit_recovery_terms.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
%%----------------------------------------------------------------------------
4949

5050
start(VHost) ->
51-
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
51+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
5252
{ok, VHostSup} ->
5353
{ok, _} = supervisor2:start_child(
5454
VHostSup,
@@ -65,7 +65,7 @@ start(VHost) ->
6565
ok.
6666

6767
stop(VHost) ->
68-
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
68+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
6969
{ok, VHostSup} ->
7070
case supervisor:terminate_child(VHostSup, ?MODULE) of
7171
ok -> supervisor:delete_child(VHostSup, ?MODULE);

src/rabbit_vhost_limit.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
5555
notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
5656
rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
5757
{user_who_performed_action, ActingUser}]),
58-
update_vhost(VHost, undefined).
58+
%% If the function is called as a part of vhost deletion, the vhost can
59+
%% be already deleted.
60+
case rabbit_vhost:exists(VHost) of
61+
true -> update_vhost(VHost, undefined);
62+
false -> ok
63+
end.
5964

6065
connection_limit(VirtualHost) ->
6166
get_limit(VirtualHost, <<"max-connections">>).

src/rabbit_vhost_msg_store.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
2525
ClientRefs == undefined ->
26-
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
26+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
2727
{ok, VHostSup} ->
2828
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
2929
supervisor2:start_child(VHostSup,
@@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
3939
end.
4040

4141
stop(VHost, Type) ->
42-
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
42+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
4343
{ok, VHostSup} ->
4444
ok = supervisor2:terminate_child(VHostSup, Type),
4545
ok = supervisor2:delete_child(VHostSup, Type);
@@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) ->
6565
end.
6666

6767
vhost_store_pid(VHost, Type) ->
68-
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
68+
{ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
6969
case supervisor2:find_child(VHostSup, Type) of
7070
[Pid] -> Pid;
7171
[] -> no_pid

src/rabbit_vhost_sup_sup.erl

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
-export([init/1]).
2424

2525
-export([start_link/0, start/0]).
26-
-export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
26+
-export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]).
2727
-export([delete_on_all_nodes/1]).
2828
-export([start_on_all_nodes/1]).
2929

@@ -72,7 +72,7 @@ delete_on_all_nodes(VHost) ->
7272
ok.
7373

7474
stop_and_delete_vhost(VHost) ->
75-
case get_vhost_sup(VHost) of
75+
StopResult = case lookup_vhost_sup_record(VHost) of
7676
not_found -> ok;
7777
#vhost_sup{wrapper_pid = WrapperPid,
7878
vhost_sup_pid = VHostSupPid} ->
@@ -84,13 +84,15 @@ stop_and_delete_vhost(VHost) ->
8484
[VHostSupPid, VHost]),
8585
case supervisor2:terminate_child(?MODULE, WrapperPid) of
8686
ok ->
87-
ets:delete(?MODULE, VHost),
88-
ok = rabbit_vhost:delete_storage(VHost);
87+
true = ets:delete(?MODULE, VHost),
88+
ok;
8989
Other ->
9090
Other
9191
end
9292
end
93-
end.
93+
end,
94+
ok = rabbit_vhost:delete_storage(VHost),
95+
StopResult.
9496

9597
%% We take an optimistic approach whan stopping a remote VHost supervisor.
9698
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -106,7 +108,7 @@ stop_and_delete_vhost(VHost, Node) ->
106108
{error, RpcErr}
107109
end.
108110

109-
-spec init_vhost(rabbit_types:vhost()) -> ok.
111+
-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhsot()}}.
110112
init_vhost(VHost) ->
111113
case start_vhost(VHost) of
112114
{ok, _} -> ok;
@@ -130,30 +132,32 @@ init_vhost(VHost) ->
130132
end
131133
end.
132134

133-
-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
134-
vhost_sup(VHost, Node) ->
135-
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
135+
-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} |
136+
{vhost_supervisor_not_running, rabbit_types:vhost()}.
137+
138+
-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}.
139+
get_vhost_sup(VHost, Node) ->
140+
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of
136141
{ok, Pid} when is_pid(Pid) ->
137142
{ok, Pid};
143+
{error, Err} ->
144+
{error, Err};
138145
{badrpc, RpcErr} ->
139146
{error, RpcErr}
140147
end.
141148

142-
-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
143-
vhost_sup(VHost) ->
144-
case vhost_sup_pid(VHost) of
145-
no_pid ->
146-
case start_vhost(VHost) of
147-
{ok, Pid} ->
148-
true = is_vhost_alive(VHost),
149-
{ok, Pid};
150-
{error, {no_such_vhost, VHost}} ->
151-
{error, {no_such_vhost, VHost}};
152-
Error ->
153-
throw(Error)
154-
end;
155-
{ok, Pid} when is_pid(Pid) ->
156-
{ok, Pid}
149+
-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}.
150+
get_vhost_sup(VHost) ->
151+
case rabbit_vhost:exists(VHost) of
152+
false ->
153+
{error, {no_such_vhost, VHost}};
154+
true ->
155+
case vhost_sup_pid(VHost) of
156+
no_pid ->
157+
{error, {vhost_supervisor_not_running, VHost}};
158+
{ok, Pid} when is_pid(Pid) ->
159+
{ok, Pid}
160+
end
157161
end.
158162

159163
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
@@ -181,7 +185,7 @@ start_vhost(VHost) ->
181185
is_vhost_alive(VHost) ->
182186
%% A vhost is considered alive if it's supervision tree is alive and
183187
%% saved in the ETS table
184-
case get_vhost_sup(VHost) of
188+
case lookup_vhost_sup_record(VHost) of
185189
#vhost_sup{wrapper_pid = WrapperPid,
186190
vhost_sup_pid = VHostSupPid,
187191
vhost_process_pid = VHostProcessPid}
@@ -210,16 +214,16 @@ save_vhost_process(VHost, VHostProcessPid) ->
210214
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
211215
ok.
212216

213-
-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
214-
get_vhost_sup(VHost) ->
217+
-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
218+
lookup_vhost_sup_record(VHost) ->
215219
case ets:lookup(?MODULE, VHost) of
216220
[] -> not_found;
217221
[#vhost_sup{} = VHostSup] -> VHostSup
218222
end.
219223

220224
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
221225
vhost_sup_pid(VHost) ->
222-
case get_vhost_sup(VHost) of
226+
case lookup_vhost_sup_record(VHost) of
223227
not_found ->
224228
no_pid;
225229
#vhost_sup{vhost_sup_pid = Pid} = VHostSup ->

src/rabbit_vhost_sup_wrapper.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@
2929
start_link(VHost) ->
3030
%% Using supervisor, because supervisor2 does not stop a started child when
3131
%% another one fails to start. Bug?
32-
supervisor:start_link(?MODULE, [VHost]).
32+
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
33+
{ok, Pid} ->
34+
{error, {already_started, Pid}};
35+
{error, _} ->
36+
supervisor:start_link(?MODULE, [VHost])
37+
end.
3338

3439
init([VHost]) ->
3540
%% 2 restarts in 5 minutes. One per message store.

test/dynamic_ha_SUITE.erl

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ groups() ->
5757
{clustered, [], [
5858
{cluster_size_2, [], [
5959
vhost_deletion,
60-
promote_on_shutdown
60+
promote_on_shutdown,
61+
queue_survive_adding_dead_vhost_mirror
6162
]},
6263
{cluster_size_3, [], [
6364
change_policy,
6465
rapid_change,
6566
nodes_policy_should_pick_master_from_its_params,
66-
promote_slave_after_standalone_restart
67+
promote_slave_after_standalone_restart,
68+
queue_survive_adding_dead_vhost_mirror
6769
% FIXME: Re-enable those tests when the know issues are
6870
% fixed.
6971
% failing_random_policies,
@@ -218,6 +220,19 @@ rapid_loop(Config, Node, MRef) ->
218220
rapid_loop(Config, Node, MRef)
219221
end.
220222

223+
queue_survive_adding_dead_vhost_mirror(Config) ->
224+
rabbit_ct_broker_helpers:force_vhost_failure(Config, 1, <<"/">>),
225+
NodeA = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
226+
ChA = rabbit_ct_client_helpers:open_channel(Config, NodeA),
227+
QName = <<"queue_survive_adding_dead_vhost_mirror-q-1">>,
228+
amqp_channel:call(ChA, #'queue.declare'{queue = QName}),
229+
Q = find_queue(QName, NodeA),
230+
Pid = proplists:get_value(pid, Q),
231+
rabbit_ct_broker_helpers:set_ha_policy_all(Config),
232+
%% Queue should not fail
233+
Q1 = find_queue(QName, NodeA),
234+
Pid = proplists:get_value(pid, Q1).
235+
221236
%% Vhost deletion needs to successfully tear down policies and queues
222237
%% with policies. At least smoke-test that it doesn't blow up.
223238
vhost_deletion(Config) ->

0 commit comments

Comments
 (0)