Skip to content

Commit 767523a

Browse files
committed
Reduce ETS copy overhead when delivering to target queues
## What? This commit avoids copying the full amqqueue record from ETS per incoming message and target queue. The amqqueue record contains 21 elements and for some queue types, especially streams, some elements are themselves nested terms. ## How? In Khepri, use a new `rabbit_khepri_queue_target` projection which contains a subset of the full amqqueue record. This way all relevant information to deliver to a target queue can be looked up in a single ets:lookup call. Alternative approaches are described in erlang/otp#10211 ## Benchmark Fanout to 3 classic queues. Variation: Fanout to 3 streams since streams have a larger amqqueue record. Start broker: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \ FULL=1 \ RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \ PLUGINS="rabbitmq_management" ``` `high-credit.config` contains: ``` [ {rabbit, [ %% Maximum incoming-window of AMQP 1.0 session. %% Default: 400 {max_incoming_window, 5000}, %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender. %% Default: 128 {max_link_credit, 2000}, %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue. %% Default: 256 {max_queue_credit, 5000}, {loopback_users, []} ]}, {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` Create the 3 classic queues and bindings to the fanout exchange: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q3 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q3 ``` Start the client: ``` quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4 ``` On main branch the rate is about ~67k msgs/s. With this PR the rate is about ~70k msgs/s. When faning out to 3 classic queues, the throughput increase is about 4%. When faning out to 3 streams, the throughput increase is about 9%.
1 parent 25db081 commit 767523a

17 files changed

+188
-107
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@
145145
type_state :: #{}
146146
}.
147147

148+
-type target() :: #queue_target{}.
149+
148150
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
149151

150152
-type amqqueue_pattern() :: amqqueue_v2_pattern().
@@ -175,6 +177,7 @@
175177
amqqueue_v2/0,
176178
amqqueue_pattern/0,
177179
amqqueue_v2_pattern/0,
180+
target/0,
178181
ra_server_id/0]).
179182

180183
-spec new(rabbit_amqqueue:name(),
@@ -418,9 +421,10 @@ set_operator_policy(#amqqueue{} = Queue, Policy) ->
418421

419422
% name
420423

421-
-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
424+
-spec get_name(amqqueue() | target()) -> rabbit_amqqueue:name().
422425

423-
get_name(#amqqueue{name = Name}) -> Name.
426+
get_name(#amqqueue{name = Name}) -> Name;
427+
get_name(#queue_target{name = Name}) -> Name.
424428

425429
-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().
426430

@@ -429,9 +433,10 @@ set_name(#amqqueue{} = Queue, Name) ->
429433

430434
% pid
431435

432-
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
436+
-spec get_pid(amqqueue_v2() | target()) -> pid() | ra_server_id() | none.
433437

434-
get_pid(#amqqueue{pid = Pid}) -> Pid.
438+
get_pid(#amqqueue{pid = Pid}) -> Pid;
439+
get_pid(#queue_target{pid = Pid}) -> Pid.
435440

436441
-spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2().
437442

@@ -488,9 +493,10 @@ set_state(#amqqueue{} = Queue, State) ->
488493

489494
%% New in v2.
490495

491-
-spec get_type(amqqueue()) -> atom().
496+
-spec get_type(amqqueue() | target()) -> atom().
492497

493-
get_type(#amqqueue{type = Type}) -> Type.
498+
get_type(#amqqueue{type = Type}) -> Type;
499+
get_type(#queue_target{type = Type}) -> Type.
494500

495501
-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.
496502

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2528,7 +2528,7 @@ incoming_link_transfer(
25282528
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
25292529
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
25302530
Opts = #{correlation => {HandleInt, DeliveryId}},
2531-
Qs0 = rabbit_amqqueue:lookup_many(QNames),
2531+
Qs0 = rabbit_db_queue:get_targets(QNames),
25322532
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
25332533
Mc = ensure_mc_cluster_compat(Mc2),
25342534
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
@@ -2674,8 +2674,8 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, U) ->
26742674
Disposition = released(DeliveryId),
26752675
{U, [Disposition]};
26762676
process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) ->
2677-
QNames = rabbit_amqqueue:queue_names(Qs),
26782677
false = maps:is_key(DeliveryId, U0),
2678+
QNames = rabbit_amqqueue:queue_names(Qs),
26792679
Map = maps:from_keys(QNames, ok),
26802680
U = U0#{DeliveryId => {Map, SenderSettles, false}},
26812681
rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)),

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,65 +2051,70 @@ get_quorum_nodes(Q) ->
20512051
end.
20522052

20532053
-spec prepend_extra_bcc(Qs) ->
2054-
Qs when Qs :: [amqqueue:amqqueue() |
2055-
{amqqueue:amqqueue(), route_infos()}].
2054+
Qs when Qs :: [amqqueue:amqqueue() | amqqueue:target() |
2055+
{amqqueue:amqqueue() | amqqueue:target(), route_infos()}].
20562056
prepend_extra_bcc([]) ->
20572057
[];
20582058
prepend_extra_bcc([Q0] = Qs) ->
20592059
Q = queue(Q0),
2060-
case amqqueue:get_options(Q) of
2061-
#{extra_bcc := BCCName} ->
2062-
case get_bcc_queue(Q, BCCName) of
2060+
case get_extra_bcc(Q) of
2061+
none ->
2062+
Qs;
2063+
Name ->
2064+
case lookup_extra_bcc(Q, Name) of
20632065
{ok, BCCQueue} ->
20642066
[BCCQueue | Qs];
20652067
{error, not_found} ->
20662068
Qs
2067-
end;
2068-
_ ->
2069-
Qs
2069+
end
20702070
end;
20712071
prepend_extra_bcc(Qs) ->
2072-
BCCQueues =
2073-
lists:filtermap(
2074-
fun(Q0) ->
2075-
Q = queue(Q0),
2076-
case amqqueue:get_options(Q) of
2077-
#{extra_bcc := BCCName} ->
2078-
case get_bcc_queue(Q, BCCName) of
2079-
{ok, BCCQ} ->
2080-
{true, BCCQ};
2081-
{error, not_found} ->
2082-
false
2083-
end;
2084-
_ ->
2085-
false
2086-
end
2087-
end, Qs),
2088-
lists:usort(BCCQueues) ++ Qs.
2072+
ExtraQs = lists:filtermap(
2073+
fun(Q0) ->
2074+
Q = queue(Q0),
2075+
case get_extra_bcc(Q) of
2076+
none ->
2077+
false;
2078+
Name ->
2079+
case lookup_extra_bcc(Q, Name) of
2080+
{ok, BCCQ} ->
2081+
{true, BCCQ};
2082+
{error, not_found} ->
2083+
false
2084+
end
2085+
end
2086+
end, Qs),
2087+
lists:usort(ExtraQs) ++ Qs.
20892088

20902089
-spec queue(Q | {Q, route_infos()}) ->
2091-
Q when Q :: amqqueue:amqqueue().
2092-
queue(Q)
2093-
when ?is_amqqueue(Q) ->
2090+
Q when Q :: amqqueue:amqqueue() | amqqueue:target().
2091+
queue({Q, RouteInfos}) when is_map(RouteInfos) ->
20942092
Q;
2095-
queue({Q, RouteInfos})
2096-
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
2093+
queue(Q) ->
20972094
Q.
20982095

20992096
-spec queue_names([Q | {Q, route_infos()}]) ->
2100-
[name()] when Q :: amqqueue:amqqueue().
2101-
queue_names(Queues)
2102-
when is_list(Queues) ->
2103-
lists:map(fun(Q) when ?is_amqqueue(Q) ->
2097+
[name()] when Q :: amqqueue:amqqueue() | amqqueue:target().
2098+
queue_names(Queues) ->
2099+
lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) ->
21042100
amqqueue:get_name(Q);
2105-
({Q, RouteInfos})
2106-
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
2101+
(Q) ->
21072102
amqqueue:get_name(Q)
21082103
end, Queues).
21092104

2110-
-spec get_bcc_queue(amqqueue:amqqueue(), binary()) ->
2105+
get_extra_bcc(Q) when ?is_amqqueue(Q) ->
2106+
case amqqueue:get_options(Q) of
2107+
#{extra_bcc := Name} ->
2108+
Name;
2109+
_ ->
2110+
none
2111+
end;
2112+
get_extra_bcc(#queue_target{extra_bcc = Name}) ->
2113+
Name.
2114+
2115+
-spec lookup_extra_bcc(amqqueue:amqqueue() | amqqueue:target(), binary()) ->
21112116
{ok, amqqueue:amqqueue()} | {error, not_found}.
2112-
get_bcc_queue(Q, BCCName) ->
2117+
lookup_extra_bcc(Q, BCCName) ->
21132118
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
21142119
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
21152120
lookup(BCCQueueName).

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1222,7 +1222,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12221222
check_user_id_header(Message0, User),
12231223
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
12241224
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
1225-
Queues = rabbit_amqqueue:lookup_many(QNames),
1225+
Queues = rabbit_db_queue:get_targets(QNames),
12261226
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
12271227
Username, TraceState),
12281228
%% TODO: call delivery_to_queues with plain args

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,16 +287,17 @@ format(Q, _Ctx) when ?is_amqqueue(Q) ->
287287
{state, State},
288288
{node, node(amqqueue:get_pid(Q))}].
289289

290-
-spec init(amqqueue:amqqueue()) -> {ok, state()}.
291-
init(Q) when ?amqqueue_is_classic(Q) ->
290+
-spec init(amqqueue:amqqueue() | amqqueue:target()) ->
291+
{ok, state()}.
292+
init(Q) ->
292293
{ok, #?STATE{pid = amqqueue:get_pid(Q)}}.
293294

294295
-spec close(state()) -> ok.
295296
close(_State) ->
296297
ok.
297298

298-
-spec update(amqqueue:amqqueue(), state()) -> state().
299-
update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
299+
-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state().
300+
update(Q, #?STATE{pid = Pid} = State) ->
300301
case amqqueue:get_pid(Q) of
301302
Pid ->
302303
State;
@@ -473,7 +474,7 @@ settlement_action(Type, QRef, MsgSeqs, Acc) ->
473474

474475
supports_stateful_delivery() -> true.
475476

476-
-spec deliver([{amqqueue:amqqueue(), state()}],
477+
-spec deliver([{amqqueue:amqqueue() | amqqueue:target(), state()}],
477478
Delivery :: mc:state(),
478479
rabbit_queue_type:delivery_options()) ->
479480
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
-export([
2020
get/1,
2121
get_many/1,
22+
get_targets/1,
2223
get_all/0,
2324
get_all/1,
2425
get_all_by_type/1,
@@ -85,6 +86,7 @@
8586
-define(MNESIA_DURABLE_TABLE, rabbit_durable_queue).
8687

8788
-define(KHEPRI_PROJECTION, rabbit_khepri_queue).
89+
-define(KHEPRI_TARGET_PROJECTION, rabbit_khepri_queue_target).
8890

8991
%% -------------------------------------------------------------------
9092
%% get_all().
@@ -468,6 +470,21 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
468470
%% after the transaction.
469471
rabbit_db_binding:delete_for_destination_in_mnesia(QueueName, OnlyDurable).
470472

473+
%% -------------------------------------------------------------------
474+
%% get_targets().
475+
%% -------------------------------------------------------------------
476+
477+
%% Queue target optimisation is only available in Khepri.
478+
%% Mnesia falls back looking up the full amqqueue record.
479+
-spec get_targets(rabbit_exchange:route_return()) ->
480+
[amqqueue:target() | amqqueue:amqqueue() |
481+
{amqqueue:target() | amqqueue:amqqueue(), rabbit_exchange:route_infos()}].
482+
get_targets(Names) ->
483+
rabbit_khepri:handle_fallback(
484+
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
485+
khepri => fun() -> get_many_in_khepri(?KHEPRI_TARGET_PROJECTION, Names) end
486+
}).
487+
471488
%% -------------------------------------------------------------------
472489
%% get_many().
473490
%% -------------------------------------------------------------------
@@ -477,12 +494,12 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
477494
get_many(Names) when is_list(Names) ->
478495
rabbit_khepri:handle_fallback(
479496
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
480-
khepri => fun() -> get_many_in_khepri(Names) end
497+
khepri => fun() -> get_many_in_khepri(?KHEPRI_PROJECTION, Names) end
481498
}).
482499

483-
get_many_in_khepri(Names) ->
500+
get_many_in_khepri(Table, Names) ->
484501
try
485-
get_many_in_ets(?KHEPRI_PROJECTION, Names)
502+
get_many_in_ets(Table, Names)
486503
catch
487504
error:badarg ->
488505
[]

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
4444
Routed0 = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}),
4545
{Cycles, Routed} = detect_cycles(Reason, DLMsg, Routed0),
4646
lists:foreach(fun log_cycle_once/1, Cycles),
47-
Qs0 = rabbit_amqqueue:lookup_many(Routed),
47+
Qs0 = rabbit_db_queue:get_targets(Routed),
4848
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
4949
_ = rabbit_queue_type:deliver(Qs, DLMsg, #{}, stateless),
5050
ok.

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
342342
{Cycles, RouteToQs1} = rabbit_dead_letter:detect_cycles(
343343
Reason, Msg, RouteToQs0),
344344
State1 = log_cycles(Cycles, [RKey], State0),
345-
RouteToQs2 = rabbit_amqqueue:lookup_many(RouteToQs1),
345+
RouteToQs2 = rabbit_db_queue:get_targets(RouteToQs1),
346346
RouteToQs = rabbit_amqqueue:prepend_extra_bcc(RouteToQs2),
347347
State2 = case RouteToQs of
348348
[] ->
@@ -496,7 +496,7 @@ redeliver0(#pending{delivery = Msg0,
496496
%% queues that do not exist. Therefore, filter out non-existent target queues.
497497
RouteToQs0 = queue_names(
498498
rabbit_amqqueue:prepend_extra_bcc(
499-
rabbit_amqqueue:lookup_many(
499+
rabbit_db_queue:get_targets(
500500
rabbit_exchange:route(DLX, Msg)))),
501501
case {RouteToQs0, Settled} of
502502
{[], [_|_]} ->
@@ -529,7 +529,10 @@ redeliver0(#pending{delivery = Msg0,
529529
rejected = []},
530530
State = State0#state{pendings = maps:update(OutSeq, Pend, Pendings)},
531531
Options = #{correlation => OutSeq},
532-
deliver_to_queues(Msg, Options, rabbit_amqqueue:lookup_many(RouteToQs), State)
532+
deliver_to_queues(Msg,
533+
Options,
534+
rabbit_db_queue:get_targets(RouteToQs),
535+
State)
533536
end
534537
end.
535538

@@ -569,8 +572,7 @@ cancel_timer(#state{timer = TRef} = State)
569572
cancel_timer(State) ->
570573
State.
571574

572-
queue_names(Qs)
573-
when is_list(Qs) ->
575+
queue_names(Qs) ->
574576
lists:map(fun amqqueue:get_name/1, Qs).
575577

576578
format_status(#{state := #state{

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,7 @@ delete_or_fail(Path) ->
13101310
register_projections() ->
13111311
RegFuns = [fun register_rabbit_exchange_projection/0,
13121312
fun register_rabbit_queue_projection/0,
1313+
fun register_rabbit_queue_target_projection/0,
13131314
fun register_rabbit_vhost_projection/0,
13141315
fun register_rabbit_users_projection/0,
13151316
fun register_rabbit_global_runtime_parameters_projection/0,
@@ -1351,7 +1352,26 @@ register_rabbit_queue_projection() ->
13511352
_VHost = ?KHEPRI_WILDCARD_STAR,
13521353
_Name = ?KHEPRI_WILDCARD_STAR),
13531354
KeyPos = 2, %% #amqqueue.name
1354-
register_simple_projection(Name, PathPattern, KeyPos, true).
1355+
register_simple_projection(Name, PathPattern, KeyPos, false).
1356+
1357+
register_rabbit_queue_target_projection() ->
1358+
PathPattern = rabbit_db_queue:khepri_queue_path(
1359+
_VHost = ?KHEPRI_WILDCARD_STAR,
1360+
_Name = ?KHEPRI_WILDCARD_STAR),
1361+
Fun = fun(_Path, Q) ->
1362+
BCC = case amqqueue:get_options(Q) of
1363+
#{extra_bcc := Name} -> Name;
1364+
_ -> none
1365+
end,
1366+
#queue_target{name = amqqueue:get_name(Q),
1367+
type = amqqueue:get_type(Q),
1368+
pid = amqqueue:get_pid(Q),
1369+
extra_bcc = BCC}
1370+
end,
1371+
Opts = #{keypos => #queue_target.name,
1372+
read_concurrency => true},
1373+
Projection = khepri_projection:new(rabbit_khepri_queue_target, Fun, Opts),
1374+
khepri:register_projection(?STORE_ID, PathPattern, Projection).
13551375

13561376
register_rabbit_vhost_projection() ->
13571377
Name = rabbit_khepri_vhost,

0 commit comments

Comments
 (0)