From 36d0c841f74a724ea243873358aec2cbee14a04d Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 25 Sep 2025 11:25:09 +0200 Subject: [PATCH] Reduce ETS copy overhead when delivering to target queues (#14570) * Reduce ETS copy overhead when delivering to target queues ## What? This commit avoids copying the full amqqueue record from ETS per incoming message and target queue. The amqqueue record contains 21 elements and for some queue types, especially streams, some elements are themselves nested terms. ## How? In Khepri, use a new `rabbit_khepri_queue_target` projection which contains a subset of the full amqqueue record. This way all relevant information to deliver to a target queue can be looked up in a single ets:lookup_element call. Alternative approaches are described in https://github.com/erlang/otp/issues/10211 ## Benchmark Fanout to 3 streams Start broker: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \ FULL=1 \ RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \ PLUGINS="rabbitmq_management" ``` `high-credit.config` contains: ``` [ {rabbit, [ %% Maximum incoming-window of AMQP 1.0 session. %% Default: 400 {max_incoming_window, 5000}, %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender. %% Default: 128 {max_link_credit, 2000}, %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue. %% Default: 256 {max_queue_credit, 5000}, {loopback_users, []} ]}, {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` Create the 3 streams and bindings to the fanout exchange: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 ``` Start the client: ``` quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4 ``` `main` branch: ``` Count ............................................. 1,000,000 messages Duration ............................................... 16.3 seconds Message rate ......................................... 61,237 messages/s ``` with this PR: ``` Count ............................................. 1,000,000 messages Duration ............................................... 14.2 seconds Message rate ......................................... 70,309 messages/s ``` Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%. * Avoid creating 5 elems tuple * Simplify rabbit_queue_type callbacks deliver should only take targets and init should only take the full record * Fix flaky test * Fix specs (cherry picked from commit 2e75bc6eb5a25259e7bb202741818eedcf4b93b8) --- deps/rabbit/src/amqqueue.erl | 54 ++++++++--- deps/rabbit/src/rabbit_amqp_session.erl | 4 +- deps/rabbit/src/rabbit_amqqueue.erl | 87 +++++++----------- deps/rabbit/src/rabbit_channel.erl | 9 +- deps/rabbit/src/rabbit_classic_queue.erl | 8 +- deps/rabbit/src/rabbit_db_queue.erl | 89 +++++++++++-------- deps/rabbit/src/rabbit_dead_letter.erl | 2 +- deps/rabbit/src/rabbit_fifo_dlx_worker.erl | 12 +-- deps/rabbit/src/rabbit_khepri.erl | 21 ++++- deps/rabbit/src/rabbit_queue_type.erl | 67 +++++++------- deps/rabbit/src/rabbit_quorum_queue.erl | 7 +- deps/rabbit/src/rabbit_stream_coordinator.erl | 15 ++-- deps/rabbit/src/rabbit_stream_queue.erl | 5 +- deps/rabbit/src/rabbit_volatile_queue.erl | 11 +++ deps/rabbit/test/rabbit_db_queue_SUITE.erl | 20 +++-- .../test/clustering_SUITE.erl | 3 +- .../src/rabbit_mqtt_processor.erl | 2 +- .../src/rabbit_mqtt_qos0_queue.erl | 2 +- .../rabbit_exchange_type_recent_history.erl | 10 +-- .../src/rabbit_local_shovel.erl | 2 +- 20 files changed, 252 insertions(+), 178 deletions(-) diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index 8bf5a2345f19..3c958d90fea0 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -14,6 +14,7 @@ new/9, new_with_version/9, new_with_version/10, + new_target/2, fields/0, fields/1, field_vhost/0, @@ -39,6 +40,7 @@ % options get_options/1, set_options/2, + get_extra_bcc/1, % pid get_pid/1, set_pid/2, @@ -77,13 +79,15 @@ qnode/1, to_printable/1, to_printable/2, - macros/0]). + macros/0 + ]). -define(record_version, amqqueue_v2). -define(is_backwards_compat_classic(T), (T =:= classic orelse T =:= ?amqqueue_v1_type)). -type amqqueue_options() :: map() | ets:match_pattern(). +-type extra_bcc() :: rabbit_misc:resource_name() | none. -record(amqqueue, { %% immutable @@ -120,6 +124,17 @@ type_state = #{} :: map() | ets:match_pattern() }). +%% A subset of the amqqueue record containing just the necessary fields +%% to deliver a message to a target queue. +-record(queue_target, + {name :: rabbit_amqqueue:name(), + target :: {rabbit_queue_type:queue_type(), + pid() | ra_server_id() | none, + extra_bcc()} + }). + +-opaque target() :: #queue_target{}. + -type amqqueue() :: amqqueue_v2(). -type amqqueue_v2() :: #amqqueue{ name :: rabbit_amqqueue:name(), @@ -175,6 +190,7 @@ amqqueue_v2/0, amqqueue_pattern/0, amqqueue_v2_pattern/0, + target/0, ra_server_id/0]). -spec new(rabbit_amqqueue:name(), @@ -328,6 +344,15 @@ new_with_version(?record_version, options = Options, type = ensure_type_compat(Type)}. +-spec new_target(rabbit_amqqueue:name(), + {rabbit_queue_type:queue_type(), + pid() | ra_server_id() | none, + extra_bcc()}) -> + target(). +new_target(Name, Target) when tuple_size(Target) =:= 3 -> + #queue_target{name = Name, + target = Target}. + -spec is_amqqueue(any()) -> boolean(). is_amqqueue(#amqqueue{}) -> true. @@ -361,15 +386,21 @@ set_arguments(#amqqueue{} = Queue, Args) -> % options -spec get_options(amqqueue()) -> amqqueue_options(). - get_options(#amqqueue{options = Options}) -> Options. -spec set_options(amqqueue(), amqqueue_options()) -> amqqueue(). - set_options(#amqqueue{} = Queue, Options) -> Queue#amqqueue{options = Options}. +-spec get_extra_bcc(amqqueue() | target()) -> + extra_bcc(). +get_extra_bcc(#amqqueue{options = #{extra_bcc := ExtraBcc}}) -> + ExtraBcc; +get_extra_bcc(#amqqueue{}) -> + none; +get_extra_bcc(#queue_target{target = {_Type, _Pid, ExtraBcc}}) -> + ExtraBcc. % decorators @@ -418,9 +449,10 @@ set_operator_policy(#amqqueue{} = Queue, Policy) -> % name --spec get_name(amqqueue()) -> rabbit_amqqueue:name(). +-spec get_name(amqqueue() | target()) -> rabbit_amqqueue:name(). -get_name(#amqqueue{name = Name}) -> Name. +get_name(#amqqueue{name = Name}) -> Name; +get_name(#queue_target{name = Name}) -> Name. -spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). @@ -429,9 +461,10 @@ set_name(#amqqueue{} = Queue, Name) -> % pid --spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none. +-spec get_pid(amqqueue_v2() | target()) -> pid() | ra_server_id() | none. -get_pid(#amqqueue{pid = Pid}) -> Pid. +get_pid(#amqqueue{pid = Pid}) -> Pid; +get_pid(#queue_target{target = {_Type, Pid, _ExtraBcc}}) -> Pid. -spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(). @@ -488,9 +521,10 @@ set_state(#amqqueue{} = Queue, State) -> %% New in v2. --spec get_type(amqqueue()) -> atom(). +-spec get_type(amqqueue() | target()) -> atom(). -get_type(#amqqueue{type = Type}) -> Type. +get_type(#amqqueue{type = Type}) -> Type; +get_type(#queue_target{target = {Type, _Pid, _ExtraBcc}}) -> Type. -spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. @@ -629,8 +663,6 @@ to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) -> <<"virtual_host">> => VHost, <<"type">> => Type}. -% private - macros() -> io:format( "-define(is_~ts(Q), is_record(Q, amqqueue, ~b)).~n~n", diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index cd29844725f0..398d8832e9c3 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2528,7 +2528,7 @@ incoming_link_transfer( QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, - Qs0 = rabbit_amqqueue:lookup_many(QNames), + Qs0 = rabbit_db_queue:get_targets(QNames), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), Mc = ensure_mc_cluster_compat(Mc2), case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of @@ -2674,8 +2674,8 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, U) -> Disposition = released(DeliveryId), {U, [Disposition]}; process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) -> - QNames = rabbit_amqqueue:queue_names(Qs), false = maps:is_key(DeliveryId, U0), + QNames = rabbit_amqqueue:queue_names(Qs), Map = maps:from_keys(QNames, ok), U = U0#{DeliveryId => {Map, SenderSettles, false}}, rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)), diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 3bb8f53f3bff..36f6b63966df 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -11,7 +11,7 @@ delete_immediately/1, delete_exclusive/2, delete/4, purge/1, forget_all_durable/1]). -export([pseudo_queue/2, pseudo_queue/3]). --export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1, +-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1, not_found_or_absent_dirty/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -367,14 +367,6 @@ lookup(Name) when is_record(Name, resource) -> lookup_durable_queue(QName) -> rabbit_db_queue:get_durable(QName). --spec lookup_many(rabbit_exchange:route_return()) -> - [amqqueue:amqqueue() | {amqqueue:amqqueue(), route_infos()}]. -lookup_many([]) -> - %% optimisation - []; -lookup_many(Names) when is_list(Names) -> - rabbit_db_queue:get_many(Names). - -spec lookup(binary(), binary()) -> rabbit_types:ok(amqqueue:amqqueue()) | rabbit_types:error('not_found'). @@ -2051,68 +2043,57 @@ get_quorum_nodes(Q) -> end. -spec prepend_extra_bcc(Qs) -> - Qs when Qs :: [amqqueue:amqqueue() | - {amqqueue:amqqueue(), route_infos()}]. + Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}]. prepend_extra_bcc([]) -> []; prepend_extra_bcc([Q0] = Qs) -> Q = queue(Q0), - case amqqueue:get_options(Q) of - #{extra_bcc := BCCName} -> - case get_bcc_queue(Q, BCCName) of - {ok, BCCQueue} -> - [BCCQueue | Qs]; - {error, not_found} -> - Qs - end; - _ -> - Qs + case amqqueue:get_extra_bcc(Q) of + none -> + Qs; + Name -> + lookup_extra_bcc(Q, Name) ++ Qs end; prepend_extra_bcc(Qs) -> - BCCQueues = - lists:filtermap( - fun(Q0) -> - Q = queue(Q0), - case amqqueue:get_options(Q) of - #{extra_bcc := BCCName} -> - case get_bcc_queue(Q, BCCName) of - {ok, BCCQ} -> - {true, BCCQ}; - {error, not_found} -> - false - end; - _ -> - false - end - end, Qs), - lists:usort(BCCQueues) ++ Qs. + ExtraQs = lists:filtermap( + fun(Q0) -> + Q = queue(Q0), + case amqqueue:get_extra_bcc(Q) of + none -> + false; + Name -> + case lookup_extra_bcc(Q, Name) of + [ExtraQ] -> + {true, ExtraQ}; + [] -> + false + end + end + end, Qs), + lists:usort(ExtraQs) ++ Qs. -spec queue(Q | {Q, route_infos()}) -> - Q when Q :: amqqueue:amqqueue(). -queue(Q) - when ?is_amqqueue(Q) -> + Q when Q :: amqqueue:target(). +queue({Q, RouteInfos}) when is_map(RouteInfos) -> Q; -queue({Q, RouteInfos}) - when ?is_amqqueue(Q) andalso is_map(RouteInfos) -> +queue(Q) -> Q. -spec queue_names([Q | {Q, route_infos()}]) -> - [name()] when Q :: amqqueue:amqqueue(). -queue_names(Queues) - when is_list(Queues) -> - lists:map(fun(Q) when ?is_amqqueue(Q) -> + [name()] when Q :: amqqueue:target(). +queue_names(Queues) -> + lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) -> amqqueue:get_name(Q); - ({Q, RouteInfos}) - when ?is_amqqueue(Q) andalso is_map(RouteInfos) -> + (Q) -> amqqueue:get_name(Q) end, Queues). --spec get_bcc_queue(amqqueue:amqqueue(), binary()) -> - {ok, amqqueue:amqqueue()} | {error, not_found}. -get_bcc_queue(Q, BCCName) -> +-spec lookup_extra_bcc(amqqueue:target(), binary()) -> + [amqqueue:target()]. +lookup_extra_bcc(Q, BCCName) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), BCCQueueName = rabbit_misc:r(VHost, queue, BCCName), - lookup(BCCQueueName). + rabbit_db_queue:get_targets([BCCQueueName]). is_queue_args_combination_permitted(Q) -> Durable = amqqueue:is_durable(Q), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 6e63cda0b74b..3c7c865fcb00 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1222,7 +1222,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_user_id_header(Message0, User), Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), - Queues = rabbit_amqqueue:lookup_many(QNames), + Queues = rabbit_db_queue:get_targets(QNames), rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, Username, TraceState), %% TODO: call delivery_to_queues with plain args @@ -2126,7 +2126,12 @@ deliver_to_queues(XName, rabbit_misc:protocol_error( resource_error, "Stream coordinator unavailable for ~ts", - [rabbit_misc:rs(Resource)]) + [rabbit_misc:rs(Resource)]); + {error, Reason} -> + rabbit_misc:protocol_error( + resource_error, + "failed to deliver message: ~tp", + [Reason]) end. process_routing_mandatory(_Mandatory = true, diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 0311dd8741a4..3ecec12fe89c 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -295,8 +295,8 @@ init(Q) when ?amqqueue_is_classic(Q) -> close(_State) -> ok. --spec update(amqqueue:amqqueue(), state()) -> state(). -update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) -> +-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state(). +update(Q, #?STATE{pid = Pid} = State) -> case amqqueue:get_pid(Q) of Pid -> State; @@ -473,10 +473,10 @@ settlement_action(Type, QRef, MsgSeqs, Acc) -> supports_stateful_delivery() -> true. --spec deliver([{amqqueue:amqqueue(), state()}], +-spec deliver([{amqqueue:target(), state()}], Delivery :: mc:state(), rabbit_queue_type:delivery_options()) -> - {[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}. + {[{amqqueue:target(), state()}], rabbit_queue_type:actions()}. deliver(Qs0, Msg0, Options) -> %% add guid to content here instead of in rabbit_basic:message/3, %% as classic queues are the only ones that need it diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 4cc90e39ac4d..ed4ee2274379 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -18,7 +18,7 @@ -export([ get/1, - get_many/1, + get_targets/1, get_all/0, get_all/1, get_all_by_type/1, @@ -85,6 +85,7 @@ -define(MNESIA_DURABLE_TABLE, rabbit_durable_queue). -define(KHEPRI_PROJECTION, rabbit_khepri_queue). +-define(KHEPRI_TARGET_PROJECTION, rabbit_khepri_queue_target). %% ------------------------------------------------------------------- %% get_all(). @@ -469,58 +470,62 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) -> rabbit_db_binding:delete_for_destination_in_mnesia(QueueName, OnlyDurable). %% ------------------------------------------------------------------- -%% get_many(). +%% get_targets(). %% ------------------------------------------------------------------- --spec get_many(rabbit_exchange:route_return()) -> - [amqqueue:amqqueue() | {amqqueue:amqqueue(), rabbit_exchange:route_infos()}]. -get_many(Names) when is_list(Names) -> +-spec get_targets(rabbit_exchange:route_return()) -> + [amqqueue:target() | {amqqueue:target(), rabbit_exchange:route_infos()}]. +get_targets(Names) -> rabbit_khepri:handle_fallback( - #{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end, - khepri => fun() -> get_many_in_khepri(Names) end + #{mnesia => fun() -> lookup_targets(mnesia, Names) end, + khepri => fun() -> lookup_targets(khepri, Names) end }). -get_many_in_khepri(Names) -> - try - get_many_in_ets(?KHEPRI_PROJECTION, Names) - catch - error:badarg -> - [] - end. - -get_many_in_ets(Table, [{Name, RouteInfos}]) - when is_map(RouteInfos) -> - case ets_lookup(Table, Name) of - [] -> []; - [Q] -> [{Q, RouteInfos}] - end; -get_many_in_ets(Table, [Name]) -> - ets_lookup(Table, Name); -get_many_in_ets(Table, Names) when is_list(Names) -> +lookup_targets(Store, Names) -> lists:filtermap(fun({Name, RouteInfos}) when is_map(RouteInfos) -> - case ets_lookup(Table, Name) of - [] -> false; - [Q] -> {true, {Q, RouteInfos}} + case lookup_target(Store, Name) of + not_found -> false; + Target -> {true, {Target, RouteInfos}} end; (Name) -> - case ets_lookup(Table, Name) of - [] -> false; - [Q] -> {true, Q} + case lookup_target(Store, Name) of + not_found -> false; + Target -> {true, Target} end end, Names). -ets_lookup(Table, QName = #resource{name = QNameBin}) -> - case rabbit_volatile_queue:is(QNameBin) of +lookup_target(Store, #resource{name = NameBin} = Name) -> + case rabbit_volatile_queue:is(NameBin) of true -> - %% This queue record is not stored in the database. - %% We create it on the fly. - case rabbit_volatile_queue:new(QName) of - error -> []; - Q -> [Q] + %% This queue is not stored in the database. We create it on the fly. + case rabbit_volatile_queue:new_target(Name) of + error -> not_found; + Target -> Target end; false -> - ets:lookup(Table, QName) + lookup_target0(Store, Name) + end. + +lookup_target0(khepri, Name) -> + try ets:lookup_element(?KHEPRI_TARGET_PROJECTION, Name, 2, not_found) of + not_found -> + not_found; + Target -> + amqqueue:new_target(Name, Target) + catch + error:badarg -> + not_found + end; +lookup_target0(mnesia, Name) -> + case ets:lookup(?MNESIA_TABLE, Name) of + [] -> + not_found; + [Q] -> + Type = amqqueue:get_type(Q), + Pid = amqqueue:get_pid(Q), + ExtraBcc = amqqueue:get_extra_bcc(Q), + amqqueue:new_target(Name, {Type, Pid, ExtraBcc}) end. %% ------------------------------------------------------------------- @@ -610,6 +615,14 @@ get_many_durable_in_khepri(Names) -> [] end. +get_many_in_ets(Table, Names) -> + lists:filtermap(fun(Name) -> + case ets:lookup(Table, Name) of + [] -> false; + [Q] -> {true, Q} + end + end, Names). + %% ------------------------------------------------------------------- %% update(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index 172bc0bc9306..1b7d1da232f1 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -44,7 +44,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK, Routed0 = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}), {Cycles, Routed} = detect_cycles(Reason, DLMsg, Routed0), lists:foreach(fun log_cycle_once/1, Cycles), - Qs0 = rabbit_amqqueue:lookup_many(Routed), + Qs0 = rabbit_db_queue:get_targets(Routed), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), _ = rabbit_queue_type:deliver(Qs, DLMsg, #{}, stateless), ok. diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index 4084793846ab..a6fc3ce48318 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -342,7 +342,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, {Cycles, RouteToQs1} = rabbit_dead_letter:detect_cycles( Reason, Msg, RouteToQs0), State1 = log_cycles(Cycles, [RKey], State0), - RouteToQs2 = rabbit_amqqueue:lookup_many(RouteToQs1), + RouteToQs2 = rabbit_db_queue:get_targets(RouteToQs1), RouteToQs = rabbit_amqqueue:prepend_extra_bcc(RouteToQs2), State2 = case RouteToQs of [] -> @@ -496,7 +496,7 @@ redeliver0(#pending{delivery = Msg0, %% queues that do not exist. Therefore, filter out non-existent target queues. RouteToQs0 = queue_names( rabbit_amqqueue:prepend_extra_bcc( - rabbit_amqqueue:lookup_many( + rabbit_db_queue:get_targets( rabbit_exchange:route(DLX, Msg)))), case {RouteToQs0, Settled} of {[], [_|_]} -> @@ -529,7 +529,10 @@ redeliver0(#pending{delivery = Msg0, rejected = []}, State = State0#state{pendings = maps:update(OutSeq, Pend, Pendings)}, Options = #{correlation => OutSeq}, - deliver_to_queues(Msg, Options, rabbit_amqqueue:lookup_many(RouteToQs), State) + deliver_to_queues(Msg, + Options, + rabbit_db_queue:get_targets(RouteToQs), + State) end end. @@ -569,8 +572,7 @@ cancel_timer(#state{timer = TRef} = State) cancel_timer(State) -> State. -queue_names(Qs) - when is_list(Qs) -> +queue_names(Qs) -> lists:map(fun amqqueue:get_name/1, Qs). format_status(#{state := #state{ diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 9357f2318018..c7423b2731ee 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1310,6 +1310,7 @@ delete_or_fail(Path) -> register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, fun register_rabbit_queue_projection/0, + fun register_rabbit_queue_target_projection/0, fun register_rabbit_vhost_projection/0, fun register_rabbit_users_projection/0, fun register_rabbit_global_runtime_parameters_projection/0, @@ -1351,7 +1352,25 @@ register_rabbit_queue_projection() -> _VHost = ?KHEPRI_WILDCARD_STAR, _Name = ?KHEPRI_WILDCARD_STAR), KeyPos = 2, %% #amqqueue.name - register_simple_projection(Name, PathPattern, KeyPos, true). + register_simple_projection(Name, PathPattern, KeyPos, false). + +%% This projection exists to avoid looking up the full amqqueue record +%% per message delivered to a target queue. +register_rabbit_queue_target_projection() -> + PathPattern = rabbit_db_queue:khepri_queue_path( + _VHost = ?KHEPRI_WILDCARD_STAR, + _Name = ?KHEPRI_WILDCARD_STAR), + Fun = fun(_Path, Q) -> + Name = amqqueue:get_name(Q), + Type = amqqueue:get_type(Q), + Pid = amqqueue:get_pid(Q), + ExtraBcc = amqqueue:get_extra_bcc(Q), + {Name, {Type, Pid, ExtraBcc}} + end, + Opts = #{keypos => 1, + read_concurrency => true}, + Projection = khepri_projection:new(rabbit_khepri_queue_target, Fun, Opts), + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_vhost_projection() -> Name = rabbit_khepri_vhost, diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 028dca23f608..953c34731d79 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -208,8 +208,9 @@ {ok, queue_state()} | {error, Reason :: term()}. -callback close(queue_state()) -> ok. -%% update the queue type state from amqqrecord --callback update(amqqueue:amqqueue(), queue_state()) -> queue_state(). + +-callback update(amqqueue:amqqueue() | amqqueue:target(), queue_state()) -> + queue_state(). -callback consume(amqqueue:amqqueue(), consume_spec(), @@ -232,10 +233,10 @@ -callback supports_stateful_delivery() -> boolean(). --callback deliver([{amqqueue:amqqueue(), queue_state()}], +-callback deliver([{amqqueue:target(), queue_state()}], Message :: mc:state(), Options :: delivery_options()) -> - {[{amqqueue:amqqueue(), queue_state()}], actions()}. + {[{amqqueue:target(), queue_state()}], actions()}. -callback settle(queue_name(), settle_op(), rabbit_types:ctag(), [non_neg_integer()], queue_state()) -> @@ -619,14 +620,14 @@ publish_at_most_once(#resource{} = XName, Msg) -> Err end; publish_at_most_once(X, Msg) - when element(1, X) == exchange -> % hacky but good enough + when element(1, X) == exchange -> % hacky but good enough QNames = rabbit_exchange:route(X, Msg, #{return_binding_keys => true}), - Qs = rabbit_amqqueue:lookup_many(QNames), + Qs = rabbit_db_queue:get_targets(QNames), _ = deliver(Qs, Msg, #{}, stateless), ok. --spec deliver([amqqueue:amqqueue() | - {amqqueue:amqqueue(), rabbit_exchange:route_infos()}], +-spec deliver([amqqueue:target() | + {amqqueue:target(), rabbit_exchange:route_infos()}], Message :: mc:state(), delivery_options(), stateless | state()) -> @@ -688,14 +689,13 @@ deliver0(Qs, Message0, Options, #?STATE{} = State0) -> end, State0, Xs), {ok, State, Actions}. -queue_binding_keys(Q) - when ?is_amqqueue(Q) -> - {Q, #{}}; queue_binding_keys({Q, #{binding_keys := BindingKeys}}) - when ?is_amqqueue(Q) andalso is_map(BindingKeys) -> + when is_map(BindingKeys) -> {Q, BindingKeys}; -queue_binding_keys({Q, _RouteInfos}) - when ?is_amqqueue(Q) -> +queue_binding_keys({Q, RouteInfos}) + when is_map(RouteInfos) -> + {Q, #{}}; +queue_binding_keys(Q) -> {Q, #{}}. add_binding_keys(Message, BindingKeys) @@ -775,9 +775,15 @@ removed_from_rabbit_registry(_Type) -> ok. get_ctx(QOrQref, State) -> get_ctx_with(QOrQref, State, undefined). -get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) - when ?is_amqqueue(Q) -> - Ref = qref(Q), +get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) -> + case get_ctx(QRef, Contexts, undefined) of + undefined -> + exit({queue_context_not_found, QRef}); + Ctx -> + Ctx + end; +get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) -> + Ref = amqqueue:get_name(Q), case Contexts of #{Ref := #ctx{module = Mod, state = State} = Ctx} -> @@ -785,25 +791,20 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) _ when InitState == undefined -> %% not found and no initial state passed - initialize new state Mod = amqqueue:get_type(Q), - case Mod:init(Q) of + maybe + {ok, Q1} ?= to_queue(Q), + {ok, QState} ?= Mod:init(Q1), + #ctx{module = Mod, + state = QState} + else {error, Reason} -> - exit({Reason, Ref}); - {ok, QState} -> - #ctx{module = Mod, - state = QState} + exit({Reason, Ref}) end; _ -> %% not found - initialize with supplied initial state Mod = amqqueue:get_type(Q), #ctx{module = Mod, state = InitState} - end; -get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) -> - case get_ctx(QRef, Contexts, undefined) of - undefined -> - exit({queue_context_not_found, QRef}); - Ctx -> - Ctx end. get_ctx(QRef, #?STATE{ctxs = Contexts}, Default) -> @@ -817,9 +818,15 @@ set_ctx(QRef, Ctx, #?STATE{ctxs = Contexts} = State) -> qref(#resource{kind = queue} = QName) -> QName; -qref(Q) when ?is_amqqueue(Q) -> +qref(Q) -> amqqueue:get_name(Q). +to_queue(Q) when ?is_amqqueue(Q) -> + {ok, Q}; +to_queue(Target) -> + QName = amqqueue:get_name(Target), + rabbit_amqqueue:lookup(QName). + -spec known_queue_type_modules() -> [module()]. known_queue_type_modules() -> Registered = rabbit_registry:lookup_all(queue), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 3e8a1df80e2a..2e69f227841b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -234,10 +234,10 @@ init(Q) when ?is_amqqueue(Q) -> close(State) -> rabbit_fifo_client:close(State). --spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) -> +-spec update(amqqueue:amqqueue() | amqqueue:target(), rabbit_fifo_client:state()) -> rabbit_fifo_client:state(). -update(Q, State) when ?amqqueue_is_quorum(Q) -> - %% QQ state maintains it's own updates +update(_Q, State) -> + %% QQ state maintains its own updates State. -spec handle_event(rabbit_amqqueue:name(), @@ -1140,7 +1140,6 @@ deliver(QSs, Msg0, Options) -> end end, {[], []}, QSs). - state_info(S) -> #{pending_raft_commands => rabbit_fifo_client:pending_size(S), cached_segments => rabbit_fifo_client:num_cached_segments(S)}. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 28837c048765..cb01bfadf6d0 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -169,20 +169,23 @@ restart_stream(QRes) -> {ok, node()} | {error, term()} | {timeout, term()}. -restart_stream(QRes, Options) - when element(1, QRes) == resource -> - restart_stream(hd(rabbit_amqqueue:lookup_many([QRes])), Options); restart_stream(Q, Options) - when ?is_amqqueue(Q) andalso - ?amqqueue_is_stream(Q) -> + when ?amqqueue_is_stream(Q) -> ?LOG_INFO("restarting stream ~s in vhost ~s with options ~p", - [maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]), + [maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]), #{name := StreamId} = amqqueue:get_type_state(Q), case process_command({restart_stream, StreamId, Options}) of {ok, {ok, LeaderPid}, _} -> {ok, node(LeaderPid)}; Err -> Err + end; +restart_stream(QRes, Options) -> + case rabbit_amqqueue:lookup(QRes) of + {ok, Q} -> + restart_stream(Q, Options); + Err -> + Err end. delete_stream(Q, ActingUser) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index def01719f9df..63e19d88c111 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1013,7 +1013,7 @@ init(Q) when ?is_amqqueue(Q) -> {error, stream_not_found}; {error, coordinator_unavailable} = E -> ?LOG_WARNING("Failed to start stream client ~tp: coordinator unavailable", - [rabbit_misc:rs(QName)]), + [rabbit_misc:rs(QName)]), E end. @@ -1024,8 +1024,7 @@ close(#stream_client{readers = Readers, rabbit_core_metrics:consumer_deleted(self(), CTag, QName) end, Readers). -update(Q, State) - when ?is_amqqueue(Q) -> +update(_Q, State) -> State. update_leader_pid(Pid, #stream_client{leader = Pid} = State) -> diff --git a/deps/rabbit/src/rabbit_volatile_queue.erl b/deps/rabbit/src/rabbit_volatile_queue.erl index 487422ca721c..d8886de6c36a 100644 --- a/deps/rabbit/src/rabbit_volatile_queue.erl +++ b/deps/rabbit/src/rabbit_volatile_queue.erl @@ -16,6 +16,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -export([new/1, + new_target/1, new_name/0, is/1, key_from_name/1, @@ -94,6 +95,16 @@ new(#resource{virtual_host = Vhost, new0(Name, Pid, Vhost) -> amqqueue:new(Name, Pid, false, true, none, [], Vhost, #{}, ?MODULE). +-spec new_target(rabbit_amqqueue:name()) -> + amqqueue:target() | error. +new_target(#resource{name = NameBin} = Name) -> + case pid_from_name(NameBin) of + {ok, Pid} when is_pid(Pid) -> + amqqueue:new_target(Name, {?MODULE, Pid, none}); + _ -> + error + end. + -spec is(rabbit_misc:resource_name()) -> boolean(). is(<>) -> diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl index c80b1fcfba8f..9ee433524869 100644 --- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl @@ -31,7 +31,7 @@ all_tests() -> [ create_or_get, get, - get_many, + get_targets, get_all, get_all_by_vhost, get_all_by_type, @@ -131,20 +131,24 @@ get1(_Config) -> rabbit_db_queue:get(rabbit_misc:r(?VHOST, queue, <<"test-queue2">>))), passed. -get_many(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_many1, [Config]). +get_targets(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_targets1, [Config]). -get_many1(_Config) -> +get_targets1(_Config) -> QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>), QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>), Q = new_queue(QName, rabbit_classic_queue), Q2 = new_queue(QName2, rabbit_classic_queue), ok = rabbit_db_queue:set(Q), - ?assertEqual([Q], rabbit_db_queue:get_many([QName])), - ?assertEqual([Q], rabbit_db_queue:get_many([QName, QName2])), - ?assertEqual([], rabbit_db_queue:get_many([QName2])), + Target = {rabbit_classic_queue, none, none}, + QTarget = amqqueue:new_target(QName, Target), + QTarget2 = amqqueue:new_target(QName2, Target), + ?assertEqual([QTarget], rabbit_db_queue:get_targets([QName])), + ?assertEqual([QTarget], rabbit_db_queue:get_targets([QName, QName2])), + ?assertEqual([], rabbit_db_queue:get_targets([QName2])), ok = rabbit_db_queue:set(Q2), - ?assertEqual(lists:sort([Q, Q2]), lists:sort(rabbit_db_queue:get_many([QName, QName2]))), + ?assertEqual(lists:sort([QTarget, QTarget2]), + lists:sort(rabbit_db_queue:get_targets([QName, QName2]))), passed. get_all(Config) -> diff --git a/deps/rabbitmq_management/test/clustering_SUITE.erl b/deps/rabbitmq_management/test/clustering_SUITE.erl index 6e6178dc9ad7..aece4220033e 100644 --- a/deps/rabbitmq_management/test/clustering_SUITE.erl +++ b/deps/rabbitmq_management/test/clustering_SUITE.erl @@ -214,7 +214,7 @@ queue_on_other_node(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1), {ok, Chan} = amqp_connection:open_channel(Conn), _ = queue_declare(Chan, <<"some-queue">>), - _ = wait_for_queue(Config, "/queues/%2F/some-queue"), + eventually(?_assertEqual(1, length(http_get(Config, "/queues/%2F"))), 2000, 5), {ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)), consume(Chan2, <<"some-queue">>), @@ -899,7 +899,6 @@ wait_for_queue(Config, Path, Keys) -> wait_for_queue(_Config, Path, Keys, 0) -> exit({timeout, {Path, Keys}}); - wait_for_queue(Config, Path, Keys, Count) -> Res = http_get(Config, Path), case present(Keys, Res) of diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index e8ac5e4232bb..48ab2a86977d 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1697,7 +1697,7 @@ deliver_to_queues(Message, RoutedToQNames, State0 = #state{queue_states = QStates0, cfg = #cfg{proto_ver = ProtoVer}}) -> - Qs0 = rabbit_amqqueue:lookup_many(RoutedToQNames), + Qs0 = rabbit_db_queue:get_targets(RoutedToQNames), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of {ok, QStates, Actions} -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 21b9cfe74081..c596e6dd660f 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -133,7 +133,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) -> supports_stateful_delivery() -> false. --spec deliver([{amqqueue:amqqueue(), stateless}], +-spec deliver([{amqqueue:target(), stateless}], Msg :: mc:state(), rabbit_queue_type:delivery_options()) -> {[], rabbit_queue_type:actions()}. diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl index 552dfd2730fa..840990690ba4 100644 --- a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl +++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl @@ -91,12 +91,12 @@ add_binding(_Tx, #exchange{ name = XName }, {ok, X} -> Msgs = get_msgs_from_cache(XName), [begin - Qs = rabbit_exchange:route(X, Msg), - case rabbit_amqqueue:lookup_many(Qs) of + QNames = rabbit_exchange:route(X, Msg), + case rabbit_db_queue:get_targets(QNames) of [] -> - destination_not_found_error(Qs); - QPids -> - deliver_messages(QPids, [Msg]) + destination_not_found_error(QNames); + Qs -> + deliver_messages(Qs, [Msg]) end end || Msg <- Msgs] end, diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index ac491cb5fbd1..924e20b235b9 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -382,7 +382,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = end, Msg = set_annotations(Msg0, Dest), RoutedQNames = route(Msg, Dest), - Queues = rabbit_amqqueue:lookup_many(RoutedQNames), + Queues = rabbit_db_queue:get_targets(RoutedQNames), messages_received(AckMode), case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of {ok, QState1, Actions} ->