Skip to content

Commit 6115475

Browse files
authored
Merge pull request #1293 from rabbitmq/rabbitmq-server-close-connection-on-vhost-down
Close connections when vhost is unavailable (supervision tree is down)
2 parents e66809b + dc82c25 commit 6115475

12 files changed

+613
-139
lines changed

src/rabbit_connection_tracking.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
3535
clear_tracked_connection_tables_for_this_node/0,
3636
register_connection/1, unregister_connection/1,
37-
list/0, list/1, list_on_node/1, list_of_user/1,
37+
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
3838
tracked_connection_from_connection_created/1,
3939
tracked_connection_from_connection_state/1,
4040
count_connections_in/1]).
@@ -217,6 +217,16 @@ list_on_node(Node) ->
217217
catch exit:{aborted, {no_exists, _}} -> []
218218
end.
219219

220+
%% Lists the tracked connections on Node that belong to VHost.
-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
221+
222+
list_on_node(Node, VHost) ->
223+
try mnesia:dirty_match_object(
224+
tracked_connection_table_name_for(Node),
225+
#tracked_connection{vhost = VHost, _ = '_'})
226+
catch exit:{aborted, {no_exists, _}} -> []
227+
end.
228+
229+
220230
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
221231

222232
list_of_user(Username) ->

src/rabbit_connection_tracking_handler.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
8282
close_connections(rabbit_connection_tracking:list(VHost),
8383
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
8484
{ok, State};
85+
handle_event(#event{type = vhost_down, props = Details}, State) ->
86+
VHost = pget(name, Details),
87+
Node = pget(node, Details),
88+
rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
89+
" because the vhost database has stopped working",
90+
[VHost, Node]),
91+
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
92+
rabbit_misc:format("vhost '~s' is down", [VHost])),
93+
{ok, State};
8594
handle_event(#event{type = user_deleted, props = Details}, State) ->
8695
Username = pget(name, Details),
8796
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),

src/rabbit_direct.erl

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,21 @@ connect(Creds, VHost, Protocol, Pid, Infos) ->
9090
true ->
9191
{error, not_allowed};
9292
false ->
93-
case AuthFun() of
94-
{ok, User = #user{username = Username}} ->
95-
notify_auth_result(Username,
96-
user_authentication_success, []),
97-
connect1(User, VHost, Protocol, Pid, Infos);
98-
{refused, Username, Msg, Args} ->
99-
notify_auth_result(Username,
100-
user_authentication_failure,
101-
[{error, rabbit_misc:format(Msg, Args)}]),
102-
{error, {auth_failure, "Refused"}}
93+
case is_vhost_alive(VHost, Creds, Pid) of
94+
false ->
95+
{error, {internal_error, vhost_is_down}};
96+
true ->
97+
case AuthFun() of
98+
{ok, User = #user{username = Username}} ->
99+
notify_auth_result(Username,
100+
user_authentication_success, []),
101+
connect1(User, VHost, Protocol, Pid, Infos);
102+
{refused, Username, Msg, Args} ->
103+
notify_auth_result(Username,
104+
user_authentication_failure,
105+
[{error, rabbit_misc:format(Msg, Args)}]),
106+
{error, {auth_failure, "Refused"}}
107+
end
103108
end
104109
end;
105110
false -> {error, broker_not_found_on_node}
@@ -140,6 +145,21 @@ maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) ->
140145
[]
141146
end.
142147

148+
is_vhost_alive(VHost, {Username, _Password}, Pid) ->
149+
PrintedUsername = case Username of
150+
none -> "";
151+
_ -> Username
152+
end,
153+
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
154+
true -> true;
155+
false ->
156+
rabbit_log_connection:error(
157+
"Error on Direct connection ~p~n"
158+
"access to vhost '~s' refused for user '~s': "
159+
"vhost '~s' is down",
160+
[Pid, VHost, PrintedUsername, VHost]),
161+
false
162+
end.
143163

144164
is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
145165
PrintedUsername = case Username of

src/rabbit_msg_store.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
10101010
ok;
10111011
{error, RTErr} ->
10121012
rabbit_log:error("Unable to save message store recovery terms"
1013-
"for directory ~p~nError: ~p~n",
1013+
" for directory ~p~nError: ~p~n",
10141014
[Dir, RTErr])
10151015
end,
10161016
State3 #msstate { index_state = undefined,

src/rabbit_reader.erl

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ handle_other(handshake_timeout, State) ->
567567
throw({handshake_timeout, State#v1.callback});
568568
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
569569
State;
570-
handle_other(heartbeat_timeout,
570+
handle_other(heartbeat_timeout,
571571
State = #v1{connection = #connection{timeout_sec = T}}) ->
572572
maybe_emit_stats(State),
573573
throw({heartbeat_timeout, T});
@@ -623,7 +623,7 @@ send_blocked(#v1{connection = #connection{protocol = Protocol,
623623
sock = Sock}, Reason) ->
624624
case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
625625
{bool, true} ->
626-
626+
627627
ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
628628
Protocol);
629629
_ ->
@@ -1164,6 +1164,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
11641164

11651165
ok = is_over_connection_limit(VHost, User),
11661166
ok = rabbit_access_control:check_vhost_access(User, VHost, Sock),
1167+
ok = is_vhost_alive(VHost, User),
11671168
NewConnection = Connection#connection{vhost = VHost},
11681169
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
11691170

@@ -1209,6 +1210,16 @@ handle_method0(_Method, #v1{connection_state = S}) ->
12091210
rabbit_misc:protocol_error(
12101211
channel_error, "unexpected method in connection state ~w", [S]).
12111212

1213+
is_vhost_alive(VHostPath, User) ->
1214+
case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of
1215+
true -> ok;
1216+
false ->
1217+
rabbit_misc:protocol_error(internal_error,
1218+
"access to vhost '~s' refused for user '~s': "
1219+
"vhost '~s' is down",
1220+
[VHostPath, User#user.username, VHostPath])
1221+
end.
1222+
12121223
is_over_connection_limit(VHostPath, User) ->
12131224
try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
12141225
false -> ok;
@@ -1567,7 +1578,7 @@ maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) ->
15671578
State1 = State#v1{connection_state = blocked,
15681579
throttle = update_last_blocked_at(Throttle)},
15691580
case CS of
1570-
running ->
1581+
running ->
15711582
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater);
15721583
_ -> ok
15731584
end,
@@ -1589,7 +1600,7 @@ maybe_send_unblocked(State = #v1{throttle = Throttle}) ->
15891600
case should_send_unblocked(Throttle) of
15901601
true ->
15911602
ok = send_unblocked(State),
1592-
State#v1{throttle =
1603+
State#v1{throttle =
15931604
Throttle#throttle{connection_blocked_message_sent = false}};
15941605
false -> State
15951606
end.
@@ -1598,7 +1609,7 @@ maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) ->
15981609
case should_send_blocked(Throttle) of
15991610
true ->
16001611
ok = send_blocked(State, blocked_by_message(Throttle)),
1601-
State#v1{throttle =
1612+
State#v1{throttle =
16021613
Throttle#throttle{connection_blocked_message_sent = true}};
16031614
false -> maybe_send_unblocked(State)
16041615
end.
@@ -1624,7 +1635,7 @@ control_throttle(State = #v1{connection_state = CS,
16241635
running -> maybe_block(State1);
16251636
%% unblock or re-enable blocking
16261637
blocked -> maybe_block(maybe_unblock(State1));
1627-
_ -> State1
1638+
_ -> State1
16281639
end.
16291640

16301641
augment_connection_log_name(#connection{client_properties = ClientProperties,

src/rabbit_vhost.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
2727
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
2828
-export([delete_storage/1]).
29+
-export([vhost_down/1]).
2930

3031
-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
3132
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
@@ -54,8 +55,9 @@ recover() ->
5455
%% rabbit_vhost_sup_sup will start the actual recovery.
5556
%% So recovery will be run every time a vhost supervisor is restarted.
5657
ok = rabbit_vhost_sup_sup:start(),
57-
[{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost)
58-
|| VHost <- rabbit_vhost:list()],
58+
59+
[ ok = rabbit_vhost_sup_sup:init_vhost(VHost)
60+
|| VHost <- rabbit_vhost:list()],
5961
ok.
6062

6163
recover(VHost) ->
@@ -144,6 +146,12 @@ delete(VHostPath, ActingUser) ->
144146
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
145147
ok.
146148

149+
vhost_down(VHostPath) ->
150+
ok = rabbit_event:notify(vhost_down,
151+
[{name, VHostPath},
152+
{node, node()},
153+
{user_who_performed_action, ?INTERNAL_USER}]).
154+
147155
delete_storage(VHost) ->
148156
VhostDir = msg_store_dir_path(VHost),
149157
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),

src/rabbit_vhost_sup_watcher.erl renamed to src/rabbit_vhost_process.erl

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,21 @@
1414
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
1515
%%
1616

17-
%% This module implements a watcher process which should stop
18-
%% the parent supervisor if its vhost is missing from the mnesia DB
17+
%% This module implements a vhost identity process.
1918

20-
-module(rabbit_vhost_sup_watcher).
19+
%% On start this process will try to recover the vhost data and
20+
%% processes structure (queues and message stores).
21+
%% If recovered successfully, the process will save its PID
22+
%% to vhost process registry. If vhost process PID is in the registry and the
23+
%% process is alive - the vhost is considered running.
24+
25+
%% On termination, the process will notify of vhost going down.
26+
27+
%% The process will also check periodically if the vhost is still
28+
%% present in mnesia DB and stop the vhost supervision tree when it
29+
%% desapears.
30+
31+
-module(rabbit_vhost_process).
2132

2233
-include("rabbit.hrl").
2334

@@ -29,15 +40,26 @@
2940
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
3041
code_change/3]).
3142

32-
3343
start_link(VHost) ->
3444
gen_server2:start_link(?MODULE, [VHost], []).
3545

3646

3747
init([VHost]) ->
38-
Interval = interval(),
39-
timer:send_interval(Interval, check_vhost),
40-
{ok, VHost}.
48+
process_flag(trap_exit, true),
49+
rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]),
50+
try
51+
%% Recover the vhost data and save it to vhost registry.
52+
ok = rabbit_vhost:recover(VHost),
53+
rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
54+
Interval = interval(),
55+
timer:send_interval(Interval, check_vhost),
56+
{ok, VHost}
57+
catch _:Reason ->
58+
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
59+
" Stacktrace ~p",
60+
[VHost, Reason, erlang:get_stacktrace()]),
61+
{stop, Reason}
62+
end.
4163

4264
handle_call(_,_,VHost) ->
4365
{reply, ok, VHost}.
@@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) ->
6486
handle_info(_, VHost) ->
6587
{noreply, VHost}.
6688

67-
terminate(_, _) -> ok.
89+
terminate(shutdown, VHost) ->
90+
%% Notify that vhost is stopped.
91+
rabbit_vhost:vhost_down(VHost);
92+
terminate(_, _VHost) ->
93+
ok.
6894

6995
code_change(_OldVsn, VHost, _Extra) ->
7096
{ok, VHost}.

src/rabbit_vhost_sup.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,4 @@ start_link(VHost) ->
2828
supervisor2:start_link(?MODULE, [VHost]).
2929

3030
init([VHost]) ->
31-
{ok, {{one_for_all, 0, 1},
32-
[{rabbit_vhost_sup_watcher,
33-
{rabbit_vhost_sup_watcher, start_link, [VHost]},
34-
intrinsic, ?WORKER_WAIT, worker,
35-
[rabbit_vhost_sup]}]}}.
31+
{ok, {{one_for_all, 0, 1}, []}}.

0 commit comments

Comments
 (0)