Skip to content

Commit 8f70a50

Browse files
Ayanda-Dmichaelklishin
authored andcommitted
store and retrieve channel source from state
(cherry picked from commit f71ace3) Conflicts: src/rabbit_channel.erl
1 parent e6fed33 commit 8f70a50

File tree

2 files changed

+97
-85
lines changed

2 files changed

+97
-85
lines changed

src/rabbit_channel.erl

Lines changed: 72 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
-export([build_topic_variable_map/3]).
7777

7878
%% Mgmt HTTP API refactor
79-
-export([handle_method/5]).
79+
-export([handle_method/6]).
8080

8181
-record(ch, {
8282
%% starting | running | flow | closing
@@ -95,6 +95,9 @@
9595
%% same as reader's name, see #v1.name
9696
%% in rabbit_reader
9797
conn_name,
98+
%% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
99+
%% or any other channel creating/spawning entity
100+
source,
98101
%% limiter pid, see rabbit_limiter
99102
limiter,
100103
%% none | {Msgs, Acks} | committing | failed |
@@ -715,9 +718,8 @@ handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel})
715718
[Channel, LateAnswer, Node]),
716719
noreply(State);
717720

718-
handle_info({channel_source, Source}, State) ->
719-
put(channel_source, Source),
720-
noreply(State).
721+
handle_info({channel_source, Source}, State = #ch{}) ->
722+
noreply(State#ch{source = Source}).
721723

722724
handle_pre_hibernate(State) ->
723725
ok = clear_permission_cache(),
@@ -842,11 +844,11 @@ check_write_permitted(Resource, User) ->
842844
check_read_permitted(Resource, User) ->
843845
check_resource_access(User, Resource, read).
844846

845-
check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
846-
check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
847+
check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
848+
check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
847849

848-
check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
849-
check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
850+
check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
851+
check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
850852

851853
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
852854
ok;
@@ -882,14 +884,17 @@ check_internal_exchange(_) ->
882884
ok.
883885

884886
check_topic_authorisation(Resource = #exchange{type = topic},
885-
User, none, RoutingKey, Permission) ->
887+
User, none, RoutingKey, _ChSrc, Permission) ->
886888
%% Called from outside the channel by mgmt API
887889
AmqpParams = [],
888890
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
889891
check_topic_authorisation(Resource = #exchange{type = topic},
890-
User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) ->
891-
AmqpParams = get_amqp_params(ConnPid, get(channel_source)),
892+
User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
893+
AmqpParams = get_amqp_params(ConnPid, ChSrc),
892894
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
895+
check_topic_authorisation(_, _, _, _, _, _) ->
896+
ok.
897+
893898
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
894899
User = #user{username = Username},
895900
AmqpParams, RoutingKey, Permission) ->
@@ -907,9 +912,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
907912
User, Resource, Permission, Context),
908913
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
909914
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
910-
end;
911-
check_topic_authorisation(_, _, _, _, _) ->
912-
ok.
915+
end.
913916

914917
get_amqp_params(_ConnPid, rabbit_reader) -> [];
915918
get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
@@ -1124,13 +1127,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11241127
user = #user{username = Username} = User,
11251128
conn_name = ConnName,
11261129
delivery_flow = Flow,
1127-
conn_pid = ConnPid}) ->
1130+
conn_pid = ConnPid,
1131+
source = ChSrc}) ->
11281132
check_msg_size(Content),
11291133
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
11301134
check_write_permitted(ExchangeName, User),
11311135
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
11321136
check_internal_exchange(Exchange),
1133-
check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
1137+
check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
11341138
%% We decode the content's properties here because we're almost
11351139
%% certain to want to look at delivery-mode and priority.
11361140
DecodedContent = #content {properties = Props} =
@@ -1405,79 +1409,90 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
14051409
_, State = #ch{virtual_host = VHostPath,
14061410
user = User,
14071411
queue_collector_pid = CollectorPid,
1408-
conn_pid = ConnPid}) ->
1409-
handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
1412+
conn_pid = ConnPid,
1413+
source = ChSrc}) ->
1414+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14101415
return_ok(State, NoWait, #'exchange.declare_ok'{});
14111416

14121417
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
14131418
_, State = #ch{conn_pid = ConnPid,
1419+
source = ChSrc,
14141420
virtual_host = VHostPath,
14151421
queue_collector_pid = CollectorPid,
14161422
user = User}) ->
1417-
handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
1423+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14181424
return_ok(State, NoWait, #'exchange.delete_ok'{});
14191425

14201426
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
14211427
_, State = #ch{virtual_host = VHostPath,
14221428
conn_pid = ConnPid,
1429+
source = ChSrc,
14231430
queue_collector_pid = CollectorPid,
14241431
user = User}) ->
1425-
handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
1432+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14261433
return_ok(State, NoWait, #'exchange.bind_ok'{});
14271434

14281435
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
14291436
_, State = #ch{virtual_host = VHostPath,
14301437
conn_pid = ConnPid,
1438+
source = ChSrc,
14311439
queue_collector_pid = CollectorPid,
14321440
user = User}) ->
1433-
handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
1441+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14341442
return_ok(State, NoWait, #'exchange.unbind_ok'{});
14351443

14361444
handle_method(#'queue.declare'{nowait = NoWait} = Method,
14371445
_, State = #ch{virtual_host = VHostPath,
14381446
conn_pid = ConnPid,
1447+
source = ChSrc,
14391448
queue_collector_pid = CollectorPid,
14401449
user = User}) ->
14411450
{ok, QueueName, MessageCount, ConsumerCount} =
1442-
handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
1451+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14431452
return_queue_declare_ok(QueueName, NoWait, MessageCount,
14441453
ConsumerCount, State);
14451454

14461455
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
14471456
State = #ch{conn_pid = ConnPid,
1457+
source = ChSrc,
14481458
virtual_host = VHostPath,
14491459
queue_collector_pid = CollectorPid,
14501460
user = User}) ->
1451-
{ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid,
1452-
VHostPath, User),
1461+
{ok, PurgedMessageCount} =
1462+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14531463
return_ok(State, NoWait,
14541464
#'queue.delete_ok'{message_count = PurgedMessageCount});
14551465

14561466
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
14571467
State = #ch{conn_pid = ConnPid,
1468+
source = ChSrc,
14581469
user = User,
14591470
queue_collector_pid = CollectorPid,
14601471
virtual_host = VHostPath}) ->
1461-
handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
1472+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14621473
return_ok(State, NoWait, #'queue.bind_ok'{});
14631474

14641475
handle_method(#'queue.unbind'{} = Method, _,
14651476
State = #ch{conn_pid = ConnPid,
1477+
source = ChSrc,
14661478
user = User,
14671479
queue_collector_pid = CollectorPid,
14681480
virtual_host = VHostPath}) ->
1469-
handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
1481+
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
14701482
return_ok(State, false, #'queue.unbind_ok'{});
14711483

14721484
handle_method(#'queue.purge'{nowait = NoWait} = Method,
14731485
_, State = #ch{conn_pid = ConnPid,
1486+
source = ChSrc,
14741487
user = User,
14751488
queue_collector_pid = CollectorPid,
14761489
virtual_host = VHostPath}) ->
1477-
{ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid,
1478-
VHostPath, User),
1479-
return_ok(State, NoWait,
1480-
#'queue.purge_ok'{message_count = PurgedMessageCount});
1490+
case handle_method(Method, ConnPid, ChSrc, CollectorPid,
1491+
VHostPath, User) of
1492+
{ok, PurgedMessageCount} ->
1493+
return_ok(State, NoWait,
1494+
#'queue.purge_ok'{message_count = PurgedMessageCount})
1495+
end;
14811496

14821497
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
14831498
precondition_failed("cannot switch from confirm to tx mode");
@@ -1672,7 +1687,7 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
16721687
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
16731688

16741689
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
1675-
RoutingKey, Arguments, VHostPath, ConnPid,
1690+
RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
16761691
#user{username = Username} = User) ->
16771692
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
16781693
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
@@ -1681,14 +1696,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
16811696
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
16821697
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
16831698
check_read_permitted(ExchangeName, User),
1684-
ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
1685-
case ExchangeLookup of
1699+
case rabbit_exchange:lookup(ExchangeName) of
16861700
{error, not_found} ->
1687-
%% no-op
1688-
ExchangeLookup;
1701+
ok;
16891702
{ok, Exchange} ->
1690-
check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
1691-
ExchangeLookup
1703+
check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
16921704
end,
16931705
case Fun(#binding{source = ExchangeName,
16941706
destination = DestinationName,
@@ -2058,6 +2070,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
20582070
i(vhost, #ch{virtual_host = VHost}) -> VHost;
20592071
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
20602072
i(confirm, #ch{confirm_enabled = CE}) -> CE;
2073+
i(source, #ch{source = ChSrc}) -> ChSrc;
20612074
i(name, State) -> name(State);
20622075
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
20632076
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2078,7 +2091,6 @@ i(garbage_collection, _State) ->
20782091
i(reductions, _State) ->
20792092
{reductions, Reductions} = erlang:process_info(self(), reductions),
20802093
Reductions;
2081-
i(channel_source, _State = #ch{}) -> get(channel_source);
20822094
i(Item, _) ->
20832095
throw({bad_argument, Item}).
20842096

@@ -2130,39 +2142,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
21302142
source = SourceNameBin,
21312143
routing_key = RoutingKey,
21322144
arguments = Arguments},
2133-
ConnPid, _CollectorId, VHostPath, User) ->
2145+
ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
21342146
binding_action(fun rabbit_binding:add/3,
21352147
SourceNameBin, exchange, DestinationNameBin,
2136-
RoutingKey, Arguments, VHostPath, ConnPid, User);
2148+
RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
21372149
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
21382150
source = SourceNameBin,
21392151
routing_key = RoutingKey,
21402152
arguments = Arguments},
2141-
ConnPid, _CollectorId, VHostPath, User) ->
2153+
ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
21422154
binding_action(fun rabbit_binding:remove/3,
21432155
SourceNameBin, exchange, DestinationNameBin,
2144-
RoutingKey, Arguments, VHostPath, ConnPid, User);
2156+
RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
21452157
handle_method(#'queue.unbind'{queue = QueueNameBin,
21462158
exchange = ExchangeNameBin,
21472159
routing_key = RoutingKey,
21482160
arguments = Arguments},
2149-
ConnPid, _CollectorId, VHostPath, User) ->
2161+
ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
21502162
binding_action(fun rabbit_binding:remove/3,
21512163
ExchangeNameBin, queue, QueueNameBin,
2152-
RoutingKey, Arguments, VHostPath, ConnPid, User);
2164+
RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
21532165
handle_method(#'queue.bind'{queue = QueueNameBin,
21542166
exchange = ExchangeNameBin,
21552167
routing_key = RoutingKey,
21562168
arguments = Arguments},
2157-
ConnPid, _CollectorId, VHostPath, User) ->
2169+
ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
21582170
binding_action(fun rabbit_binding:add/3,
21592171
ExchangeNameBin, queue, QueueNameBin,
2160-
RoutingKey, Arguments, VHostPath, ConnPid, User);
2172+
RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
21612173
%% Note that all declares to these are effectively passive. If it
21622174
%% exists it by definition has one consumer.
21632175
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
21642176
_/binary>> = QueueNameBin},
2165-
_ConnPid, _CollectorPid, VHost, _User) ->
2177+
_ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
21662178
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
21672179
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
21682180
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2176,7 +2188,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
21762188
auto_delete = AutoDelete,
21772189
nowait = NoWait,
21782190
arguments = Args} = Declare,
2179-
ConnPid, CollectorPid, VHostPath, #user{username = Username} = User) ->
2191+
ConnPid, ChSrc, CollectorPid, VHostPath,
2192+
#user{username = Username} = User) ->
21802193
Owner = case ExclusiveDeclare of
21812194
true -> ConnPid;
21822195
false -> none
@@ -2233,7 +2246,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
22332246
{existing, _Q} ->
22342247
%% must have been created between the stat and the
22352248
%% declare. Loop around again.
2236-
handle_method(Declare, ConnPid, CollectorPid, VHostPath, User);
2249+
handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
2250+
User);
22372251
{absent, Q, Reason} ->
22382252
rabbit_misc:absent(Q, Reason);
22392253
{owner_died, _Q} ->
@@ -2248,7 +2262,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
22482262
handle_method(#'queue.declare'{queue = QueueNameBin,
22492263
nowait = NoWait,
22502264
passive = true},
2251-
ConnPid, _CollectorPid, VHostPath, _User) ->
2265+
ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
22522266
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
22532267
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
22542268
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
@@ -2259,7 +2273,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
22592273
handle_method(#'queue.delete'{queue = QueueNameBin,
22602274
if_unused = IfUnused,
22612275
if_empty = IfEmpty},
2262-
ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) ->
2276+
ConnPid, _ChSrc, _CollectorPid, VHostPath,
2277+
User = #user{username = Username}) ->
22632278
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
22642279
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
22652280

@@ -2286,7 +2301,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
22862301
end;
22872302
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
22882303
if_unused = IfUnused},
2289-
_ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) ->
2304+
_ConnPid, _ChSrc, _CollectorPid, VHostPath,
2305+
User = #user{username = Username}) ->
22902306
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
22912307
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
22922308
check_not_default_exchange(ExchangeName),
@@ -2301,7 +2317,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
23012317
ok
23022318
end;
23032319
handle_method(#'queue.purge'{queue = QueueNameBin},
2304-
ConnPid, _CollectorPid, VHostPath, User) ->
2320+
ConnPid, _ChSrc, _CollectorPid, VHostPath, User) ->
23052321
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
23062322
check_read_permitted(QueueName, User),
23072323
rabbit_amqqueue:with_exclusive_access_or_die(
@@ -2314,7 +2330,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
23142330
auto_delete = AutoDelete,
23152331
internal = Internal,
23162332
arguments = Args},
2317-
_ConnPid, _CollectorPid, VHostPath, #user{username = Username} = User) ->
2333+
_ConnPid, _ChSrc, _CollectorPid, VHostPath,
2334+
#user{username = Username} = User) ->
23182335
CheckedType = rabbit_exchange:check_type(TypeNameBin),
23192336
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
23202337
check_not_default_exchange(ExchangeName),
@@ -2346,7 +2363,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
23462363
AutoDelete, Internal, Args);
23472364
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
23482365
passive = true},
2349-
_ConnPid, _CollectorPid, VHostPath, _User) ->
2366+
_ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
23502367
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
23512368
check_not_default_exchange(ExchangeName),
23522369
_ = rabbit_exchange:lookup_or_die(ExchangeName).

0 commit comments

Comments
 (0)