diff --git a/deps/rabbit/src/rabbit_access_control.erl b/deps/rabbit/src/rabbit_access_control.erl index cfc8b591eb3f..305a3b743f0f 100644 --- a/deps/rabbit/src/rabbit_access_control.erl +++ b/deps/rabbit/src/rabbit_access_control.erl @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName, end. -spec update_state(User :: rabbit_types:user(), NewState :: term()) -> - {'ok', rabbit_types:auth_user()} | + {'ok', rabbit_types:user()} | {'refused', string()} | {'error', any()}. diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index e4555e806033..9cd2669f57b1 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>, Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key], RespPayload = encode_bindings(Bindings), - {<<"200">>, RespPayload, PermCaches}. + {<<"200">>, RespPayload, PermCaches}; + +handle_http_req(<<"PUT">>, + [<<"auth">>, <<"tokens">>], + _Query, + ReqPayload, + _Vhost, + _User, + ConnPid, + PermCaches) -> + {binary, Token} = ReqPayload, + ok = rabbit_amqp_reader:set_credential(ConnPid, Token), + {<<"204">>, null, PermCaches}. decode_queue({map, KVList}) -> M = lists:foldl( diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index bcfa6a1dcc8c..070205fa0b64 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -7,13 +7,15 @@ -module(rabbit_amqp_reader). +-include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl"). -include("rabbit_amqp.hrl"). -export([init/1, info/2, - mainloop/2]). + mainloop/2, + set_credential/2]). -export([system_continue/3, system_terminate/4, @@ -53,6 +55,7 @@ channel_max :: non_neg_integer(), auth_mechanism :: sasl_init_unprocessed | {binary(), module()}, auth_state :: term(), + credential_timer :: undefined | reference(), properties :: undefined | {map, list(tuple())} }). @@ -139,6 +142,11 @@ server_properties() -> Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1], {map, Props}. +-spec set_credential(pid(), binary()) -> ok. +set_credential(Pid, Credential) -> + Pid ! {set_credential, Credential}, + ok. + %%-------------------------------------------------------------------------- inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -243,6 +251,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> State; handle_other(terminate_connection, _State) -> stop; +handle_other({set_credential, Cred}, State) -> + set_credential0(Cred, State); handle_other(credential_expired, State) -> Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []), handle_exception(State, 0, Error); @@ -320,16 +330,14 @@ error_frame(Condition, Fmt, Args) -> handle_exception(State = #v1{connection_state = closed}, Channel, #'v1_0.error'{description = {utf8, Desc}}) -> - rabbit_log_connection:error( - "Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", - [self(), closed, Channel, Desc]), + ?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", + [self(), closed, Channel, Desc]), State; handle_exception(State = #v1{connection_state = CS}, Channel, Error = #'v1_0.error'{description = {utf8, Desc}}) when ?IS_RUNNING(State) orelse CS =:= closing -> - rabbit_log_connection:error( - "Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", - [self(), CS, Channel, Desc]), + ?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", + [self(), CS, Channel, Desc]), close(Error, State); handle_exception(State, _Channel, Error) -> silent_close_delay(), @@ -416,21 +424,23 @@ handle_connection_frame( }, helper_sup = HelperSupPid, sock = Sock} = State0) -> - logger:update_process_metadata(#{amqp_container => ContainerId}), Vhost = vhost(Hostname), + logger:update_process_metadata(#{amqp_container => ContainerId, + vhost => Vhost, + user => Username}), ok = check_user_loopback(State0), ok = check_vhost_exists(Vhost, State0), ok = check_vhost_alive(Vhost), ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}), ok = check_vhost_connection_limit(Vhost, Username), ok = check_user_connection_limit(Username), - ok = ensure_credential_expiry_timer(User), + Timer = maybe_start_credential_expiry_timer(User), rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10), notify_auth(user_authentication_success, Username, State0), - rabbit_log_connection:info( - "Connection from AMQP 1.0 container '~ts': user '~ts' authenticated " - "using SASL mechanism ~s and granted access to vhost '~ts'", - [ContainerId, Username, Mechanism, Vhost]), + ?LOG_INFO( + "Connection from AMQP 1.0 container '~ts': user '~ts' authenticated " + "using SASL mechanism ~s and granted access to vhost '~ts'", + [ContainerId, Username, Mechanism, Vhost]), OutgoingMaxFrameSize = case ClientMaxFrame of undefined -> @@ -499,7 +509,8 @@ handle_connection_frame( outgoing_max_frame_size = OutgoingMaxFrameSize, channel_max = EffectiveChannelMax, properties = Properties, - timeout = ReceiveTimeoutMillis}, + timeout = ReceiveTimeoutMillis, + credential_timer = Timer}, heartbeater = Heartbeater}, State = start_writer(State1), HostnameVal = case Hostname of @@ -507,9 +518,9 @@ handle_connection_frame( null -> undefined; {utf8, Val} -> Val end, - rabbit_log:debug( - "AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p", - [HostnameVal, Vhost, IdleTimeout]), + ?LOG_DEBUG( + "AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p", + [HostnameVal, Vhost, IdleTimeout]), Infos = infos(?CONNECTION_EVENT_KEYS, State), ok = rabbit_core_metrics:connection_created( @@ -768,16 +779,16 @@ notify_auth(EventType, Username, State) -> rabbit_event:notify(EventType, EventProps). track_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels} = State) -> - rabbit_log:debug("AMQP 1.0 created session process ~p for channel number ~b", - [SessionPid, ChannelNum]), + ?LOG_DEBUG("AMQP 1.0 created session process ~p for channel number ~b", + [SessionPid, ChannelNum]), _Ref = erlang:monitor(process, SessionPid, [{tag, {'DOWN', ChannelNum}}]), State#v1{tracked_channels = maps:put(ChannelNum, SessionPid, Channels)}. untrack_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels0} = State) -> case maps:take(ChannelNum, Channels0) of {SessionPid, Channels} -> - rabbit_log:debug("AMQP 1.0 closed session process ~p with channel number ~b", - [SessionPid, ChannelNum]), + ?LOG_DEBUG("AMQP 1.0 closed session process ~p with channel number ~b", + [SessionPid, ChannelNum]), State#v1{tracked_channels = Channels}; _ -> State @@ -871,39 +882,57 @@ check_user_connection_limit(Username) -> end. -%% TODO Provide a means for the client to refresh the credential. -%% This could be either via: -%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see -%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or -%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html -%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259 -%% 3. Simpler variation of 2. where a token is put to a special /token node. -%% -%% If the user does not refresh their credential on time (the only implementation currently), -%% close the entire connection as we must assume that vhost access could have been revoked. -%% -%% If the user refreshes their credential on time (to be implemented), the AMQP reader should -%% 1. rabbit_access_control:check_vhost_access/4 -%% 2. send a message to all its sessions which should then erase the permission caches and -%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed). -%% 3. cancel the current timer, and set a new timer -%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292 -ensure_credential_expiry_timer(User) -> +set_credential0(Cred, + State = #v1{connection = #v1_connection{ + user = User0, + vhost = Vhost, + credential_timer = OldTimer} = Conn, + tracked_channels = Chans, + sock = Sock}) -> + ?LOG_INFO("updating credential", []), + case rabbit_access_control:update_state(User0, Cred) of + {ok, User} -> + try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of + ok -> + maps:foreach(fun(_ChanNum, Pid) -> + rabbit_amqp_session:reset_authz(Pid, User) + end, Chans), + case OldTimer of + undefined -> ok; + Ref -> ok = erlang:cancel_timer(Ref, [{info, false}]) + end, + NewTimer = maybe_start_credential_expiry_timer(User), + State#v1{connection = Conn#v1_connection{ + user = User, + credential_timer = NewTimer}} + catch _:Reason -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "access to vhost ~s failed for new credential: ~p", + [Vhost, Reason]), + handle_exception(State, 0, Error) + end; + Err -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "credential update failed: ~p", + [Err]), + handle_exception(State, 0, Error) + end. + +maybe_start_credential_expiry_timer(User) -> case rabbit_access_control:expiry_timestamp(User) of never -> - ok; + undefined; Ts when is_integer(Ts) -> Time = (Ts - os:system_time(second)) * 1000, - rabbit_log:debug( - "Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", - [Time, Ts]), + ?LOG_DEBUG( + "credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", + [Time, Ts]), case Time > 0 of true -> - _TimerRef = erlang:send_after(Time, self(), credential_expired), - ok; + erlang:send_after(Time, self(), credential_expired); false -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, - "Credential expired ~b ms ago", [abs(Time)]) + "credential expired ~b ms ago", [abs(Time)]) end end. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 81e4d88d071d..8e965aa8c8ee 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -11,6 +11,7 @@ -behaviour(gen_server). +-include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl"). -include("rabbit_amqp.hrl"). @@ -90,7 +91,8 @@ list_local/0, conserve_resources/3, check_resource_access/4, - check_read_permitted_on_topic/4 + check_read_permitted_on_topic/4, + reset_authz/2 ]). -export([init/1, @@ -393,6 +395,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, handle_max = ClientHandleMax}}) -> process_flag(trap_exit, true), rabbit_process_flag:adjust_for_message_handling_proc(), + logger:update_process_metadata(#{channel_number => ChannelNum, + connection => ConnName, + vhost => Vhost, + user => User#user.username}), ok = pg:join(pg_scope(), self(), self()), Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), @@ -480,6 +486,10 @@ list_local() -> conserve_resources(Pid, Source, {_, Conserve, _}) -> gen_server:cast(Pid, {conserve_resources, Source, Conserve}). +-spec reset_authz(pid(), rabbit_types:user()) -> ok. +reset_authz(Pid, User) -> + gen_server:cast(Pid, {reset_authz, User}). + handle_call(Msg, _From, State) -> Reply = {error, {not_understood, Msg}}, reply(Reply, State). @@ -574,15 +584,26 @@ handle_cast({conserve_resources, Alarm, Conserve}, noreply(State); handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) -> State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}}, - noreply(State). + noreply(State); +handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) -> + State1 = State0#state{ + permission_cache = [], + topic_permission_cache = [], + cfg = Cfg#cfg{user = User}}, + try recheck_authz(State1) of + State -> + noreply(State) + catch exit:#'v1_0.error'{} = Error -> + log_error_and_close_session(Error, State1) + end. log_error_and_close_session( Error, State = #state{cfg = #cfg{reader_pid = ReaderPid, writer_pid = WriterPid, channel_num = Ch}}) -> End = #'v1_0.end'{error = Error}, - rabbit_log:warning("Closing session for connection ~p: ~tp", - [ReaderPid, Error]), + ?LOG_WARNING("Closing session for connection ~p: ~tp", + [ReaderPid, Error]), ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End), {stop, {shutdown, Error}, State}. @@ -869,8 +890,8 @@ destroy_outgoing_link(_, _, _, Acc) -> Acc. detach(Handle, Link, Error = #'v1_0.error'{}) -> - rabbit_log:warning("Detaching link handle ~b due to error: ~tp", - [Handle, Error]), + ?LOG_WARNING("Detaching link handle ~b due to error: ~tp", + [Handle, Error]), publisher_or_consumer_deleted(Link), #'v1_0.detach'{handle = ?UINT(Handle), closed = true, @@ -961,8 +982,8 @@ handle_frame(#'v1_0.flow'{handle = Handle} = Flow, %% "If set to a handle that is not currently associated with %% an attached link, the recipient MUST respond by ending the %% session with an unattached-handle session error." [2.7.4] - rabbit_log:warning( - "Received Flow frame for unknown link handle: ~tp", [Flow]), + ?LOG_WARNING("Received Flow frame for unknown link handle: ~tp", + [Flow]), protocol_error( ?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE, "Unattached link handle: ~b", [HandleInt]) @@ -2141,9 +2162,9 @@ handle_deliver(ConsumerTag, AckRequired, outgoing_links = OutgoingLinks}; _ -> %% TODO handle missing link -- why does the queue think it's there? - rabbit_log:warning( - "No link handle ~b exists for delivery with consumer tag ~p from queue ~tp", - [Handle, ConsumerTag, QName]), + ?LOG_WARNING( + "No link handle ~b exists for delivery with consumer tag ~p from queue ~tp", + [Handle, ConsumerTag, QName]), State end. @@ -2988,7 +3009,7 @@ credit_reply_timeout(QType, QName) -> Fmt = "Timed out waiting for credit reply from ~s ~s. " "Hint: Enable feature flag rabbitmq_4.0.0", Args = [QType, rabbit_misc:rs(QName)], - rabbit_log:error(Fmt, Args), + ?LOG_ERROR(Fmt, Args), protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args). default(undefined, Default) -> Default; @@ -3522,6 +3543,29 @@ check_topic_authorisation(#exchange{type = topic, check_topic_authorisation(_, _, _, _, Cache) -> Cache. +recheck_authz(#state{incoming_links = IncomingLinks, + outgoing_links = OutgoingLinks, + permission_cache = Cache0, + cfg = #cfg{user = User} + } = State) -> + ?LOG_DEBUG("rechecking link authorizations", []), + Cache1 = maps:fold( + fun(_Handle, #incoming_link{exchange = X}, Cache) -> + case X of + #exchange{name = XName} -> + check_resource_access(XName, write, User, Cache); + #resource{} = XName -> + check_resource_access(XName, write, User, Cache); + to -> + Cache + end + end, Cache0, IncomingLinks), + Cache2 = maps:fold( + fun(_Handle, #outgoing_link{queue_name = QName}, Cache) -> + check_resource_access(QName, read, User, Cache) + end, Cache1, OutgoingLinks), + State#state{permission_cache = Cache2}. + check_user_id(Mc, User) -> case rabbit_access_control:check_user_id(Mc, User) of ok -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 8688f5e5e679..0d7bd5bf45d7 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -470,7 +470,7 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). --spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. +-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}. update_user_state(Pid, UserState) when is_pid(Pid) -> case erlang:is_process_alive(Pid) of diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index 0fde808151d8..ef385b6162e3 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -28,7 +28,9 @@ declare_exchange/3, bind_exchange/5, unbind_exchange/5, - delete_exchange/2 + delete_exchange/2, + + set_token/2 ]. -define(TIMEOUT, 20_000). @@ -381,6 +383,23 @@ delete_exchange(LinkPair, ExchangeName) -> Err end. +%% Renew OAuth 2.0 token. +-spec set_token(link_pair(), binary()) -> + ok | {error, term()}. +set_token(LinkPair, Token) -> + Props = #{subject => <<"PUT">>, + to => <<"/auth/tokens">>}, + Body = {binary, Token}, + case request(LinkPair, Props, Body) of + {ok, Resp} -> + case is_success(Resp) of + true -> ok; + false -> {error, Resp} + end; + Err -> + Err + end. + -spec request(link_pair(), amqp10_msg:amqp10_properties(), amqp10_prim()) -> {ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}. request(#link_pair{session = Session, diff --git a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl index e17a76281411..8ba8eb33575a 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("eunit/include/eunit.hrl"). -import(rabbit_ct_client_helpers, [close_connection/1, close_channel/1, @@ -46,8 +47,7 @@ groups() -> more_than_one_resource_server_id_not_allowed_in_one_token, mqtt_expired_token, mqtt_expirable_token, - web_mqtt_expirable_token, - amqp_expirable_token + web_mqtt_expirable_token ]}, {token_refresh, [], [ @@ -73,7 +73,14 @@ groups() -> ]}, {rich_authorization_requests, [], [ test_successful_connection_with_rich_authorization_request_token - ]} + ]}, + {amqp, [shuffle], + [ + amqp_token_expire, + amqp_token_refresh_expire, + amqp_token_refresh_vhost_permission, + amqp_token_refresh_revoked_permissions + ]} ]. %% @@ -100,7 +107,9 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). - +init_per_group(amqp, Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), + Config; init_per_group(_Group, Config) -> %% The broker is managed by {init,end}_per_testcase(). lists:foreach(fun(Value) -> @@ -109,6 +118,8 @@ init_per_group(_Group, Config) -> [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]), Config. +end_per_group(amqp, Config) -> + Config; end_per_group(_Group, Config) -> %% The broker is managed by {init,end}_per_testcase(). lists:foreach(fun(Value) -> @@ -500,29 +511,20 @@ mqtt_expirable_token0(Port, AdditionalOpts, Connect, Config) -> after Millis * 2 -> ct:fail("missing DISCONNECT packet from server") end. -amqp_expirable_token(Config) -> - {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), - - Seconds = 4, +%% Test that RabbitMQ closes the AMQP 1.0 connection when the token expires. +amqp_token_expire(Config) -> + Seconds = 3, Millis = Seconds * 1000, {_Algo, Token} = generate_expirable_token(Config, - [<<"rabbitmq.configure:*/*">>, - <<"rabbitmq.write:*/*">>, - <<"rabbitmq.read:*/*">>], + [<<"rabbitmq.configure:%2F/*">>, + <<"rabbitmq.write:%2F/*">>, + <<"rabbitmq.read:%2F/*">>], Seconds), - %% Send and receive a message via AMQP 1.0. + %% Send and receive a message. + {Connection, Session, LinkPair} = amqp_init(Token, Config), QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - OpnConf = #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, <<"">>, Token}}, - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"my sender">>, Address), receive {amqp10_event, {link, Sender, credited}} -> ok @@ -535,7 +537,53 @@ amqp_expirable_token(Config) -> {ok, Msg} = amqp10_client:get_msg(Receiver), ?assertEqual([Body], amqp10_msg:body(Msg)), - %% In 4 seconds from now, we expect that RabbitMQ disconnects us because our token expired. + %% In 3 seconds from now, we expect that RabbitMQ disconnects us because our token expired. + receive {amqp10_event, + {connection, Connection, + {closed, {unauthorized_access, <<"credential expired">>}}}} -> + ok + after Millis * 2 -> + ct:fail("server did not close our connection") + end. + +%% First, test the success case that an OAuth 2.0 token can be renewed via AMQP 1.0. +%% Second, test that the new token expires. +amqp_token_refresh_expire(Config) -> + Seconds = 3, + Millis = Seconds * 1000, + Scopes = [<<"rabbitmq.configure:%2F/*">>, + <<"rabbitmq.write:%2F/*">>, + <<"rabbitmq.read:%2F/*">>], + {_, Token1} = generate_expirable_token(Config, Scopes, Seconds), + + %% Send and receive a message. + {Connection, Session, LinkPair} = amqp_init(Token1, Config), + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"my sender">>, Address), + receive {amqp10_event, {link, Sender, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>, true)), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my receiver">>, Address), + {ok, Msg1} = amqp10_client:get_msg(Receiver), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), + + %% Renew token before the old one expires. + {_, Token2} = generate_expirable_token(Config, Scopes, Seconds * 2), + ok = rabbitmq_amqp_client:set_token(LinkPair, Token2), + + %% Wait until old token would have expired. + timer:sleep(Millis + 500), + + %% We should still be able to send and receive a message thanks to the new token. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>, true)), + {ok, Msg2} = amqp10_client:get_msg(Receiver), + ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2)), + + %% In 2.5 seconds from now, we expect that RabbitMQ + %% disconnects us because the new token should expire. receive {amqp10_event, {connection, Connection, {closed, {unauthorized_access, <<"credential expired">>}}}} -> @@ -544,6 +592,178 @@ amqp_expirable_token(Config) -> ct:fail("server did not close our connection") end. +%% Test that RabbitMQ closes the AMQP 1.0 connection if the client +%% submits a new token without any permission to the vhost. +amqp_token_refresh_vhost_permission(Config) -> + {_, Token1} = generate_valid_token(Config), + {Connection, _Session, LinkPair} = amqp_init(Token1, Config), + + {_, Token2} = generate_valid_token(Config, + [<<"rabbitmq.configure:wrongvhost/*">>, + <<"rabbitmq.write:wrongvhost/*">>, + <<"rabbitmq.read:wrongvhost/*">>]), + ok = rabbitmq_amqp_client:set_token(LinkPair, Token2), + receive {amqp10_event, + {connection, Connection, + {closed, {unauthorized_access, Reason}}}} -> + ?assertMatch(<<"access to vhost / failed for new credential:", _/binary>>, + Reason) + after 5000 -> ct:fail({missing_event, ?LINE}) + end. + +%% Test that RabbitMQ closes AMQP 1.0 sessions if the client +%% submits a new token with reduced permissions. +amqp_token_refresh_revoked_permissions(Config) -> + {_, Token1} = generate_expirable_token(Config, + [<<"rabbitmq.configure:%2F/*/*">>, + <<"rabbitmq.write:%2F/*/*">>, + <<"rabbitmq.read:%2F/*/*">>], + 30), + {Connection, Session1, LinkPair} = amqp_init(Token1, Config), + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Session3} = amqp10_client:begin_session_sync(Connection), + {ok, Session4} = amqp10_client:begin_session_sync(Connection), + {ok, Session5} = amqp10_client:begin_session_sync(Connection), + {ok, Session6} = amqp10_client:begin_session_sync(Connection), + + {ok, Sender2} = amqp10_client:attach_sender_link_sync( + Session2, <<"sender 2">>, + rabbitmq_amqp_address:exchange(<<"amq.fanout">>)), + receive {amqp10_event, {link, Sender2, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + QName = <<"q1">>, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"#">>, #{}), + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session3, <<"receiver 3">>, rabbitmq_amqp_address:queue(QName)), + receive {amqp10_event, {link, Receiver3, attached}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Sender4} = amqp10_client:attach_sender_link_sync(Session4, <<"sender 4">>, null), + receive {amqp10_event, {link, Sender4, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg( + Sender4, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName)}, + amqp10_msg:new(<<"t4">>, <<"m4a">>))), + receive {amqp10_disposition, {accepted, <<"t4">>}} -> ok + after 5000 -> ct:fail({settled_timeout, <<"t4">>}) + end, + + {ok, Sender5} = amqp10_client:attach_sender_link_sync(Session5, <<"sender 5">>, null), + receive {amqp10_event, {link, Sender5, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg( + Sender5, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"topic-1">>)}, + amqp10_msg:new(<<"t5">>, <<"m5a">>))), + receive {amqp10_disposition, {accepted, <<"t5">>}} -> ok + after 5000 -> ct:fail({settled_timeout, <<"t5">>}) + end, + + XName = <<"e1">>, + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{type => <<"fanout">>}), + {ok, Sender6} = amqp10_client:attach_sender_link_sync( + Session6, <<"sender 6">>, + rabbitmq_amqp_address:exchange(XName)), + receive {amqp10_event, {link, Sender6, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Revoke the previous granted permissions on the default vhost. + {_, Token2} = generate_expirable_token( + Config, + [ + %% Set configure access on q1 and e1 so that we can delete this queue and exchange later. + <<"rabbitmq.configure:%2F/*1/nope">>, + %% Set write access on amq.topic so that we can test the revoked topic permission. + <<"rabbitmq.write:%2F/amq.topic/nope">>, + <<"rabbitmq.read:%2F/nope/nope">>], + 30), + flush(<<"setting token...">>), + ok = rabbitmq_amqp_client:set_token(LinkPair, Token2), + + %% We expect RabbitMQ to close Session2 because we are no longer allowed to write to exchange amq.fanout. + receive + {amqp10_event, + {session, Session2, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to exchange 'amq.fanout' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% We expect RabbitMQ to close Session3 because we are no longer allowed to read from queue q1. + %% This complies with the user expectation in + %% https://github.com/rabbitmq/rabbitmq-server/discussions/11364 + receive + {amqp10_event, + {session, Session3, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"read access to queue 'q1' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:send_msg( + Sender4, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName)}, + amqp10_msg:new(<<"t4">>, <<"m4b">>))), + %% We expect RabbitMQ to close Session4 because we are no longer allowed to write to the default exchange. + receive + {amqp10_event, + {session, Session4, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to exchange 'amq.default' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:send_msg( + Sender5, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"topic-1">>)}, + amqp10_msg:new(<<"t5">>, <<"m5b">>))), + %% We expect RabbitMQ to close Session5 because we are no longer allowed to write to topic topic-1. + receive + {amqp10_event, + {session, Session5, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to topic 'topic-1' in exchange" + " 'amq.topic' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% We expect RabbitMQ to close Session6 because we are no longer allowed to write to exchange e1. + receive + {amqp10_event, + {session, Session6, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to exchange 'e1' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ?assertMatch({ok, #{message_count := 2}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), + ok = amqp10_client:end_session(Session1), + ok = amqp10_client:close_connection(Connection). + test_successful_connection_with_complex_claim_as_a_map(Config) -> {_Algo, Token} = generate_valid_token_with_extra_fields( Config, @@ -765,3 +985,30 @@ test_failed_connection_with_non_existent_scope_alias_in_scope_field(Config) -> more_than_one_resource_server_id_not_allowed_in_one_token(Config) -> {_Algo, Token} = generate_valid_token(Config, <<"rmq.configure:*/*">>, [<<"prod">>, <<"dev">>]), {error, _} = open_unmanaged_connection(Config, 0, <<"username">>, Token). + +amqp_init(Token, Config) -> + OpnConf = amqp_connection_config(Token, Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {Connection, Session, LinkPair}. + +amqp_connection_config(Token, Config) -> + Host = proplists:get_value(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => {plain, <<>>, Token}}. + +flush(Prefix) -> + receive + Msg -> + ct:pal("~p flushed: ~p~n", [Prefix, Msg]), + flush(Prefix) + after 1 -> + ok + end. diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index b4fe0f8b56cc..294aabe37ffc 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -24,6 +24,11 @@ Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1, [PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. +### OAuth 2.0 Token Renewal on AMQP 1.0 Connections +[PR #12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) introduces support for OAuth 2.0 token renewal on AMQP 1.0 connections. +This feature allows clients to set a new token proactively before the current one [expires](/docs/oauth2#token-expiration), ensuring uninterrupted connectivity. +If a client does not set a new token before the existing one expires, RabbitMQ will automatically close the AMQP 1.0 connection. + ## Potential incompatibilities * The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB).