Skip to content

Commit 2fdd1da

Browse files
Merge branch 'master' into rabbitmq_cli_log_commands
2 parents 7025d2a + 33a7f97 commit 2fdd1da

12 files changed

+162
-41
lines changed

src/rabbit.erl

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
3434
-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
3535

36-
-deprecated([{force_event_refresh, 1, eventually}]).
37-
3836
-ifdef(TEST).
3937

4038
-export([start_logger/0]).
@@ -1123,17 +1121,16 @@ start_logger() ->
11231121
log_locations() ->
11241122
rabbit_lager:log_locations().
11251123

1126-
%% This feature was used by the management API up-to and including
1127-
%% RabbitMQ 3.7.x. It is unused in 3.8.x and thus deprecated. We keep it
1128-
%% to support in-place upgrades to 3.8.x (i.e. mixed-version clusters).
1129-
11301124
-spec force_event_refresh(reference()) -> 'ok'.
11311125

1126+
% Note: https://www.pivotaltracker.com/story/show/166962656
1127+
% This event is necessary for the stats timer to be initialized with
1128+
% the correct values once the management agent has started
11321129
force_event_refresh(Ref) ->
1133-
rabbit_direct:force_event_refresh(Ref),
1134-
rabbit_networking:force_connection_event_refresh(Ref),
1135-
rabbit_channel:force_event_refresh(Ref),
1136-
rabbit_amqqueue:force_event_refresh(Ref).
1130+
ok = rabbit_direct:force_event_refresh(Ref),
1131+
ok = rabbit_networking:force_connection_event_refresh(Ref),
1132+
ok = rabbit_channel:force_event_refresh(Ref),
1133+
ok = rabbit_amqqueue:force_event_refresh(Ref).
11371134

11381135
%%---------------------------------------------------------------------------
11391136
%% misc

src/rabbit_access_control.erl

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
-export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2,
2222
check_vhost_access/4, check_resource_access/4, check_topic_access/4]).
2323

24+
-export([permission_cache_can_expire/1, update_state/2]).
25+
2426
%%----------------------------------------------------------------------------
2527

2628
-export_type([permission_atom/0]).
@@ -217,3 +219,38 @@ check_access(Fun, Module, ErrStr, ErrArgs, ErrName) ->
217219
rabbit_log:error(FullErrStr, FullErrArgs),
218220
rabbit_misc:protocol_error(ErrName, FullErrStr, FullErrArgs)
219221
end.
222+
223+
-spec update_state(User :: rabbit_types:user(), NewState :: term()) ->
224+
{'ok', rabbit_types:auth_user()} |
225+
{'refused', string()} |
226+
{'error', any()}.
227+
228+
update_state(User = #user{authz_backends = Backends0}, NewState) ->
229+
%% N.B.: we use foldl/3 and prepending, so the final list of
230+
%% backends is in reverse order from the original list.
231+
Backends = lists:foldl(
232+
fun({Module, Impl}, {ok, Acc}) ->
233+
case Module:state_can_expire() of
234+
true ->
235+
case Module:update_state(auth_user(User, Impl), NewState) of
236+
{ok, #auth_user{impl = Impl1}} ->
237+
{ok, [{Module, Impl1} | Acc]};
238+
Else -> Else
239+
end;
240+
false ->
241+
{ok, [{Module, Impl} | Acc]}
242+
end;
243+
(_, {error, _} = Err) -> Err;
244+
(_, {refused, _, _} = Err) -> Err
245+
end, {ok, []}, Backends0),
246+
case Backends of
247+
{ok, Pairs} -> {ok, User#user{authz_backends = lists:reverse(Pairs)}};
248+
Else -> Else
249+
end.
250+
251+
-spec permission_cache_can_expire(User :: rabbit_types:user()) -> boolean().
252+
253+
%% Returns true if any of the backends support credential expiration,
254+
%% otherwise returns false.
255+
permission_cache_can_expire(#user{authz_backends = Backends}) ->
256+
lists:any(fun ({Module, _State}) -> Module:state_can_expire() end, Backends).

src/rabbit_amqqueue.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@
5151
-export([pid_of/1, pid_of/2]).
5252
-export([mark_local_durable_queues_stopped/1]).
5353

54-
-deprecated([{force_event_refresh, 1, eventually}]).
55-
5654
%% internal
5755
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
5856
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -1031,6 +1029,9 @@ list_local(VHostPath) ->
10311029

10321030
-spec force_event_refresh(reference()) -> 'ok'.
10331031

1032+
% Note: https://www.pivotaltracker.com/story/show/166962656
1033+
% This event is necessary for the stats timer to be initialized with
1034+
% the correct values once the management agent has started
10341035
force_event_refresh(Ref) ->
10351036
[gen_server2:cast(amqqueue:get_pid(Q),
10361037
{force_event_refresh, Ref}) || Q <- list()],

src/rabbit_amqqueue_process.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,6 +1615,9 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
16151615
run_message_queue(true, State1)
16161616
end);
16171617

1618+
% Note: https://www.pivotaltracker.com/story/show/166962656
1619+
% This event is necessary for the stats timer to be initialized with
1620+
% the correct values once the management agent has started
16181621
handle_cast({force_event_refresh, Ref},
16191622
State = #q{consumers = Consumers,
16201623
active_consumer = Holder}) ->

src/rabbit_auth_backend_internal.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
list_user_vhost_permissions/2,
4141
list_user_topic_permissions/1, list_vhost_topic_permissions/1, list_user_vhost_topic_permissions/2]).
4242

43+
-export([state_can_expire/0]).
44+
4345
%% for testing
4446
-export([hashing_module_for_user/1, expand_topic_permission/2]).
4547

@@ -93,6 +95,8 @@ user_login_authentication(Username, AuthProps) ->
9395
false -> exit({unknown_auth_props, Username, AuthProps})
9496
end.
9597

98+
state_can_expire() -> false.
99+
96100
user_login_authorization(Username, _AuthProps) ->
97101
case user_login_authentication(Username, []) of
98102
{ok, #auth_user{impl = Impl, tags = Tags}} -> {ok, Impl, Tags};
@@ -123,7 +127,7 @@ check_vhost_access(#auth_user{username = Username}, VHostPath, _AuthzData) ->
123127

124128
check_resource_access(#auth_user{username = Username},
125129
#resource{virtual_host = VHostPath, name = Name},
126-
Permission,
130+
Permission,
127131
_AuthContext) ->
128132
case mnesia:dirty_read({rabbit_user_permission,
129133
#user_vhost{username = Username,

src/rabbit_auth_mechanism_plain.erl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
3232
%% apparently, by OpenAMQ.
3333

34-
%% TODO: reimplement this using the binary module? - that makes use of
35-
%% BIFs to do binary matching and will thus be much faster.
36-
3734
description() ->
3835
[{description, <<"SASL PLAIN authentication mechanism">>}].
3936

src/rabbit_channel.erl

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,13 @@
6363
-export([refresh_config_local/0, ready_for_close/1]).
6464
-export([refresh_interceptors/0]).
6565
-export([force_event_refresh/1]).
66-
-export([source/2]).
66+
-export([source/2, update_user_state/2]).
6767

6868
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
6969
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
7070
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
7171
format_message_queue/2]).
7272

73-
-deprecated([{force_event_refresh, 1, eventually}]).
74-
7573
%% Internal
7674
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
7775
-export([get_vhost/1, get_user/1]).
@@ -452,18 +450,31 @@ ready_for_close(Pid) ->
452450

453451
-spec force_event_refresh(reference()) -> 'ok'.
454452

453+
% Note: https://www.pivotaltracker.com/story/show/166962656
454+
% This event is necessary for the stats timer to be initialized with
455+
% the correct values once the management agent has started
455456
force_event_refresh(Ref) ->
456457
[gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
457458
ok.
458459

459460
list_queue_states(Pid) ->
460461
gen_server2:call(Pid, list_queue_states).
461462

462-
-spec source(pid(), any()) -> any().
463+
-spec source(pid(), any()) -> 'ok' | {error, channel_terminated}.
463464

464465
source(Pid, Source) when is_pid(Pid) ->
465466
case erlang:is_process_alive(Pid) of
466-
true -> Pid ! {channel_source, Source};
467+
true -> Pid ! {channel_source, Source},
468+
ok;
469+
false -> {error, channel_terminated}
470+
end.
471+
472+
-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.
473+
474+
update_user_state(Pid, UserState) when is_pid(Pid) ->
475+
case erlang:is_process_alive(Pid) of
476+
true -> Pid ! {update_user_state, UserState},
477+
ok;
467478
false -> {error, channel_terminated}
468479
end.
469480

@@ -489,6 +500,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
489500
_ ->
490501
Limiter0
491502
end,
503+
%% Process dictionary is used here because permission cache already uses it. MK.
504+
put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)),
492505
MaxMessageSize = get_max_message_size(),
493506
ConsumerTimeout = get_consumer_timeout(),
494507
State = #ch{cfg = #conf{state = starting,
@@ -691,6 +704,9 @@ handle_cast({send_drained, CTagCredit},
691704
|| {ConsumerTag, CreditDrained} <- CTagCredit],
692705
noreply(State);
693706

707+
% Note: https://www.pivotaltracker.com/story/show/166962656
708+
% This event is necessary for the stats timer to be initialized with
709+
% the correct values once the management agent has started
694710
handle_cast({force_event_refresh, Ref}, State) ->
695711
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
696712
Ref),
@@ -838,6 +854,10 @@ handle_info({{Ref, Node}, LateAnswer},
838854
noreply(State);
839855

840856
handle_info(tick, State0 = #ch{queue_states = QueueStates0}) ->
857+
case get(permission_cache_can_expire) of
858+
true -> ok = clear_permission_cache();
859+
_ -> ok
860+
end,
841861
QueueStates1 =
842862
maps:filter(fun(_, QS) ->
843863
QName = rabbit_quorum_queue:queue_name(QS),
@@ -850,7 +870,10 @@ handle_info(tick, State0 = #ch{queue_states = QueueStates0}) ->
850870
Return
851871
end;
852872
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
853-
noreply(State#ch{cfg = Cfg#conf{source = Source}}).
873+
noreply(State#ch{cfg = Cfg#conf{source = Source}});
874+
handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) ->
875+
noreply(State#ch{cfg = Cfg#conf{user = User}}).
876+
854877

855878
handle_pre_hibernate(State0) ->
856879
ok = clear_permission_cache(),
@@ -973,7 +996,7 @@ return_queue_declare_ok(#resource{name = ActualName},
973996

974997
check_resource_access(User, Resource, Perm, Context) ->
975998
V = {Resource, Context, Perm},
976-
999+
9771000
Cache = case get(permission_cache) of
9781001
undefined -> [];
9791002
Other -> Other
@@ -1051,8 +1074,8 @@ check_topic_authorisation(_, _, _, _, _, _) ->
10511074
ok.
10521075

10531076
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
1054-
User = #user{username = Username},
1055-
AmqpParams, RoutingKey, Permission) ->
1077+
User = #user{username = Username},
1078+
AmqpParams, RoutingKey, Permission) ->
10561079
Resource = Name#resource{kind = topic},
10571080
VariableMap = build_topic_variable_map(AmqpParams, VHost, Username),
10581081
Context = #{routing_key => RoutingKey,

src/rabbit_quorum_queue.erl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
-export([rpc_delete_metrics/1]).
3333
-export([format/1]).
3434
-export([open_files/1]).
35-
-export([add_member/3]).
35+
-export([add_member/4]).
3636
-export([delete_member/3]).
3737
-export([requeue/3]).
3838
-export([policy_changed/2]).
@@ -69,6 +69,7 @@
6969
-define(RPC_TIMEOUT, 1000).
7070
-define(TICK_TIMEOUT, 5000). %% the ra server tick time
7171
-define(DELETE_TIMEOUT, 5000).
72+
-define(ADD_MEMBER_TIMEOUT, 5000).
7273

7374
%%----------------------------------------------------------------------------
7475

@@ -699,7 +700,7 @@ get_sys_status(Proc) ->
699700
end.
700701

701702

702-
add_member(VHost, Name, Node) ->
703+
add_member(VHost, Name, Node, Timeout) ->
703704
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
704705
case rabbit_amqqueue:lookup(QName) of
705706
{ok, Q} when ?amqqueue_is_classic(Q) ->
@@ -715,14 +716,14 @@ add_member(VHost, Name, Node) ->
715716
%% idempotent by design
716717
ok;
717718
false ->
718-
add_member(Q, Node)
719+
add_member(Q, Node, Timeout)
719720
end
720721
end;
721722
{error, not_found} = E ->
722723
E
723724
end.
724725

725-
add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
726+
add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
726727
{RaName, _} = ServerRef = amqqueue:get_pid(Q),
727728
QName = amqqueue:get_name(Q),
728729
QNodes = amqqueue:get_quorum_nodes(Q),
@@ -731,7 +732,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
731732
case ra:start_server(RaName, ServerId, ra_machine(Q),
732733
[{RaName, N} || N <- QNodes]) of
733734
ok ->
734-
case ra:add_member(ServerRef, ServerId) of
735+
case ra:add_member(ServerRef, ServerId, Timeout) of
735736
{ok, _, Leader} ->
736737
Fun = fun(Q1) ->
737738
Q2 = amqqueue:set_quorum_nodes(
@@ -743,6 +744,8 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
743744
fun() -> rabbit_amqqueue:update(QName, Fun) end),
744745
ok;
745746
{timeout, _} ->
747+
_ = ra:force_delete_server(ServerId),
748+
_ = ra:remove_member(ServerRef, ServerId),
746749
{error, timeout};
747750
E ->
748751
_ = ra:force_delete_server(ServerId),
@@ -828,7 +831,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
828831
QName = amqqueue:get_name(Q),
829832
rabbit_log:info("~s: adding a new member (replica) on node ~w",
830833
[rabbit_misc:rs(QName), Node]),
831-
case add_member(Q, Node) of
834+
case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of
832835
ok ->
833836
{QName, {ok, Size + 1}};
834837
{error, Err} ->

src/rabbit_reader.erl

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@
6666

6767
-export([conserve_resources/3, server_properties/1]).
6868

69-
-deprecated([{force_event_refresh, 2, eventually}]).
70-
7169
-define(NORMAL_TIMEOUT, 3).
7270
-define(CLOSING_TIMEOUT, 30).
7371
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
@@ -220,6 +218,9 @@ info(Pid, Items) ->
220218

221219
-spec force_event_refresh(pid(), reference()) -> 'ok'.
222220

221+
% Note: https://www.pivotaltracker.com/story/show/166962656
222+
% This event is necessary for the stats timer to be initialized with
223+
% the correct values once the management agent has started
223224
force_event_refresh(Pid, Ref) ->
224225
gen_server:cast(Pid, {force_event_refresh, Ref}).
225226

@@ -1273,6 +1274,44 @@ handle_method0(#'connection.close_ok'{},
12731274
State = #v1{connection_state = closed}) ->
12741275
self() ! terminate_connection,
12751276
State;
1277+
handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reason},
1278+
State = #v1{connection =
1279+
#connection{protocol = Protocol,
1280+
user = User = #user{username = Username},
1281+
log_name = ConnName} = Conn,
1282+
sock = Sock}) when ?IS_RUNNING(State) ->
1283+
rabbit_log_connection:debug(
1284+
"connection ~p (~s) of user '~s': "
1285+
"asked to update secret, reason: ~s~n",
1286+
[self(), dynamic_connection_name(ConnName), Username, Reason]),
1287+
case rabbit_access_control:update_state(User, NewSecret) of
1288+
{ok, User1} ->
1289+
%% User/auth backend state has been updated. Now we can propagate it to channels
1290+
%% asynchronously and return. All the channels have to do is to update their
1291+
%% own state.
1292+
%%
1293+
%% Any secret update errors coming from the authz backend will be handled in the other branch.
1294+
%% Therefore we optimistically do no error handling here. MK.
1295+
lists:foreach(fun(Ch) ->
1296+
rabbit_log:debug("Updating user/auth backend state for channel ~p", [Ch]),
1297+
_ = rabbit_channel:update_user_state(Ch, User1)
1298+
end, all_channels()),
1299+
ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol),
1300+
rabbit_log_connection:info(
1301+
"connection ~p (~s): "
1302+
"user '~s' updated secret, reason: ~s~n",
1303+
[self(), dynamic_connection_name(ConnName), Username, Reason]),
1304+
State#v1{connection = Conn#connection{user = User1}};
1305+
{refused, Message} ->
1306+
rabbit_log_connection:error("Secret update was refused for user '~p': ~p",
1307+
[Username, Message]),
1308+
rabbit_misc:protocol_error(not_allowed, "New secret was refused by one of the backends", []);
1309+
{error, Message} ->
1310+
rabbit_log_connection:error("Secret update for user '~p' failed: ~p",
1311+
[Username, Message]),
1312+
rabbit_misc:protocol_error(not_allowed,
1313+
"Secret update failed", [])
1314+
end;
12761315
handle_method0(_Method, State) when ?IS_STOPPING(State) ->
12771316
State;
12781317
handle_method0(_Method, #v1{connection_state = S}) ->

0 commit comments

Comments
 (0)