Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_access_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName,
end.

-spec update_state(User :: rabbit_types:user(), NewState :: term()) ->
{'ok', rabbit_types:auth_user()} |
{'ok', rabbit_types:user()} |
{'refused', string()} |
{'error', any()}.

Expand Down
14 changes: 13 additions & 1 deletion deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>,
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key],
RespPayload = encode_bindings(Bindings),
{<<"200">>, RespPayload, PermCaches}.
{<<"200">>, RespPayload, PermCaches};

handle_http_req(<<"PUT">>,
[<<"auth">>, <<"tokens">>],
_Query,
ReqPayload,
_Vhost,
_User,
ConnPid,
PermCaches) ->
{binary, Token} = ReqPayload,
ok = rabbit_amqp_reader:set_credential(ConnPid, Token),
{<<"204">>, null, PermCaches}.

decode_queue({map, KVList}) ->
M = lists:foldl(
Expand Down
121 changes: 75 additions & 46 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

-module(rabbit_amqp_reader).

-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp.hrl").

-export([init/1,
info/2,
mainloop/2]).
mainloop/2,
set_credential/2]).

-export([system_continue/3,
system_terminate/4,
Expand Down Expand Up @@ -53,6 +55,7 @@
channel_max :: non_neg_integer(),
auth_mechanism :: sasl_init_unprocessed | {binary(), module()},
auth_state :: term(),
credential_timer :: undefined | reference(),
properties :: undefined | {map, list(tuple())}
}).

Expand Down Expand Up @@ -139,6 +142,11 @@ server_properties() ->
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
{map, Props}.

-spec set_credential(pid(), binary()) -> ok.
set_credential(Pid, Credential) ->
Pid ! {set_credential, Credential},
ok.

%%--------------------------------------------------------------------------

inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
Expand Down Expand Up @@ -243,6 +251,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
set_credential0(Cred, State);
handle_other(credential_expired, State) ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
handle_exception(State, 0, Error);
Expand Down Expand Up @@ -320,16 +330,14 @@ error_frame(Condition, Fmt, Args) ->

handle_exception(State = #v1{connection_state = closed}, Channel,
#'v1_0.error'{description = {utf8, Desc}}) ->
rabbit_log_connection:error(
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), closed, Channel, Desc]),
?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), closed, Channel, Desc]),
State;
handle_exception(State = #v1{connection_state = CS}, Channel,
Error = #'v1_0.error'{description = {utf8, Desc}})
when ?IS_RUNNING(State) orelse CS =:= closing ->
rabbit_log_connection:error(
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), CS, Channel, Desc]),
?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), CS, Channel, Desc]),
close(Error, State);
handle_exception(State, _Channel, Error) ->
silent_close_delay(),
Expand Down Expand Up @@ -416,21 +424,23 @@ handle_connection_frame(
},
helper_sup = HelperSupPid,
sock = Sock} = State0) ->
logger:update_process_metadata(#{amqp_container => ContainerId}),
Vhost = vhost(Hostname),
logger:update_process_metadata(#{amqp_container => ContainerId,
vhost => Vhost,
user => Username}),
ok = check_user_loopback(State0),
ok = check_vhost_exists(Vhost, State0),
ok = check_vhost_alive(Vhost),
ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}),
ok = check_vhost_connection_limit(Vhost, Username),
ok = check_user_connection_limit(Username),
ok = ensure_credential_expiry_timer(User),
Timer = maybe_start_credential_expiry_timer(User),
rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10),
notify_auth(user_authentication_success, Username, State0),
rabbit_log_connection:info(
"Connection from AMQP 1.0 container '~ts': user '~ts' authenticated "
"using SASL mechanism ~s and granted access to vhost '~ts'",
[ContainerId, Username, Mechanism, Vhost]),
?LOG_INFO(
"Connection from AMQP 1.0 container '~ts': user '~ts' authenticated "
"using SASL mechanism ~s and granted access to vhost '~ts'",
[ContainerId, Username, Mechanism, Vhost]),

OutgoingMaxFrameSize = case ClientMaxFrame of
undefined ->
Expand Down Expand Up @@ -499,17 +509,18 @@ handle_connection_frame(
outgoing_max_frame_size = OutgoingMaxFrameSize,
channel_max = EffectiveChannelMax,
properties = Properties,
timeout = ReceiveTimeoutMillis},
timeout = ReceiveTimeoutMillis,
credential_timer = Timer},
heartbeater = Heartbeater},
State = start_writer(State1),
HostnameVal = case Hostname of
undefined -> undefined;
null -> undefined;
{utf8, Val} -> Val
end,
rabbit_log:debug(
"AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p",
[HostnameVal, Vhost, IdleTimeout]),
?LOG_DEBUG(
"AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p",
[HostnameVal, Vhost, IdleTimeout]),

Infos = infos(?CONNECTION_EVENT_KEYS, State),
ok = rabbit_core_metrics:connection_created(
Expand Down Expand Up @@ -768,16 +779,16 @@ notify_auth(EventType, Username, State) ->
rabbit_event:notify(EventType, EventProps).

track_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels} = State) ->
rabbit_log:debug("AMQP 1.0 created session process ~p for channel number ~b",
[SessionPid, ChannelNum]),
?LOG_DEBUG("AMQP 1.0 created session process ~p for channel number ~b",
[SessionPid, ChannelNum]),
_Ref = erlang:monitor(process, SessionPid, [{tag, {'DOWN', ChannelNum}}]),
State#v1{tracked_channels = maps:put(ChannelNum, SessionPid, Channels)}.

untrack_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels0} = State) ->
case maps:take(ChannelNum, Channels0) of
{SessionPid, Channels} ->
rabbit_log:debug("AMQP 1.0 closed session process ~p with channel number ~b",
[SessionPid, ChannelNum]),
?LOG_DEBUG("AMQP 1.0 closed session process ~p with channel number ~b",
[SessionPid, ChannelNum]),
State#v1{tracked_channels = Channels};
_ ->
State
Expand Down Expand Up @@ -871,39 +882,57 @@ check_user_connection_limit(Username) ->
end.


%% TODO Provide a means for the client to refresh the credential.
%% This could be either via:
%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see
%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or
%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html
%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259
%% 3. Simpler variation of 2. where a token is put to a special /token node.
%%
%% If the user does not refresh their credential on time (the only implementation currently),
%% close the entire connection as we must assume that vhost access could have been revoked.
%%
%% If the user refreshes their credential on time (to be implemented), the AMQP reader should
%% 1. rabbit_access_control:check_vhost_access/4
%% 2. send a message to all its sessions which should then erase the permission caches and
%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed).
%% 3. cancel the current timer, and set a new timer
%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292
ensure_credential_expiry_timer(User) ->
set_credential0(Cred,
State = #v1{connection = #v1_connection{
user = User0,
vhost = Vhost,
credential_timer = OldTimer} = Conn,
tracked_channels = Chans,
sock = Sock}) ->
?LOG_INFO("updating credential", []),
case rabbit_access_control:update_state(User0, Cred) of
{ok, User} ->
try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of
ok ->
maps:foreach(fun(_ChanNum, Pid) ->
rabbit_amqp_session:reset_authz(Pid, User)
end, Chans),
case OldTimer of
undefined -> ok;
Ref -> ok = erlang:cancel_timer(Ref, [{info, false}])
end,
NewTimer = maybe_start_credential_expiry_timer(User),
State#v1{connection = Conn#v1_connection{
user = User,
credential_timer = NewTimer}}
catch _:Reason ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"access to vhost ~s failed for new credential: ~p",
[Vhost, Reason]),
handle_exception(State, 0, Error)
end;
Err ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"credential update failed: ~p",
[Err]),
handle_exception(State, 0, Error)
end.

maybe_start_credential_expiry_timer(User) ->
case rabbit_access_control:expiry_timestamp(User) of
never ->
ok;
undefined;
Ts when is_integer(Ts) ->
Time = (Ts - os:system_time(second)) * 1000,
rabbit_log:debug(
"Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
?LOG_DEBUG(
"credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
case Time > 0 of
true ->
_TimerRef = erlang:send_after(Time, self(), credential_expired),
ok;
erlang:send_after(Time, self(), credential_expired);
false ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"Credential expired ~b ms ago", [abs(Time)])
"credential expired ~b ms ago", [abs(Time)])
end
end.

Expand Down
68 changes: 56 additions & 12 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

-behaviour(gen_server).

-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp.hrl").
Expand Down Expand Up @@ -90,7 +91,8 @@
list_local/0,
conserve_resources/3,
check_resource_access/4,
check_read_permitted_on_topic/4
check_read_permitted_on_topic/4,
reset_authz/2
]).

-export([init/1,
Expand Down Expand Up @@ -393,6 +395,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
handle_max = ClientHandleMax}}) ->
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
logger:update_process_metadata(#{channel_number => ChannelNum,
connection => ConnName,
vhost => Vhost,
user => User#user.username}),

ok = pg:join(pg_scope(), self(), self()),
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Expand Down Expand Up @@ -480,6 +486,10 @@ list_local() ->
conserve_resources(Pid, Source, {_, Conserve, _}) ->
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).

-spec reset_authz(pid(), rabbit_types:user()) -> ok.
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).

handle_call(Msg, _From, State) ->
Reply = {error, {not_understood, Msg}},
reply(Reply, State).
Expand Down Expand Up @@ -574,15 +584,26 @@ handle_cast({conserve_resources, Alarm, Conserve},
noreply(State);
handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) ->
State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}},
noreply(State).
noreply(State);
handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
State1 = State0#state{
permission_cache = [],
topic_permission_cache = [],
cfg = Cfg#cfg{user = User}},
try recheck_authz(State1) of
State ->
noreply(State)
catch exit:#'v1_0.error'{} = Error ->
log_error_and_close_session(Error, State1)
end.

log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
writer_pid = WriterPid,
channel_num = Ch}}) ->
End = #'v1_0.end'{error = Error},
rabbit_log:warning("Closing session for connection ~p: ~tp",
[ReaderPid, Error]),
?LOG_WARNING("Closing session for connection ~p: ~tp",
[ReaderPid, Error]),
ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End),
{stop, {shutdown, Error}, State}.

Expand Down Expand Up @@ -869,8 +890,8 @@ destroy_outgoing_link(_, _, _, Acc) ->
Acc.

detach(Handle, Link, Error = #'v1_0.error'{}) ->
rabbit_log:warning("Detaching link handle ~b due to error: ~tp",
[Handle, Error]),
?LOG_WARNING("Detaching link handle ~b due to error: ~tp",
[Handle, Error]),
publisher_or_consumer_deleted(Link),
#'v1_0.detach'{handle = ?UINT(Handle),
closed = true,
Expand Down Expand Up @@ -961,8 +982,8 @@ handle_frame(#'v1_0.flow'{handle = Handle} = Flow,
%% "If set to a handle that is not currently associated with
%% an attached link, the recipient MUST respond by ending the
%% session with an unattached-handle session error." [2.7.4]
rabbit_log:warning(
"Received Flow frame for unknown link handle: ~tp", [Flow]),
?LOG_WARNING("Received Flow frame for unknown link handle: ~tp",
[Flow]),
protocol_error(
?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE,
"Unattached link handle: ~b", [HandleInt])
Expand Down Expand Up @@ -2141,9 +2162,9 @@ handle_deliver(ConsumerTag, AckRequired,
outgoing_links = OutgoingLinks};
_ ->
%% TODO handle missing link -- why does the queue think it's there?
rabbit_log:warning(
"No link handle ~b exists for delivery with consumer tag ~p from queue ~tp",
[Handle, ConsumerTag, QName]),
?LOG_WARNING(
"No link handle ~b exists for delivery with consumer tag ~p from queue ~tp",
[Handle, ConsumerTag, QName]),
State
end.

Expand Down Expand Up @@ -2988,7 +3009,7 @@ credit_reply_timeout(QType, QName) ->
Fmt = "Timed out waiting for credit reply from ~s ~s. "
"Hint: Enable feature flag rabbitmq_4.0.0",
Args = [QType, rabbit_misc:rs(QName)],
rabbit_log:error(Fmt, Args),
?LOG_ERROR(Fmt, Args),
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).

default(undefined, Default) -> Default;
Expand Down Expand Up @@ -3522,6 +3543,29 @@ check_topic_authorisation(#exchange{type = topic,
check_topic_authorisation(_, _, _, _, Cache) ->
Cache.

recheck_authz(#state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
permission_cache = Cache0,
cfg = #cfg{user = User}
} = State) ->
?LOG_DEBUG("rechecking link authorizations", []),
Cache1 = maps:fold(
fun(_Handle, #incoming_link{exchange = X}, Cache) ->
case X of
#exchange{name = XName} ->
check_resource_access(XName, write, User, Cache);
#resource{} = XName ->
check_resource_access(XName, write, User, Cache);
to ->
Cache
end
end, Cache0, IncomingLinks),
Cache2 = maps:fold(
fun(_Handle, #outgoing_link{queue_name = QName}, Cache) ->
check_resource_access(QName, read, User, Cache)
end, Cache1, OutgoingLinks),
State#state{permission_cache = Cache2}.

check_user_id(Mc, User) ->
case rabbit_access_control:check_user_id(Mc, User) of
ok ->
Expand Down
Loading
Loading