@@ -971,16 +971,17 @@ return_queue_declare_ok(#resource{name = ActualName},
971971 message_count = MessageCount ,
972972 consumer_count = ConsumerCount }).
973973
974- check_resource_access (User , Resource , Perm ) ->
975- V = {Resource , Perm },
974+ check_resource_access (User , Resource , Perm , Context ) ->
975+ V = {Resource , Context , Perm },
976+
976977 Cache = case get (permission_cache ) of
977978 undefined -> [];
978979 Other -> Other
979980 end ,
980981 case lists :member (V , Cache ) of
981982 true -> ok ;
982983 false -> ok = rabbit_access_control :check_resource_access (
983- User , Resource , Perm ),
984+ User , Resource , Perm , Context ),
984985 CacheTail = lists :sublist (Cache , ? MAX_PERMISSION_CACHE_SIZE - 1 ),
985986 put (permission_cache , [V | CacheTail ])
986987 end .
@@ -989,14 +990,14 @@ clear_permission_cache() -> erase(permission_cache),
989990 erase (topic_permission_cache ),
990991 ok .
991992
992- check_configure_permitted (Resource , User ) ->
993- check_resource_access (User , Resource , configure ).
993+ check_configure_permitted (Resource , User , Context ) ->
994+ check_resource_access (User , Resource , configure , Context ).
994995
995- check_write_permitted (Resource , User ) ->
996- check_resource_access (User , Resource , write ).
996+ check_write_permitted (Resource , User , Context ) ->
997+ check_resource_access (User , Resource , write , Context ).
997998
998- check_read_permitted (Resource , User ) ->
999- check_resource_access (User , Resource , read ).
999+ check_read_permitted (Resource , User , Context ) ->
1000+ check_resource_access (User , Resource , read , Context ).
10001001
10011002check_write_permitted_on_topic (Resource , User , ConnPid , RoutingKey , ChSrc ) ->
10021003 check_topic_authorisation (Resource , User , ConnPid , RoutingKey , ChSrc , write ).
@@ -1071,7 +1072,8 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
10711072get_amqp_params (_ConnPid , rabbit_reader ) -> [];
10721073get_amqp_params (ConnPid , _Any ) when is_pid (ConnPid ) ->
10731074 Timeout = get_operation_timeout (),
1074- get_amqp_params (ConnPid , rabbit_misc :is_process_alive (ConnPid ), Timeout ).
1075+ get_amqp_params (ConnPid , rabbit_misc :is_process_alive (ConnPid ), Timeout );
1076+ get_amqp_params (_ , _ ) -> [].
10751077
10761078get_amqp_params (ConnPid , false , _Timeout ) ->
10771079 % % Connection process is dead
@@ -1082,16 +1084,19 @@ get_amqp_params(ConnPid, true, Timeout) ->
10821084 rabbit_amqp_connection :amqp_params (ConnPid , Timeout ).
10831085
10841086build_topic_variable_map (AmqpParams , VHost , Username ) ->
1085- VariableFromAmqpParams = extract_topic_variable_map_from_amqp_params (AmqpParams ),
1087+ VariableFromAmqpParams = extract_variable_map_from_amqp_params (AmqpParams ),
10861088 maps :merge (VariableFromAmqpParams , #{<<" vhost" >> => VHost , <<" username" >> => Username }).
10871089
1090+ extract_authz_context (ConnPid , ChSrc ) ->
1091+ extract_variable_map_from_amqp_params (get_amqp_params (ConnPid , ChSrc )).
1092+
10881093% % use tuple representation of amqp_params to avoid coupling.
10891094% % get variable map only from amqp_params_direct, not amqp_params_network.
10901095% % amqp_params_direct are usually used from plugins (e.g. MQTT, STOMP)
1091- extract_topic_variable_map_from_amqp_params ([{amqp_params , {amqp_params_direct , _ , _ , _ , _ ,
1096+ extract_variable_map_from_amqp_params ([{amqp_params , {amqp_params_direct , _ , _ , _ , _ ,
10921097 {amqp_adapter_info , _ ,_ ,_ ,_ ,_ ,_ ,AdditionalInfo }, _ }}]) ->
10931098 proplists :get_value (variable_map , AdditionalInfo , #{});
1094- extract_topic_variable_map_from_amqp_params (_ ) ->
1099+ extract_variable_map_from_amqp_params (_ ) ->
10951100 #{}.
10961101
10971102check_msg_size (Content , MaxMessageSize ) ->
@@ -1298,7 +1303,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12981303 }) ->
12991304 check_msg_size (Content , MaxMessageSize ),
13001305 ExchangeName = rabbit_misc :r (VHostPath , exchange , ExchangeNameBin ),
1301- check_write_permitted (ExchangeName , User ),
1306+ check_write_permitted (ExchangeName , User , extract_authz_context ( ConnPid , ChSrc ) ),
13021307 Exchange = rabbit_exchange :lookup_or_die (ExchangeName ),
13031308 check_internal_exchange (Exchange ),
13041309 check_write_permitted_on_topic (Exchange , User , ConnPid , RoutingKey , ChSrc ),
@@ -1353,13 +1358,14 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
13531358 _ , State = # ch {cfg = # conf {writer_pid = WriterPid ,
13541359 conn_pid = ConnPid ,
13551360 user = User ,
1356- virtual_host = VHostPath
1361+ virtual_host = VHostPath ,
1362+ source = ChSrc
13571363 },
13581364 limiter = Limiter ,
13591365 next_tag = DeliveryTag ,
13601366 queue_states = QueueStates0 }) ->
13611367 QueueName = qbin_to_resource (QueueNameBin , VHostPath ),
1362- check_read_permitted (QueueName , User ),
1368+ check_read_permitted (QueueName , User , extract_authz_context ( ConnPid , ChSrc ) ),
13631369 case rabbit_amqqueue :with_exclusive_access_or_die (
13641370 QueueName , ConnPid ,
13651371 % % Use the delivery tag as consumer tag for quorum queues
@@ -1438,13 +1444,15 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
14381444 arguments = Args },
14391445 _ , State = # ch {cfg = # conf {consumer_prefetch = ConsumerPrefetch ,
14401446 user = User ,
1441- virtual_host = VHostPath },
1447+ virtual_host = VHostPath ,
1448+ conn_pid = ConnPid ,
1449+ source = ChSrc },
14421450 consumer_mapping = ConsumerMapping
14431451 }) ->
14441452 case maps :find (ConsumerTag , ConsumerMapping ) of
14451453 error ->
14461454 QueueName = qbin_to_resource (QueueNameBin , VHostPath ),
1447- check_read_permitted (QueueName , User ),
1455+ check_read_permitted (QueueName , User , extract_authz_context ( ConnPid , ChSrc ) ),
14481456 ActualConsumerTag =
14491457 case ConsumerTag of
14501458 <<>> -> rabbit_guid :binary (rabbit_guid :gen_secure (),
@@ -1916,10 +1924,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
19161924 ExchangeNameBin = strip_cr_lf (SourceNameBin0 ),
19171925 DestinationNameBin = strip_cr_lf (DestinationNameBin0 ),
19181926 DestinationName = name_to_resource (DestinationType , DestinationNameBin , VHostPath ),
1919- check_write_permitted (DestinationName , User ),
1927+ AuthContext = extract_authz_context (ConnPid , ChSrc ),
1928+ check_write_permitted (DestinationName , User , AuthContext ),
19201929 ExchangeName = rabbit_misc :r (VHostPath , exchange , ExchangeNameBin ),
19211930 [check_not_default_exchange (N ) || N <- [DestinationName , ExchangeName ]],
1922- check_read_permitted (ExchangeName , User ),
1931+ check_read_permitted (ExchangeName , User , AuthContext ),
19231932 case rabbit_exchange :lookup (ExchangeName ) of
19241933 {error , not_found } ->
19251934 ok ;
@@ -2495,7 +2504,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
24952504 Other -> check_name ('queue' , Other )
24962505 end ,
24972506 QueueName = rabbit_misc :r (VHostPath , queue , ActualNameBin ),
2498- check_configure_permitted (QueueName , User ),
2507+ check_configure_permitted (QueueName , User , extract_authz_context ( ConnPid , ChSrc ) ),
24992508 rabbit_core_metrics :queue_declared (QueueName ),
25002509 case rabbit_amqqueue :with (
25012510 QueueName ,
@@ -2517,8 +2526,9 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
25172526 " invalid type '~s ' for arg '~s ' in ~s " ,
25182527 [Type , DlxKey , rabbit_misc :rs (QueueName )]);
25192528 DLX ->
2520- check_read_permitted (QueueName , User ),
2521- check_write_permitted (DLX , User ),
2529+ AuthContext = extract_authz_context (ConnPid , ChSrc ),
2530+ check_read_permitted (QueueName , User , AuthContext ),
2531+ check_write_permitted (DLX , User , AuthContext ),
25222532 ok
25232533 end ,
25242534 case rabbit_amqqueue :declare (QueueName , Durable , AutoDelete ,
@@ -2570,12 +2580,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
25702580handle_method (# 'queue.delete' {queue = QueueNameBin ,
25712581 if_unused = IfUnused ,
25722582 if_empty = IfEmpty },
2573- ConnPid , _ChSrc , _CollectorPid , VHostPath ,
2583+ ConnPid , ChSrc , _CollectorPid , VHostPath ,
25742584 User = # user {username = Username }) ->
25752585 StrippedQueueNameBin = strip_cr_lf (QueueNameBin ),
25762586 QueueName = qbin_to_resource (StrippedQueueNameBin , VHostPath ),
25772587
2578- check_configure_permitted (QueueName , User ),
2588+ check_configure_permitted (QueueName , User , extract_authz_context ( ConnPid , ChSrc ) ),
25792589 case rabbit_amqqueue :with (
25802590 QueueName ,
25812591 fun (Q ) ->
@@ -2599,13 +2609,13 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
25992609 end ;
26002610handle_method (# 'exchange.delete' {exchange = ExchangeNameBin ,
26012611 if_unused = IfUnused },
2602- _ConnPid , _ChSrc , _CollectorPid , VHostPath ,
2612+ ConnPid , ChSrc , _CollectorPid , VHostPath ,
26032613 User = # user {username = Username }) ->
26042614 StrippedExchangeNameBin = strip_cr_lf (ExchangeNameBin ),
26052615 ExchangeName = rabbit_misc :r (VHostPath , exchange , StrippedExchangeNameBin ),
26062616 check_not_default_exchange (ExchangeName ),
26072617 check_exchange_deletion (ExchangeName ),
2608- check_configure_permitted (ExchangeName , User ),
2618+ check_configure_permitted (ExchangeName , User , extract_authz_context ( ConnPid , ChSrc ) ),
26092619 case rabbit_exchange :delete (ExchangeName , IfUnused , Username ) of
26102620 {error , not_found } ->
26112621 ok ;
@@ -2615,9 +2625,9 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
26152625 ok
26162626 end ;
26172627handle_method (# 'queue.purge' {queue = QueueNameBin },
2618- ConnPid , _ChSrc , _CollectorPid , VHostPath , User ) ->
2628+ ConnPid , ChSrc , _CollectorPid , VHostPath , User ) ->
26192629 QueueName = qbin_to_resource (QueueNameBin , VHostPath ),
2620- check_read_permitted (QueueName , User ),
2630+ check_read_permitted (QueueName , User , extract_authz_context ( ConnPid , ChSrc ) ),
26212631 rabbit_amqqueue :with_exclusive_access_or_die (
26222632 QueueName , ConnPid ,
26232633 fun (Q ) -> rabbit_amqqueue :purge (Q ) end );
@@ -2628,12 +2638,12 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
26282638 auto_delete = AutoDelete ,
26292639 internal = Internal ,
26302640 arguments = Args },
2631- _ConnPid , _ChSrc , _CollectorPid , VHostPath ,
2641+ ConnPid , ChSrc , _CollectorPid , VHostPath ,
26322642 # user {username = Username } = User ) ->
26332643 CheckedType = rabbit_exchange :check_type (TypeNameBin ),
26342644 ExchangeName = rabbit_misc :r (VHostPath , exchange , strip_cr_lf (ExchangeNameBin )),
26352645 check_not_default_exchange (ExchangeName ),
2636- check_configure_permitted (ExchangeName , User ),
2646+ check_configure_permitted (ExchangeName , User , extract_authz_context ( ConnPid , ChSrc ) ),
26372647 X = case rabbit_exchange :lookup (ExchangeName ) of
26382648 {ok , FoundX } -> FoundX ;
26392649 {error , not_found } ->
@@ -2645,8 +2655,9 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
26452655 precondition_failed (
26462656 " invalid type '~s ' for arg '~s ' in ~s " ,
26472657 [Type , AeKey , rabbit_misc :rs (ExchangeName )]);
2648- AName -> check_read_permitted (ExchangeName , User ),
2649- check_write_permitted (AName , User ),
2658+ AName -> AuthContext = extract_authz_context (ConnPid , ChSrc ),
2659+ check_read_permitted (ExchangeName , User , AuthContext ),
2660+ check_write_permitted (AName , User , AuthContext ),
26502661 ok
26512662 end ,
26522663 rabbit_exchange :declare (ExchangeName ,
0 commit comments