Skip to content

Commit 36d0c84

Browse files
ansdmergify[bot]
authored andcommitted
Reduce ETS copy overhead when delivering to target queues (#14570)
* Reduce ETS copy overhead when delivering to target queues ## What? This commit avoids copying the full amqqueue record from ETS per incoming message and target queue. The amqqueue record contains 21 elements and for some queue types, especially streams, some elements are themselves nested terms. ## How? In Khepri, use a new `rabbit_khepri_queue_target` projection which contains a subset of the full amqqueue record. This way all relevant information to deliver to a target queue can be looked up in a single ets:lookup_element call. Alternative approaches are described in erlang/otp#10211 ## Benchmark Fanout to 3 streams Start broker: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \ FULL=1 \ RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \ PLUGINS="rabbitmq_management" ``` `high-credit.config` contains: ``` [ {rabbit, [ %% Maximum incoming-window of AMQP 1.0 session. %% Default: 400 {max_incoming_window, 5000}, %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender. %% Default: 128 {max_link_credit, 2000}, %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue. %% Default: 256 {max_queue_credit, 5000}, {loopback_users, []} ]}, {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` Create the 3 streams and bindings to the fanout exchange: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 ``` Start the client: ``` quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4 ``` `main` branch: ``` Count ............................................. 1,000,000 messages Duration ............................................... 16.3 seconds Message rate ......................................... 61,237 messages/s ``` with this PR: ``` Count ............................................. 1,000,000 messages Duration ............................................... 14.2 seconds Message rate ......................................... 70,309 messages/s ``` Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%. * Avoid creating 5 elems tuple * Simplify rabbit_queue_type callbacks deliver should only take targets and init should only take the full record * Fix flaky test * Fix specs (cherry picked from commit 2e75bc6)
1 parent a4b0437 commit 36d0c84

20 files changed

+252
-178
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
new/9,
1515
new_with_version/9,
1616
new_with_version/10,
17+
new_target/2,
1718
fields/0,
1819
fields/1,
1920
field_vhost/0,
@@ -39,6 +40,7 @@
3940
% options
4041
get_options/1,
4142
set_options/2,
43+
get_extra_bcc/1,
4244
% pid
4345
get_pid/1,
4446
set_pid/2,
@@ -77,13 +79,15 @@
7779
qnode/1,
7880
to_printable/1,
7981
to_printable/2,
80-
macros/0]).
82+
macros/0
83+
]).
8184

8285
-define(record_version, amqqueue_v2).
8386
-define(is_backwards_compat_classic(T),
8487
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
8588

8689
-type amqqueue_options() :: map() | ets:match_pattern().
90+
-type extra_bcc() :: rabbit_misc:resource_name() | none.
8791

8892
-record(amqqueue, {
8993
%% immutable
@@ -120,6 +124,17 @@
120124
type_state = #{} :: map() | ets:match_pattern()
121125
}).
122126

127+
%% A subset of the amqqueue record containing just the necessary fields
128+
%% to deliver a message to a target queue.
129+
-record(queue_target,
130+
{name :: rabbit_amqqueue:name(),
131+
target :: {rabbit_queue_type:queue_type(),
132+
pid() | ra_server_id() | none,
133+
extra_bcc()}
134+
}).
135+
136+
-opaque target() :: #queue_target{}.
137+
123138
-type amqqueue() :: amqqueue_v2().
124139
-type amqqueue_v2() :: #amqqueue{
125140
name :: rabbit_amqqueue:name(),
@@ -175,6 +190,7 @@
175190
amqqueue_v2/0,
176191
amqqueue_pattern/0,
177192
amqqueue_v2_pattern/0,
193+
target/0,
178194
ra_server_id/0]).
179195

180196
-spec new(rabbit_amqqueue:name(),
@@ -328,6 +344,15 @@ new_with_version(?record_version,
328344
options = Options,
329345
type = ensure_type_compat(Type)}.
330346

347+
-spec new_target(rabbit_amqqueue:name(),
348+
{rabbit_queue_type:queue_type(),
349+
pid() | ra_server_id() | none,
350+
extra_bcc()}) ->
351+
target().
352+
new_target(Name, Target) when tuple_size(Target) =:= 3 ->
353+
#queue_target{name = Name,
354+
target = Target}.
355+
331356
-spec is_amqqueue(any()) -> boolean().
332357

333358
is_amqqueue(#amqqueue{}) -> true.
@@ -361,15 +386,21 @@ set_arguments(#amqqueue{} = Queue, Args) ->
361386
% options
362387

363388
-spec get_options(amqqueue()) -> amqqueue_options().
364-
365389
get_options(#amqqueue{options = Options}) ->
366390
Options.
367391

368392
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
369-
370393
set_options(#amqqueue{} = Queue, Options) ->
371394
Queue#amqqueue{options = Options}.
372395

396+
-spec get_extra_bcc(amqqueue() | target()) ->
397+
extra_bcc().
398+
get_extra_bcc(#amqqueue{options = #{extra_bcc := ExtraBcc}}) ->
399+
ExtraBcc;
400+
get_extra_bcc(#amqqueue{}) ->
401+
none;
402+
get_extra_bcc(#queue_target{target = {_Type, _Pid, ExtraBcc}}) ->
403+
ExtraBcc.
373404

374405
% decorators
375406

@@ -418,9 +449,10 @@ set_operator_policy(#amqqueue{} = Queue, Policy) ->
418449

419450
% name
420451

421-
-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
452+
-spec get_name(amqqueue() | target()) -> rabbit_amqqueue:name().
422453

423-
get_name(#amqqueue{name = Name}) -> Name.
454+
get_name(#amqqueue{name = Name}) -> Name;
455+
get_name(#queue_target{name = Name}) -> Name.
424456

425457
-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().
426458

@@ -429,9 +461,10 @@ set_name(#amqqueue{} = Queue, Name) ->
429461

430462
% pid
431463

432-
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
464+
-spec get_pid(amqqueue_v2() | target()) -> pid() | ra_server_id() | none.
433465

434-
get_pid(#amqqueue{pid = Pid}) -> Pid.
466+
get_pid(#amqqueue{pid = Pid}) -> Pid;
467+
get_pid(#queue_target{target = {_Type, Pid, _ExtraBcc}}) -> Pid.
435468

436469
-spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2().
437470

@@ -488,9 +521,10 @@ set_state(#amqqueue{} = Queue, State) ->
488521

489522
%% New in v2.
490523

491-
-spec get_type(amqqueue()) -> atom().
524+
-spec get_type(amqqueue() | target()) -> atom().
492525

493-
get_type(#amqqueue{type = Type}) -> Type.
526+
get_type(#amqqueue{type = Type}) -> Type;
527+
get_type(#queue_target{target = {Type, _Pid, _ExtraBcc}}) -> Type.
494528

495529
-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.
496530

@@ -629,8 +663,6 @@ to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
629663
<<"virtual_host">> => VHost,
630664
<<"type">> => Type}.
631665

632-
% private
633-
634666
macros() ->
635667
io:format(
636668
"-define(is_~ts(Q), is_record(Q, amqqueue, ~b)).~n~n",

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2528,7 +2528,7 @@ incoming_link_transfer(
25282528
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
25292529
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
25302530
Opts = #{correlation => {HandleInt, DeliveryId}},
2531-
Qs0 = rabbit_amqqueue:lookup_many(QNames),
2531+
Qs0 = rabbit_db_queue:get_targets(QNames),
25322532
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
25332533
Mc = ensure_mc_cluster_compat(Mc2),
25342534
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
@@ -2674,8 +2674,8 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, U) ->
26742674
Disposition = released(DeliveryId),
26752675
{U, [Disposition]};
26762676
process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) ->
2677-
QNames = rabbit_amqqueue:queue_names(Qs),
26782677
false = maps:is_key(DeliveryId, U0),
2678+
QNames = rabbit_amqqueue:queue_names(Qs),
26792679
Map = maps:from_keys(QNames, ok),
26802680
U = U0#{DeliveryId => {Map, SenderSettles, false}},
26812681
rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)),

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 34 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
1212
forget_all_durable/1]).
1313
-export([pseudo_queue/2, pseudo_queue/3]).
14-
-export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1,
14+
-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1,
1515
not_found_or_absent_dirty/1,
1616
with/2, with/3, with_or_die/2,
1717
assert_equivalence/5,
@@ -367,14 +367,6 @@ lookup(Name) when is_record(Name, resource) ->
367367
lookup_durable_queue(QName) ->
368368
rabbit_db_queue:get_durable(QName).
369369

370-
-spec lookup_many(rabbit_exchange:route_return()) ->
371-
[amqqueue:amqqueue() | {amqqueue:amqqueue(), route_infos()}].
372-
lookup_many([]) ->
373-
%% optimisation
374-
[];
375-
lookup_many(Names) when is_list(Names) ->
376-
rabbit_db_queue:get_many(Names).
377-
378370
-spec lookup(binary(), binary()) ->
379371
rabbit_types:ok(amqqueue:amqqueue()) |
380372
rabbit_types:error('not_found').
@@ -2051,68 +2043,57 @@ get_quorum_nodes(Q) ->
20512043
end.
20522044

20532045
-spec prepend_extra_bcc(Qs) ->
2054-
Qs when Qs :: [amqqueue:amqqueue() |
2055-
{amqqueue:amqqueue(), route_infos()}].
2046+
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].
20562047
prepend_extra_bcc([]) ->
20572048
[];
20582049
prepend_extra_bcc([Q0] = Qs) ->
20592050
Q = queue(Q0),
2060-
case amqqueue:get_options(Q) of
2061-
#{extra_bcc := BCCName} ->
2062-
case get_bcc_queue(Q, BCCName) of
2063-
{ok, BCCQueue} ->
2064-
[BCCQueue | Qs];
2065-
{error, not_found} ->
2066-
Qs
2067-
end;
2068-
_ ->
2069-
Qs
2051+
case amqqueue:get_extra_bcc(Q) of
2052+
none ->
2053+
Qs;
2054+
Name ->
2055+
lookup_extra_bcc(Q, Name) ++ Qs
20702056
end;
20712057
prepend_extra_bcc(Qs) ->
2072-
BCCQueues =
2073-
lists:filtermap(
2074-
fun(Q0) ->
2075-
Q = queue(Q0),
2076-
case amqqueue:get_options(Q) of
2077-
#{extra_bcc := BCCName} ->
2078-
case get_bcc_queue(Q, BCCName) of
2079-
{ok, BCCQ} ->
2080-
{true, BCCQ};
2081-
{error, not_found} ->
2082-
false
2083-
end;
2084-
_ ->
2085-
false
2086-
end
2087-
end, Qs),
2088-
lists:usort(BCCQueues) ++ Qs.
2058+
ExtraQs = lists:filtermap(
2059+
fun(Q0) ->
2060+
Q = queue(Q0),
2061+
case amqqueue:get_extra_bcc(Q) of
2062+
none ->
2063+
false;
2064+
Name ->
2065+
case lookup_extra_bcc(Q, Name) of
2066+
[ExtraQ] ->
2067+
{true, ExtraQ};
2068+
[] ->
2069+
false
2070+
end
2071+
end
2072+
end, Qs),
2073+
lists:usort(ExtraQs) ++ Qs.
20892074

20902075
-spec queue(Q | {Q, route_infos()}) ->
2091-
Q when Q :: amqqueue:amqqueue().
2092-
queue(Q)
2093-
when ?is_amqqueue(Q) ->
2076+
Q when Q :: amqqueue:target().
2077+
queue({Q, RouteInfos}) when is_map(RouteInfos) ->
20942078
Q;
2095-
queue({Q, RouteInfos})
2096-
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
2079+
queue(Q) ->
20972080
Q.
20982081

20992082
-spec queue_names([Q | {Q, route_infos()}]) ->
2100-
[name()] when Q :: amqqueue:amqqueue().
2101-
queue_names(Queues)
2102-
when is_list(Queues) ->
2103-
lists:map(fun(Q) when ?is_amqqueue(Q) ->
2083+
[name()] when Q :: amqqueue:target().
2084+
queue_names(Queues) ->
2085+
lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) ->
21042086
amqqueue:get_name(Q);
2105-
({Q, RouteInfos})
2106-
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
2087+
(Q) ->
21072088
amqqueue:get_name(Q)
21082089
end, Queues).
21092090

2110-
-spec get_bcc_queue(amqqueue:amqqueue(), binary()) ->
2111-
{ok, amqqueue:amqqueue()} | {error, not_found}.
2112-
get_bcc_queue(Q, BCCName) ->
2091+
-spec lookup_extra_bcc(amqqueue:target(), binary()) ->
2092+
[amqqueue:target()].
2093+
lookup_extra_bcc(Q, BCCName) ->
21132094
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
21142095
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
2115-
lookup(BCCQueueName).
2096+
rabbit_db_queue:get_targets([BCCQueueName]).
21162097

21172098
is_queue_args_combination_permitted(Q) ->
21182099
Durable = amqqueue:is_durable(Q),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,7 +1222,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12221222
check_user_id_header(Message0, User),
12231223
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
12241224
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
1225-
Queues = rabbit_amqqueue:lookup_many(QNames),
1225+
Queues = rabbit_db_queue:get_targets(QNames),
12261226
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
12271227
Username, TraceState),
12281228
%% TODO: call delivery_to_queues with plain args
@@ -2126,7 +2126,12 @@ deliver_to_queues(XName,
21262126
rabbit_misc:protocol_error(
21272127
resource_error,
21282128
"Stream coordinator unavailable for ~ts",
2129-
[rabbit_misc:rs(Resource)])
2129+
[rabbit_misc:rs(Resource)]);
2130+
{error, Reason} ->
2131+
rabbit_misc:protocol_error(
2132+
resource_error,
2133+
"failed to deliver message: ~tp",
2134+
[Reason])
21302135
end.
21312136

21322137
process_routing_mandatory(_Mandatory = true,

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,8 @@ init(Q) when ?amqqueue_is_classic(Q) ->
295295
close(_State) ->
296296
ok.
297297

298-
-spec update(amqqueue:amqqueue(), state()) -> state().
299-
update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
298+
-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state().
299+
update(Q, #?STATE{pid = Pid} = State) ->
300300
case amqqueue:get_pid(Q) of
301301
Pid ->
302302
State;
@@ -473,10 +473,10 @@ settlement_action(Type, QRef, MsgSeqs, Acc) ->
473473

474474
supports_stateful_delivery() -> true.
475475

476-
-spec deliver([{amqqueue:amqqueue(), state()}],
476+
-spec deliver([{amqqueue:target(), state()}],
477477
Delivery :: mc:state(),
478478
rabbit_queue_type:delivery_options()) ->
479-
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
479+
{[{amqqueue:target(), state()}], rabbit_queue_type:actions()}.
480480
deliver(Qs0, Msg0, Options) ->
481481
%% add guid to content here instead of in rabbit_basic:message/3,
482482
%% as classic queues are the only ones that need it

0 commit comments

Comments
 (0)