diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index 8bf5a2345f19..3c958d90fea0 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -14,6 +14,7 @@ new/9, new_with_version/9, new_with_version/10, + new_target/2, fields/0, fields/1, field_vhost/0, @@ -39,6 +40,7 @@ % options get_options/1, set_options/2, + get_extra_bcc/1, % pid get_pid/1, set_pid/2, @@ -77,13 +79,15 @@ qnode/1, to_printable/1, to_printable/2, - macros/0]). + macros/0 + ]). -define(record_version, amqqueue_v2). -define(is_backwards_compat_classic(T), (T =:= classic orelse T =:= ?amqqueue_v1_type)). -type amqqueue_options() :: map() | ets:match_pattern(). +-type extra_bcc() :: rabbit_misc:resource_name() | none. -record(amqqueue, { %% immutable @@ -120,6 +124,17 @@ type_state = #{} :: map() | ets:match_pattern() }). +%% A subset of the amqqueue record containing just the necessary fields +%% to deliver a message to a target queue. +-record(queue_target, + {name :: rabbit_amqqueue:name(), + target :: {rabbit_queue_type:queue_type(), + pid() | ra_server_id() | none, + extra_bcc()} + }). + +-opaque target() :: #queue_target{}. + -type amqqueue() :: amqqueue_v2(). -type amqqueue_v2() :: #amqqueue{ name :: rabbit_amqqueue:name(), @@ -175,6 +190,7 @@ amqqueue_v2/0, amqqueue_pattern/0, amqqueue_v2_pattern/0, + target/0, ra_server_id/0]). -spec new(rabbit_amqqueue:name(), @@ -328,6 +344,15 @@ new_with_version(?record_version, options = Options, type = ensure_type_compat(Type)}. +-spec new_target(rabbit_amqqueue:name(), + {rabbit_queue_type:queue_type(), + pid() | ra_server_id() | none, + extra_bcc()}) -> + target(). +new_target(Name, Target) when tuple_size(Target) =:= 3 -> + #queue_target{name = Name, + target = Target}. + -spec is_amqqueue(any()) -> boolean(). is_amqqueue(#amqqueue{}) -> true. @@ -361,15 +386,21 @@ set_arguments(#amqqueue{} = Queue, Args) -> % options -spec get_options(amqqueue()) -> amqqueue_options(). - get_options(#amqqueue{options = Options}) -> Options. -spec set_options(amqqueue(), amqqueue_options()) -> amqqueue(). - set_options(#amqqueue{} = Queue, Options) -> Queue#amqqueue{options = Options}. +-spec get_extra_bcc(amqqueue() | target()) -> + extra_bcc(). +get_extra_bcc(#amqqueue{options = #{extra_bcc := ExtraBcc}}) -> + ExtraBcc; +get_extra_bcc(#amqqueue{}) -> + none; +get_extra_bcc(#queue_target{target = {_Type, _Pid, ExtraBcc}}) -> + ExtraBcc. % decorators @@ -418,9 +449,10 @@ set_operator_policy(#amqqueue{} = Queue, Policy) -> % name --spec get_name(amqqueue()) -> rabbit_amqqueue:name(). +-spec get_name(amqqueue() | target()) -> rabbit_amqqueue:name(). -get_name(#amqqueue{name = Name}) -> Name. +get_name(#amqqueue{name = Name}) -> Name; +get_name(#queue_target{name = Name}) -> Name. -spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). @@ -429,9 +461,10 @@ set_name(#amqqueue{} = Queue, Name) -> % pid --spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none. +-spec get_pid(amqqueue_v2() | target()) -> pid() | ra_server_id() | none. -get_pid(#amqqueue{pid = Pid}) -> Pid. +get_pid(#amqqueue{pid = Pid}) -> Pid; +get_pid(#queue_target{target = {_Type, Pid, _ExtraBcc}}) -> Pid. -spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(). @@ -488,9 +521,10 @@ set_state(#amqqueue{} = Queue, State) -> %% New in v2. --spec get_type(amqqueue()) -> atom(). +-spec get_type(amqqueue() | target()) -> atom(). -get_type(#amqqueue{type = Type}) -> Type. +get_type(#amqqueue{type = Type}) -> Type; +get_type(#queue_target{target = {Type, _Pid, _ExtraBcc}}) -> Type. -spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. @@ -629,8 +663,6 @@ to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) -> <<"virtual_host">> => VHost, <<"type">> => Type}. -% private - macros() -> io:format( "-define(is_~ts(Q), is_record(Q, amqqueue, ~b)).~n~n", diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index cd29844725f0..398d8832e9c3 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2528,7 +2528,7 @@ incoming_link_transfer( QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, - Qs0 = rabbit_amqqueue:lookup_many(QNames), + Qs0 = rabbit_db_queue:get_targets(QNames), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), Mc = ensure_mc_cluster_compat(Mc2), case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of @@ -2674,8 +2674,8 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, U) -> Disposition = released(DeliveryId), {U, [Disposition]}; process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) -> - QNames = rabbit_amqqueue:queue_names(Qs), false = maps:is_key(DeliveryId, U0), + QNames = rabbit_amqqueue:queue_names(Qs), Map = maps:from_keys(QNames, ok), U = U0#{DeliveryId => {Map, SenderSettles, false}}, rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)), diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 3bb8f53f3bff..36f6b63966df 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -11,7 +11,7 @@ delete_immediately/1, delete_exclusive/2, delete/4, purge/1, forget_all_durable/1]). -export([pseudo_queue/2, pseudo_queue/3]). --export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1, +-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1, not_found_or_absent_dirty/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -367,14 +367,6 @@ lookup(Name) when is_record(Name, resource) -> lookup_durable_queue(QName) -> rabbit_db_queue:get_durable(QName). --spec lookup_many(rabbit_exchange:route_return()) -> - [amqqueue:amqqueue() | {amqqueue:amqqueue(), route_infos()}]. -lookup_many([]) -> - %% optimisation - []; -lookup_many(Names) when is_list(Names) -> - rabbit_db_queue:get_many(Names). - -spec lookup(binary(), binary()) -> rabbit_types:ok(amqqueue:amqqueue()) | rabbit_types:error('not_found'). @@ -2051,68 +2043,57 @@ get_quorum_nodes(Q) -> end. -spec prepend_extra_bcc(Qs) -> - Qs when Qs :: [amqqueue:amqqueue() | - {amqqueue:amqqueue(), route_infos()}]. + Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}]. prepend_extra_bcc([]) -> []; prepend_extra_bcc([Q0] = Qs) -> Q = queue(Q0), - case amqqueue:get_options(Q) of - #{extra_bcc := BCCName} -> - case get_bcc_queue(Q, BCCName) of - {ok, BCCQueue} -> - [BCCQueue | Qs]; - {error, not_found} -> - Qs - end; - _ -> - Qs + case amqqueue:get_extra_bcc(Q) of + none -> + Qs; + Name -> + lookup_extra_bcc(Q, Name) ++ Qs end; prepend_extra_bcc(Qs) -> - BCCQueues = - lists:filtermap( - fun(Q0) -> - Q = queue(Q0), - case amqqueue:get_options(Q) of - #{extra_bcc := BCCName} -> - case get_bcc_queue(Q, BCCName) of - {ok, BCCQ} -> - {true, BCCQ}; - {error, not_found} -> - false - end; - _ -> - false - end - end, Qs), - lists:usort(BCCQueues) ++ Qs. + ExtraQs = lists:filtermap( + fun(Q0) -> + Q = queue(Q0), + case amqqueue:get_extra_bcc(Q) of + none -> + false; + Name -> + case lookup_extra_bcc(Q, Name) of + [ExtraQ] -> + {true, ExtraQ}; + [] -> + false + end + end + end, Qs), + lists:usort(ExtraQs) ++ Qs. -spec queue(Q | {Q, route_infos()}) -> - Q when Q :: amqqueue:amqqueue(). -queue(Q) - when ?is_amqqueue(Q) -> + Q when Q :: amqqueue:target(). +queue({Q, RouteInfos}) when is_map(RouteInfos) -> Q; -queue({Q, RouteInfos}) - when ?is_amqqueue(Q) andalso is_map(RouteInfos) -> +queue(Q) -> Q. -spec queue_names([Q | {Q, route_infos()}]) -> - [name()] when Q :: amqqueue:amqqueue(). -queue_names(Queues) - when is_list(Queues) -> - lists:map(fun(Q) when ?is_amqqueue(Q) -> + [name()] when Q :: amqqueue:target(). +queue_names(Queues) -> + lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) -> amqqueue:get_name(Q); - ({Q, RouteInfos}) - when ?is_amqqueue(Q) andalso is_map(RouteInfos) -> + (Q) -> amqqueue:get_name(Q) end, Queues). --spec get_bcc_queue(amqqueue:amqqueue(), binary()) -> - {ok, amqqueue:amqqueue()} | {error, not_found}. -get_bcc_queue(Q, BCCName) -> +-spec lookup_extra_bcc(amqqueue:target(), binary()) -> + [amqqueue:target()]. +lookup_extra_bcc(Q, BCCName) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), BCCQueueName = rabbit_misc:r(VHost, queue, BCCName), - lookup(BCCQueueName). + rabbit_db_queue:get_targets([BCCQueueName]). is_queue_args_combination_permitted(Q) -> Durable = amqqueue:is_durable(Q), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 6e63cda0b74b..3c7c865fcb00 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1222,7 +1222,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_user_id_header(Message0, User), Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), - Queues = rabbit_amqqueue:lookup_many(QNames), + Queues = rabbit_db_queue:get_targets(QNames), rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, Username, TraceState), %% TODO: call delivery_to_queues with plain args @@ -2126,7 +2126,12 @@ deliver_to_queues(XName, rabbit_misc:protocol_error( resource_error, "Stream coordinator unavailable for ~ts", - [rabbit_misc:rs(Resource)]) + [rabbit_misc:rs(Resource)]); + {error, Reason} -> + rabbit_misc:protocol_error( + resource_error, + "failed to deliver message: ~tp", + [Reason]) end. process_routing_mandatory(_Mandatory = true, diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 0311dd8741a4..3ecec12fe89c 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -295,8 +295,8 @@ init(Q) when ?amqqueue_is_classic(Q) -> close(_State) -> ok. --spec update(amqqueue:amqqueue(), state()) -> state(). -update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) -> +-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state(). +update(Q, #?STATE{pid = Pid} = State) -> case amqqueue:get_pid(Q) of Pid -> State; @@ -473,10 +473,10 @@ settlement_action(Type, QRef, MsgSeqs, Acc) -> supports_stateful_delivery() -> true. --spec deliver([{amqqueue:amqqueue(), state()}], +-spec deliver([{amqqueue:target(), state()}], Delivery :: mc:state(), rabbit_queue_type:delivery_options()) -> - {[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}. + {[{amqqueue:target(), state()}], rabbit_queue_type:actions()}. deliver(Qs0, Msg0, Options) -> %% add guid to content here instead of in rabbit_basic:message/3, %% as classic queues are the only ones that need it diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 4cc90e39ac4d..ed4ee2274379 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -18,7 +18,7 @@ -export([ get/1, - get_many/1, + get_targets/1, get_all/0, get_all/1, get_all_by_type/1, @@ -85,6 +85,7 @@ -define(MNESIA_DURABLE_TABLE, rabbit_durable_queue). -define(KHEPRI_PROJECTION, rabbit_khepri_queue). +-define(KHEPRI_TARGET_PROJECTION, rabbit_khepri_queue_target). %% ------------------------------------------------------------------- %% get_all(). @@ -469,58 +470,62 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) -> rabbit_db_binding:delete_for_destination_in_mnesia(QueueName, OnlyDurable). %% ------------------------------------------------------------------- -%% get_many(). +%% get_targets(). %% ------------------------------------------------------------------- --spec get_many(rabbit_exchange:route_return()) -> - [amqqueue:amqqueue() | {amqqueue:amqqueue(), rabbit_exchange:route_infos()}]. -get_many(Names) when is_list(Names) -> +-spec get_targets(rabbit_exchange:route_return()) -> + [amqqueue:target() | {amqqueue:target(), rabbit_exchange:route_infos()}]. +get_targets(Names) -> rabbit_khepri:handle_fallback( - #{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end, - khepri => fun() -> get_many_in_khepri(Names) end + #{mnesia => fun() -> lookup_targets(mnesia, Names) end, + khepri => fun() -> lookup_targets(khepri, Names) end }). -get_many_in_khepri(Names) -> - try - get_many_in_ets(?KHEPRI_PROJECTION, Names) - catch - error:badarg -> - [] - end. - -get_many_in_ets(Table, [{Name, RouteInfos}]) - when is_map(RouteInfos) -> - case ets_lookup(Table, Name) of - [] -> []; - [Q] -> [{Q, RouteInfos}] - end; -get_many_in_ets(Table, [Name]) -> - ets_lookup(Table, Name); -get_many_in_ets(Table, Names) when is_list(Names) -> +lookup_targets(Store, Names) -> lists:filtermap(fun({Name, RouteInfos}) when is_map(RouteInfos) -> - case ets_lookup(Table, Name) of - [] -> false; - [Q] -> {true, {Q, RouteInfos}} + case lookup_target(Store, Name) of + not_found -> false; + Target -> {true, {Target, RouteInfos}} end; (Name) -> - case ets_lookup(Table, Name) of - [] -> false; - [Q] -> {true, Q} + case lookup_target(Store, Name) of + not_found -> false; + Target -> {true, Target} end end, Names). -ets_lookup(Table, QName = #resource{name = QNameBin}) -> - case rabbit_volatile_queue:is(QNameBin) of +lookup_target(Store, #resource{name = NameBin} = Name) -> + case rabbit_volatile_queue:is(NameBin) of true -> - %% This queue record is not stored in the database. - %% We create it on the fly. - case rabbit_volatile_queue:new(QName) of - error -> []; - Q -> [Q] + %% This queue is not stored in the database. We create it on the fly. + case rabbit_volatile_queue:new_target(Name) of + error -> not_found; + Target -> Target end; false -> - ets:lookup(Table, QName) + lookup_target0(Store, Name) + end. + +lookup_target0(khepri, Name) -> + try ets:lookup_element(?KHEPRI_TARGET_PROJECTION, Name, 2, not_found) of + not_found -> + not_found; + Target -> + amqqueue:new_target(Name, Target) + catch + error:badarg -> + not_found + end; +lookup_target0(mnesia, Name) -> + case ets:lookup(?MNESIA_TABLE, Name) of + [] -> + not_found; + [Q] -> + Type = amqqueue:get_type(Q), + Pid = amqqueue:get_pid(Q), + ExtraBcc = amqqueue:get_extra_bcc(Q), + amqqueue:new_target(Name, {Type, Pid, ExtraBcc}) end. %% ------------------------------------------------------------------- @@ -610,6 +615,14 @@ get_many_durable_in_khepri(Names) -> [] end. +get_many_in_ets(Table, Names) -> + lists:filtermap(fun(Name) -> + case ets:lookup(Table, Name) of + [] -> false; + [Q] -> {true, Q} + end + end, Names). + %% ------------------------------------------------------------------- %% update(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index 172bc0bc9306..1b7d1da232f1 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -44,7 +44,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK, Routed0 = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}), {Cycles, Routed} = detect_cycles(Reason, DLMsg, Routed0), lists:foreach(fun log_cycle_once/1, Cycles), - Qs0 = rabbit_amqqueue:lookup_many(Routed), + Qs0 = rabbit_db_queue:get_targets(Routed), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), _ = rabbit_queue_type:deliver(Qs, DLMsg, #{}, stateless), ok. diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index 4084793846ab..a6fc3ce48318 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -342,7 +342,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, {Cycles, RouteToQs1} = rabbit_dead_letter:detect_cycles( Reason, Msg, RouteToQs0), State1 = log_cycles(Cycles, [RKey], State0), - RouteToQs2 = rabbit_amqqueue:lookup_many(RouteToQs1), + RouteToQs2 = rabbit_db_queue:get_targets(RouteToQs1), RouteToQs = rabbit_amqqueue:prepend_extra_bcc(RouteToQs2), State2 = case RouteToQs of [] -> @@ -496,7 +496,7 @@ redeliver0(#pending{delivery = Msg0, %% queues that do not exist. Therefore, filter out non-existent target queues. RouteToQs0 = queue_names( rabbit_amqqueue:prepend_extra_bcc( - rabbit_amqqueue:lookup_many( + rabbit_db_queue:get_targets( rabbit_exchange:route(DLX, Msg)))), case {RouteToQs0, Settled} of {[], [_|_]} -> @@ -529,7 +529,10 @@ redeliver0(#pending{delivery = Msg0, rejected = []}, State = State0#state{pendings = maps:update(OutSeq, Pend, Pendings)}, Options = #{correlation => OutSeq}, - deliver_to_queues(Msg, Options, rabbit_amqqueue:lookup_many(RouteToQs), State) + deliver_to_queues(Msg, + Options, + rabbit_db_queue:get_targets(RouteToQs), + State) end end. @@ -569,8 +572,7 @@ cancel_timer(#state{timer = TRef} = State) cancel_timer(State) -> State. -queue_names(Qs) - when is_list(Qs) -> +queue_names(Qs) -> lists:map(fun amqqueue:get_name/1, Qs). format_status(#{state := #state{ diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 9357f2318018..c7423b2731ee 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1310,6 +1310,7 @@ delete_or_fail(Path) -> register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, fun register_rabbit_queue_projection/0, + fun register_rabbit_queue_target_projection/0, fun register_rabbit_vhost_projection/0, fun register_rabbit_users_projection/0, fun register_rabbit_global_runtime_parameters_projection/0, @@ -1351,7 +1352,25 @@ register_rabbit_queue_projection() -> _VHost = ?KHEPRI_WILDCARD_STAR, _Name = ?KHEPRI_WILDCARD_STAR), KeyPos = 2, %% #amqqueue.name - register_simple_projection(Name, PathPattern, KeyPos, true). + register_simple_projection(Name, PathPattern, KeyPos, false). + +%% This projection exists to avoid looking up the full amqqueue record +%% per message delivered to a target queue. +register_rabbit_queue_target_projection() -> + PathPattern = rabbit_db_queue:khepri_queue_path( + _VHost = ?KHEPRI_WILDCARD_STAR, + _Name = ?KHEPRI_WILDCARD_STAR), + Fun = fun(_Path, Q) -> + Name = amqqueue:get_name(Q), + Type = amqqueue:get_type(Q), + Pid = amqqueue:get_pid(Q), + ExtraBcc = amqqueue:get_extra_bcc(Q), + {Name, {Type, Pid, ExtraBcc}} + end, + Opts = #{keypos => 1, + read_concurrency => true}, + Projection = khepri_projection:new(rabbit_khepri_queue_target, Fun, Opts), + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_vhost_projection() -> Name = rabbit_khepri_vhost, diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 028dca23f608..953c34731d79 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -208,8 +208,9 @@ {ok, queue_state()} | {error, Reason :: term()}. -callback close(queue_state()) -> ok. -%% update the queue type state from amqqrecord --callback update(amqqueue:amqqueue(), queue_state()) -> queue_state(). + +-callback update(amqqueue:amqqueue() | amqqueue:target(), queue_state()) -> + queue_state(). -callback consume(amqqueue:amqqueue(), consume_spec(), @@ -232,10 +233,10 @@ -callback supports_stateful_delivery() -> boolean(). --callback deliver([{amqqueue:amqqueue(), queue_state()}], +-callback deliver([{amqqueue:target(), queue_state()}], Message :: mc:state(), Options :: delivery_options()) -> - {[{amqqueue:amqqueue(), queue_state()}], actions()}. + {[{amqqueue:target(), queue_state()}], actions()}. -callback settle(queue_name(), settle_op(), rabbit_types:ctag(), [non_neg_integer()], queue_state()) -> @@ -619,14 +620,14 @@ publish_at_most_once(#resource{} = XName, Msg) -> Err end; publish_at_most_once(X, Msg) - when element(1, X) == exchange -> % hacky but good enough + when element(1, X) == exchange -> % hacky but good enough QNames = rabbit_exchange:route(X, Msg, #{return_binding_keys => true}), - Qs = rabbit_amqqueue:lookup_many(QNames), + Qs = rabbit_db_queue:get_targets(QNames), _ = deliver(Qs, Msg, #{}, stateless), ok. --spec deliver([amqqueue:amqqueue() | - {amqqueue:amqqueue(), rabbit_exchange:route_infos()}], +-spec deliver([amqqueue:target() | + {amqqueue:target(), rabbit_exchange:route_infos()}], Message :: mc:state(), delivery_options(), stateless | state()) -> @@ -688,14 +689,13 @@ deliver0(Qs, Message0, Options, #?STATE{} = State0) -> end, State0, Xs), {ok, State, Actions}. -queue_binding_keys(Q) - when ?is_amqqueue(Q) -> - {Q, #{}}; queue_binding_keys({Q, #{binding_keys := BindingKeys}}) - when ?is_amqqueue(Q) andalso is_map(BindingKeys) -> + when is_map(BindingKeys) -> {Q, BindingKeys}; -queue_binding_keys({Q, _RouteInfos}) - when ?is_amqqueue(Q) -> +queue_binding_keys({Q, RouteInfos}) + when is_map(RouteInfos) -> + {Q, #{}}; +queue_binding_keys(Q) -> {Q, #{}}. add_binding_keys(Message, BindingKeys) @@ -775,9 +775,15 @@ removed_from_rabbit_registry(_Type) -> ok. get_ctx(QOrQref, State) -> get_ctx_with(QOrQref, State, undefined). -get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) - when ?is_amqqueue(Q) -> - Ref = qref(Q), +get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) -> + case get_ctx(QRef, Contexts, undefined) of + undefined -> + exit({queue_context_not_found, QRef}); + Ctx -> + Ctx + end; +get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) -> + Ref = amqqueue:get_name(Q), case Contexts of #{Ref := #ctx{module = Mod, state = State} = Ctx} -> @@ -785,25 +791,20 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) _ when InitState == undefined -> %% not found and no initial state passed - initialize new state Mod = amqqueue:get_type(Q), - case Mod:init(Q) of + maybe + {ok, Q1} ?= to_queue(Q), + {ok, QState} ?= Mod:init(Q1), + #ctx{module = Mod, + state = QState} + else {error, Reason} -> - exit({Reason, Ref}); - {ok, QState} -> - #ctx{module = Mod, - state = QState} + exit({Reason, Ref}) end; _ -> %% not found - initialize with supplied initial state Mod = amqqueue:get_type(Q), #ctx{module = Mod, state = InitState} - end; -get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) -> - case get_ctx(QRef, Contexts, undefined) of - undefined -> - exit({queue_context_not_found, QRef}); - Ctx -> - Ctx end. get_ctx(QRef, #?STATE{ctxs = Contexts}, Default) -> @@ -817,9 +818,15 @@ set_ctx(QRef, Ctx, #?STATE{ctxs = Contexts} = State) -> qref(#resource{kind = queue} = QName) -> QName; -qref(Q) when ?is_amqqueue(Q) -> +qref(Q) -> amqqueue:get_name(Q). +to_queue(Q) when ?is_amqqueue(Q) -> + {ok, Q}; +to_queue(Target) -> + QName = amqqueue:get_name(Target), + rabbit_amqqueue:lookup(QName). + -spec known_queue_type_modules() -> [module()]. known_queue_type_modules() -> Registered = rabbit_registry:lookup_all(queue), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 3e8a1df80e2a..2e69f227841b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -234,10 +234,10 @@ init(Q) when ?is_amqqueue(Q) -> close(State) -> rabbit_fifo_client:close(State). --spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) -> +-spec update(amqqueue:amqqueue() | amqqueue:target(), rabbit_fifo_client:state()) -> rabbit_fifo_client:state(). -update(Q, State) when ?amqqueue_is_quorum(Q) -> - %% QQ state maintains it's own updates +update(_Q, State) -> + %% QQ state maintains its own updates State. -spec handle_event(rabbit_amqqueue:name(), @@ -1140,7 +1140,6 @@ deliver(QSs, Msg0, Options) -> end end, {[], []}, QSs). - state_info(S) -> #{pending_raft_commands => rabbit_fifo_client:pending_size(S), cached_segments => rabbit_fifo_client:num_cached_segments(S)}. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 28837c048765..cb01bfadf6d0 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -169,20 +169,23 @@ restart_stream(QRes) -> {ok, node()} | {error, term()} | {timeout, term()}. -restart_stream(QRes, Options) - when element(1, QRes) == resource -> - restart_stream(hd(rabbit_amqqueue:lookup_many([QRes])), Options); restart_stream(Q, Options) - when ?is_amqqueue(Q) andalso - ?amqqueue_is_stream(Q) -> + when ?amqqueue_is_stream(Q) -> ?LOG_INFO("restarting stream ~s in vhost ~s with options ~p", - [maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]), + [maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]), #{name := StreamId} = amqqueue:get_type_state(Q), case process_command({restart_stream, StreamId, Options}) of {ok, {ok, LeaderPid}, _} -> {ok, node(LeaderPid)}; Err -> Err + end; +restart_stream(QRes, Options) -> + case rabbit_amqqueue:lookup(QRes) of + {ok, Q} -> + restart_stream(Q, Options); + Err -> + Err end. delete_stream(Q, ActingUser) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index def01719f9df..63e19d88c111 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1013,7 +1013,7 @@ init(Q) when ?is_amqqueue(Q) -> {error, stream_not_found}; {error, coordinator_unavailable} = E -> ?LOG_WARNING("Failed to start stream client ~tp: coordinator unavailable", - [rabbit_misc:rs(QName)]), + [rabbit_misc:rs(QName)]), E end. @@ -1024,8 +1024,7 @@ close(#stream_client{readers = Readers, rabbit_core_metrics:consumer_deleted(self(), CTag, QName) end, Readers). -update(Q, State) - when ?is_amqqueue(Q) -> +update(_Q, State) -> State. update_leader_pid(Pid, #stream_client{leader = Pid} = State) -> diff --git a/deps/rabbit/src/rabbit_volatile_queue.erl b/deps/rabbit/src/rabbit_volatile_queue.erl index 487422ca721c..d8886de6c36a 100644 --- a/deps/rabbit/src/rabbit_volatile_queue.erl +++ b/deps/rabbit/src/rabbit_volatile_queue.erl @@ -16,6 +16,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -export([new/1, + new_target/1, new_name/0, is/1, key_from_name/1, @@ -94,6 +95,16 @@ new(#resource{virtual_host = Vhost, new0(Name, Pid, Vhost) -> amqqueue:new(Name, Pid, false, true, none, [], Vhost, #{}, ?MODULE). +-spec new_target(rabbit_amqqueue:name()) -> + amqqueue:target() | error. +new_target(#resource{name = NameBin} = Name) -> + case pid_from_name(NameBin) of + {ok, Pid} when is_pid(Pid) -> + amqqueue:new_target(Name, {?MODULE, Pid, none}); + _ -> + error + end. + -spec is(rabbit_misc:resource_name()) -> boolean(). is(<>) -> diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl index c80b1fcfba8f..9ee433524869 100644 --- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl @@ -31,7 +31,7 @@ all_tests() -> [ create_or_get, get, - get_many, + get_targets, get_all, get_all_by_vhost, get_all_by_type, @@ -131,20 +131,24 @@ get1(_Config) -> rabbit_db_queue:get(rabbit_misc:r(?VHOST, queue, <<"test-queue2">>))), passed. -get_many(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_many1, [Config]). +get_targets(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_targets1, [Config]). -get_many1(_Config) -> +get_targets1(_Config) -> QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>), QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>), Q = new_queue(QName, rabbit_classic_queue), Q2 = new_queue(QName2, rabbit_classic_queue), ok = rabbit_db_queue:set(Q), - ?assertEqual([Q], rabbit_db_queue:get_many([QName])), - ?assertEqual([Q], rabbit_db_queue:get_many([QName, QName2])), - ?assertEqual([], rabbit_db_queue:get_many([QName2])), + Target = {rabbit_classic_queue, none, none}, + QTarget = amqqueue:new_target(QName, Target), + QTarget2 = amqqueue:new_target(QName2, Target), + ?assertEqual([QTarget], rabbit_db_queue:get_targets([QName])), + ?assertEqual([QTarget], rabbit_db_queue:get_targets([QName, QName2])), + ?assertEqual([], rabbit_db_queue:get_targets([QName2])), ok = rabbit_db_queue:set(Q2), - ?assertEqual(lists:sort([Q, Q2]), lists:sort(rabbit_db_queue:get_many([QName, QName2]))), + ?assertEqual(lists:sort([QTarget, QTarget2]), + lists:sort(rabbit_db_queue:get_targets([QName, QName2]))), passed. get_all(Config) -> diff --git a/deps/rabbitmq_management/test/clustering_SUITE.erl b/deps/rabbitmq_management/test/clustering_SUITE.erl index 6e6178dc9ad7..aece4220033e 100644 --- a/deps/rabbitmq_management/test/clustering_SUITE.erl +++ b/deps/rabbitmq_management/test/clustering_SUITE.erl @@ -214,7 +214,7 @@ queue_on_other_node(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1), {ok, Chan} = amqp_connection:open_channel(Conn), _ = queue_declare(Chan, <<"some-queue">>), - _ = wait_for_queue(Config, "/queues/%2F/some-queue"), + eventually(?_assertEqual(1, length(http_get(Config, "/queues/%2F"))), 2000, 5), {ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)), consume(Chan2, <<"some-queue">>), @@ -899,7 +899,6 @@ wait_for_queue(Config, Path, Keys) -> wait_for_queue(_Config, Path, Keys, 0) -> exit({timeout, {Path, Keys}}); - wait_for_queue(Config, Path, Keys, Count) -> Res = http_get(Config, Path), case present(Keys, Res) of diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index e8ac5e4232bb..48ab2a86977d 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1697,7 +1697,7 @@ deliver_to_queues(Message, RoutedToQNames, State0 = #state{queue_states = QStates0, cfg = #cfg{proto_ver = ProtoVer}}) -> - Qs0 = rabbit_amqqueue:lookup_many(RoutedToQNames), + Qs0 = rabbit_db_queue:get_targets(RoutedToQNames), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of {ok, QStates, Actions} -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 21b9cfe74081..c596e6dd660f 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -133,7 +133,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) -> supports_stateful_delivery() -> false. --spec deliver([{amqqueue:amqqueue(), stateless}], +-spec deliver([{amqqueue:target(), stateless}], Msg :: mc:state(), rabbit_queue_type:delivery_options()) -> {[], rabbit_queue_type:actions()}. diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl index 552dfd2730fa..840990690ba4 100644 --- a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl +++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl @@ -91,12 +91,12 @@ add_binding(_Tx, #exchange{ name = XName }, {ok, X} -> Msgs = get_msgs_from_cache(XName), [begin - Qs = rabbit_exchange:route(X, Msg), - case rabbit_amqqueue:lookup_many(Qs) of + QNames = rabbit_exchange:route(X, Msg), + case rabbit_db_queue:get_targets(QNames) of [] -> - destination_not_found_error(Qs); - QPids -> - deliver_messages(QPids, [Msg]) + destination_not_found_error(QNames); + Qs -> + deliver_messages(Qs, [Msg]) end end || Msg <- Msgs] end, diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index ac491cb5fbd1..924e20b235b9 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -382,7 +382,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = end, Msg = set_annotations(Msg0, Dest), RoutedQNames = route(Msg, Dest), - Queues = rabbit_amqqueue:lookup_many(RoutedQNames), + Queues = rabbit_db_queue:get_targets(RoutedQNames), messages_received(AckMode), case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of {ok, QState1, Actions} ->