Skip to content

Commit 2687034

Browse files
committed
Reduce ETS copy overhead when delivering to target queues
## What? This commit avoids copying the full amqqueue record from ETS per incoming message and target queue. The amqqueue record contains 21 elements and for some queue types, especially streams, some elements are themselves nested terms. ## How? In Khepri, use a new `rabbit_khepri_queue_target` projection which contains a subset of the full amqqueue record. This way all relevant information to deliver to a target queue can be looked up in a single ets:lookup_element call. Alternative approaches are described in erlang/otp#10211 ## Benchmark Fanout to 3 streams Start broker: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \ FULL=1 \ RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \ PLUGINS="rabbitmq_management" ``` `high-credit.config` contains: ``` [ {rabbit, [ %% Maximum incoming-window of AMQP 1.0 session. %% Default: 400 {max_incoming_window, 5000}, %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender. %% Default: 128 {max_link_credit, 2000}, %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue. %% Default: 256 {max_queue_credit, 5000}, {loopback_users, []} ]}, {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` Create the 3 streams and bindings to the fanout exchange: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 ``` Start the client: ``` quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4 ``` `main` branch: ``` Count ............................................. 1,000,000 messages Duration ............................................... 16.3 seconds Message rate ......................................... 61,237 messages/s ``` with this PR: ``` Count ............................................. 1,000,000 messages Duration ............................................... 14.2 seconds Message rate ......................................... 70,309 messages/s ``` Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.
1 parent 512e6a4 commit 2687034

16 files changed

+231
-108
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
new/9,
1515
new_with_version/9,
1616
new_with_version/10,
17+
new_target/4,
1718
fields/0,
1819
fields/1,
1920
field_vhost/0,
@@ -39,6 +40,7 @@
3940
% options
4041
get_options/1,
4142
set_options/2,
43+
get_extra_bcc/1,
4244
% pid
4345
get_pid/1,
4446
set_pid/2,
@@ -77,7 +79,8 @@
7779
qnode/1,
7880
to_printable/1,
7981
to_printable/2,
80-
macros/0]).
82+
macros/0
83+
]).
8184

8285
-define(record_version, amqqueue_v2).
8386
-define(is_backwards_compat_classic(T),
@@ -120,6 +123,17 @@
120123
type_state = #{} :: map() | ets:match_pattern()
121124
}).
122125

126+
%% A subset of the amqqueue record containing just the necessary fields
127+
%% to deliver a message to a target queue.
128+
-record(queue_target,
129+
{name :: rabbit_amqqueue:name(),
130+
type :: rabbit_queue_type:queue_type(),
131+
pid :: pid() | ra_server_id() | none,
132+
extra_bcc :: rabbit_misc:resource_name() | none
133+
}).
134+
135+
-opaque target() :: #queue_target{}.
136+
123137
-type amqqueue() :: amqqueue_v2().
124138
-type amqqueue_v2() :: #amqqueue{
125139
name :: rabbit_amqqueue:name(),
@@ -175,6 +189,7 @@
175189
amqqueue_v2/0,
176190
amqqueue_pattern/0,
177191
amqqueue_v2_pattern/0,
192+
target/0,
178193
ra_server_id/0]).
179194

180195
-spec new(rabbit_amqqueue:name(),
@@ -328,6 +343,17 @@ new_with_version(?record_version,
328343
options = Options,
329344
type = ensure_type_compat(Type)}.
330345

346+
-spec new_target(rabbit_amqqueue:name(),
347+
rabbit_queue_type:queue_type(),
348+
pid() | ra_server_id() | none,
349+
rabbit_misc:resource_name() | none) ->
350+
target().
351+
new_target(Name, Type, Pid, ExtraBcc) ->
352+
#queue_target{name = Name,
353+
type = Type,
354+
pid = Pid,
355+
extra_bcc = ExtraBcc}.
356+
331357
-spec is_amqqueue(any()) -> boolean().
332358

333359
is_amqqueue(#amqqueue{}) -> true.
@@ -361,15 +387,21 @@ set_arguments(#amqqueue{} = Queue, Args) ->
361387
% options
362388

363389
-spec get_options(amqqueue()) -> amqqueue_options().
364-
365390
get_options(#amqqueue{options = Options}) ->
366391
Options.
367392

368393
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
369-
370394
set_options(#amqqueue{} = Queue, Options) ->
371395
Queue#amqqueue{options = Options}.
372396

397+
-spec get_extra_bcc(amqqueue() | target()) ->
398+
rabbit_misc:resource_name() | none.
399+
get_extra_bcc(#amqqueue{options = #{extra_bcc := Name}}) ->
400+
Name;
401+
get_extra_bcc(#amqqueue{}) ->
402+
none;
403+
get_extra_bcc(#queue_target{extra_bcc = Name}) ->
404+
Name.
373405

374406
% decorators
375407

@@ -418,9 +450,10 @@ set_operator_policy(#amqqueue{} = Queue, Policy) ->
418450

419451
% name
420452

421-
-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
453+
-spec get_name(amqqueue() | target()) -> rabbit_amqqueue:name().
422454

423-
get_name(#amqqueue{name = Name}) -> Name.
455+
get_name(#amqqueue{name = Name}) -> Name;
456+
get_name(#queue_target{name = Name}) -> Name.
424457

425458
-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().
426459

@@ -429,9 +462,10 @@ set_name(#amqqueue{} = Queue, Name) ->
429462

430463
% pid
431464

432-
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
465+
-spec get_pid(amqqueue_v2() | target()) -> pid() | ra_server_id() | none.
433466

434-
get_pid(#amqqueue{pid = Pid}) -> Pid.
467+
get_pid(#amqqueue{pid = Pid}) -> Pid;
468+
get_pid(#queue_target{pid = Pid}) -> Pid.
435469

436470
-spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2().
437471

@@ -488,9 +522,10 @@ set_state(#amqqueue{} = Queue, State) ->
488522

489523
%% New in v2.
490524

491-
-spec get_type(amqqueue()) -> atom().
525+
-spec get_type(amqqueue() | target()) -> atom().
492526

493-
get_type(#amqqueue{type = Type}) -> Type.
527+
get_type(#amqqueue{type = Type}) -> Type;
528+
get_type(#queue_target{type = Type}) -> Type.
494529

495530
-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.
496531

@@ -629,8 +664,6 @@ to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
629664
<<"virtual_host">> => VHost,
630665
<<"type">> => Type}.
631666

632-
% private
633-
634667
macros() ->
635668
io:format(
636669
"-define(is_~ts(Q), is_record(Q, amqqueue, ~b)).~n~n",

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2528,7 +2528,7 @@ incoming_link_transfer(
25282528
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
25292529
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
25302530
Opts = #{correlation => {HandleInt, DeliveryId}},
2531-
Qs0 = rabbit_amqqueue:lookup_many(QNames),
2531+
Qs0 = rabbit_db_queue:get_targets(QNames),
25322532
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
25332533
Mc = ensure_mc_cluster_compat(Mc2),
25342534
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
@@ -2674,8 +2674,8 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, U) ->
26742674
Disposition = released(DeliveryId),
26752675
{U, [Disposition]};
26762676
process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) ->
2677-
QNames = rabbit_amqqueue:queue_names(Qs),
26782677
false = maps:is_key(DeliveryId, U0),
2678+
QNames = rabbit_amqqueue:queue_names(Qs),
26792679
Map = maps:from_keys(QNames, ok),
26802680
U = U0#{DeliveryId => {Map, SenderSettles, false}},
26812681
rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)),

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,65 +2051,60 @@ get_quorum_nodes(Q) ->
20512051
end.
20522052

20532053
-spec prepend_extra_bcc(Qs) ->
2054-
Qs when Qs :: [amqqueue:amqqueue() |
2055-
{amqqueue:amqqueue(), route_infos()}].
2054+
Qs when Qs :: [amqqueue:amqqueue() | amqqueue:target() |
2055+
{amqqueue:amqqueue() | amqqueue:target(), route_infos()}].
20562056
prepend_extra_bcc([]) ->
20572057
[];
20582058
prepend_extra_bcc([Q0] = Qs) ->
20592059
Q = queue(Q0),
2060-
case amqqueue:get_options(Q) of
2061-
#{extra_bcc := BCCName} ->
2062-
case get_bcc_queue(Q, BCCName) of
2060+
case amqqueue:get_extra_bcc(Q) of
2061+
none ->
2062+
Qs;
2063+
Name ->
2064+
case lookup_extra_bcc(Q, Name) of
20632065
{ok, BCCQueue} ->
20642066
[BCCQueue | Qs];
20652067
{error, not_found} ->
20662068
Qs
2067-
end;
2068-
_ ->
2069-
Qs
2069+
end
20702070
end;
20712071
prepend_extra_bcc(Qs) ->
2072-
BCCQueues =
2073-
lists:filtermap(
2074-
fun(Q0) ->
2075-
Q = queue(Q0),
2076-
case amqqueue:get_options(Q) of
2077-
#{extra_bcc := BCCName} ->
2078-
case get_bcc_queue(Q, BCCName) of
2079-
{ok, BCCQ} ->
2080-
{true, BCCQ};
2081-
{error, not_found} ->
2082-
false
2083-
end;
2084-
_ ->
2085-
false
2086-
end
2087-
end, Qs),
2088-
lists:usort(BCCQueues) ++ Qs.
2072+
ExtraQs = lists:filtermap(
2073+
fun(Q0) ->
2074+
Q = queue(Q0),
2075+
case amqqueue:get_extra_bcc(Q) of
2076+
none ->
2077+
false;
2078+
Name ->
2079+
case lookup_extra_bcc(Q, Name) of
2080+
{ok, BCCQ} ->
2081+
{true, BCCQ};
2082+
{error, not_found} ->
2083+
false
2084+
end
2085+
end
2086+
end, Qs),
2087+
lists:usort(ExtraQs) ++ Qs.
20892088

20902089
-spec queue(Q | {Q, route_infos()}) ->
2091-
Q when Q :: amqqueue:amqqueue().
2092-
queue(Q)
2093-
when ?is_amqqueue(Q) ->
2090+
Q when Q :: amqqueue:amqqueue() | amqqueue:target().
2091+
queue({Q, RouteInfos}) when is_map(RouteInfos) ->
20942092
Q;
2095-
queue({Q, RouteInfos})
2096-
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
2093+
queue(Q) ->
20972094
Q.
20982095

20992096
-spec queue_names([Q | {Q, route_infos()}]) ->
2100-
[name()] when Q :: amqqueue:amqqueue().
2101-
queue_names(Queues)
2102-
when is_list(Queues) ->
2103-
lists:map(fun(Q) when ?is_amqqueue(Q) ->
2097+
[name()] when Q :: amqqueue:amqqueue() | amqqueue:target().
2098+
queue_names(Queues) ->
2099+
lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) ->
21042100
amqqueue:get_name(Q);
2105-
({Q, RouteInfos})
2106-
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
2101+
(Q) ->
21072102
amqqueue:get_name(Q)
21082103
end, Queues).
21092104

2110-
-spec get_bcc_queue(amqqueue:amqqueue(), binary()) ->
2105+
-spec lookup_extra_bcc(amqqueue:amqqueue() | amqqueue:target(), binary()) ->
21112106
{ok, amqqueue:amqqueue()} | {error, not_found}.
2112-
get_bcc_queue(Q, BCCName) ->
2107+
lookup_extra_bcc(Q, BCCName) ->
21132108
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
21142109
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
21152110
lookup(BCCQueueName).

deps/rabbit/src/rabbit_channel.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,7 +1222,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12221222
check_user_id_header(Message0, User),
12231223
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
12241224
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
1225-
Queues = rabbit_amqqueue:lookup_many(QNames),
1225+
Queues = rabbit_db_queue:get_targets(QNames),
12261226
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
12271227
Username, TraceState),
12281228
%% TODO: call delivery_to_queues with plain args
@@ -2126,7 +2126,12 @@ deliver_to_queues(XName,
21262126
rabbit_misc:protocol_error(
21272127
resource_error,
21282128
"Stream coordinator unavailable for ~ts",
2129-
[rabbit_misc:rs(Resource)])
2129+
[rabbit_misc:rs(Resource)]);
2130+
{error, Reason} ->
2131+
rabbit_misc:protocol_error(
2132+
resource_error,
2133+
"failed to deliver message: ~tp",
2134+
[Reason])
21302135
end.
21312136

21322137
process_routing_mandatory(_Mandatory = true,

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,16 +287,17 @@ format(Q, _Ctx) when ?is_amqqueue(Q) ->
287287
{state, State},
288288
{node, node(amqqueue:get_pid(Q))}].
289289

290-
-spec init(amqqueue:amqqueue()) -> {ok, state()}.
291-
init(Q) when ?amqqueue_is_classic(Q) ->
290+
-spec init(amqqueue:amqqueue() | amqqueue:target()) ->
291+
{ok, state()}.
292+
init(Q) ->
292293
{ok, #?STATE{pid = amqqueue:get_pid(Q)}}.
293294

294295
-spec close(state()) -> ok.
295296
close(_State) ->
296297
ok.
297298

298-
-spec update(amqqueue:amqqueue(), state()) -> state().
299-
update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
299+
-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state().
300+
update(Q, #?STATE{pid = Pid} = State) ->
300301
case amqqueue:get_pid(Q) of
301302
Pid ->
302303
State;
@@ -473,7 +474,7 @@ settlement_action(Type, QRef, MsgSeqs, Acc) ->
473474

474475
supports_stateful_delivery() -> true.
475476

476-
-spec deliver([{amqqueue:amqqueue(), state()}],
477+
-spec deliver([{amqqueue:amqqueue() | amqqueue:target(), state()}],
477478
Delivery :: mc:state(),
478479
rabbit_queue_type:delivery_options()) ->
479480
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
-export([
2020
get/1,
2121
get_many/1,
22+
get_targets/1,
2223
get_all/0,
2324
get_all/1,
2425
get_all_by_type/1,
@@ -85,6 +86,7 @@
8586
-define(MNESIA_DURABLE_TABLE, rabbit_durable_queue).
8687

8788
-define(KHEPRI_PROJECTION, rabbit_khepri_queue).
89+
-define(KHEPRI_TARGET_PROJECTION, rabbit_khepri_queue_target).
8890

8991
%% -------------------------------------------------------------------
9092
%% get_all().
@@ -468,6 +470,57 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
468470
%% after the transaction.
469471
rabbit_db_binding:delete_for_destination_in_mnesia(QueueName, OnlyDurable).
470472

473+
%% -------------------------------------------------------------------
474+
%% get_targets().
475+
%% -------------------------------------------------------------------
476+
477+
%% Queue target optimisation is only available in Khepri.
478+
%% Mnesia falls back looking up the full amqqueue record.
479+
-spec get_targets(rabbit_exchange:route_return()) ->
480+
[amqqueue:target() | amqqueue:amqqueue() |
481+
{amqqueue:target() | amqqueue:amqqueue(), rabbit_exchange:route_infos()}].
482+
get_targets(Names) ->
483+
rabbit_khepri:handle_fallback(
484+
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
485+
khepri => fun() -> lookup_targets(Names) end
486+
}).
487+
488+
lookup_targets(Names) ->
489+
lists:filtermap(fun({Name, RouteInfos})
490+
when is_map(RouteInfos) ->
491+
case lookup_target(Name) of
492+
not_found -> false;
493+
Target -> {true, {Target, RouteInfos}}
494+
end;
495+
(Name) ->
496+
case lookup_target(Name) of
497+
not_found -> false;
498+
Target -> {true, Target}
499+
end
500+
end, Names).
501+
502+
lookup_target(#resource{name = NameBin} = Name) ->
503+
case rabbit_volatile_queue:is(NameBin) of
504+
true ->
505+
%% This queue is not stored in the database.
506+
%% We create it on the fly.
507+
case rabbit_volatile_queue:new(Name) of
508+
error -> not_found;
509+
Q -> Q
510+
end;
511+
false ->
512+
try
513+
ets:lookup_element(?KHEPRI_TARGET_PROJECTION, Name, 2, not_found) of
514+
{Type, Pid, ExtraBcc} ->
515+
amqqueue:new_target(Name, Type, Pid, ExtraBcc);
516+
not_found ->
517+
not_found
518+
catch
519+
error:badarg ->
520+
not_found
521+
end
522+
end.
523+
471524
%% -------------------------------------------------------------------
472525
%% get_many().
473526
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
4444
Routed0 = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}),
4545
{Cycles, Routed} = detect_cycles(Reason, DLMsg, Routed0),
4646
lists:foreach(fun log_cycle_once/1, Cycles),
47-
Qs0 = rabbit_amqqueue:lookup_many(Routed),
47+
Qs0 = rabbit_db_queue:get_targets(Routed),
4848
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
4949
_ = rabbit_queue_type:deliver(Qs, DLMsg, #{}, stateless),
5050
ok.

0 commit comments

Comments
 (0)