Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
new/9,
new_with_version/9,
new_with_version/10,
new_target/2,
fields/0,
fields/1,
field_vhost/0,
Expand All @@ -39,6 +40,7 @@
% options
get_options/1,
set_options/2,
get_extra_bcc/1,
% pid
get_pid/1,
set_pid/2,
Expand Down Expand Up @@ -77,13 +79,15 @@
qnode/1,
to_printable/1,
to_printable/2,
macros/0]).
macros/0
]).

-define(record_version, amqqueue_v2).
-define(is_backwards_compat_classic(T),
(T =:= classic orelse T =:= ?amqqueue_v1_type)).

-type amqqueue_options() :: map() | ets:match_pattern().
-type extra_bcc() :: rabbit_misc:resource_name() | none.

-record(amqqueue, {
%% immutable
Expand Down Expand Up @@ -120,6 +124,17 @@
type_state = #{} :: map() | ets:match_pattern()
}).

%% A subset of the amqqueue record containing just the necessary fields
%% to deliver a message to a target queue.
-record(queue_target,
{name :: rabbit_amqqueue:name(),
target :: {rabbit_queue_type:queue_type(),
pid() | ra_server_id() | none,
extra_bcc()}
}).

-opaque target() :: #queue_target{}.

-type amqqueue() :: amqqueue_v2().
-type amqqueue_v2() :: #amqqueue{
name :: rabbit_amqqueue:name(),
Expand Down Expand Up @@ -175,6 +190,7 @@
amqqueue_v2/0,
amqqueue_pattern/0,
amqqueue_v2_pattern/0,
target/0,
ra_server_id/0]).

-spec new(rabbit_amqqueue:name(),
Expand Down Expand Up @@ -328,6 +344,15 @@ new_with_version(?record_version,
options = Options,
type = ensure_type_compat(Type)}.

-spec new_target(rabbit_amqqueue:name(),
{rabbit_queue_type:queue_type(),
pid() | ra_server_id() | none,
extra_bcc()}) ->
target().
new_target(Name, Target) when tuple_size(Target) =:= 3 ->
#queue_target{name = Name,
target = Target}.

-spec is_amqqueue(any()) -> boolean().

is_amqqueue(#amqqueue{}) -> true.
Expand Down Expand Up @@ -361,15 +386,21 @@ set_arguments(#amqqueue{} = Queue, Args) ->
% options

-spec get_options(amqqueue()) -> amqqueue_options().

get_options(#amqqueue{options = Options}) ->
Options.

-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().

set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.

-spec get_extra_bcc(amqqueue() | target()) ->
extra_bcc().
get_extra_bcc(#amqqueue{options = #{extra_bcc := ExtraBcc}}) ->
ExtraBcc;
get_extra_bcc(#amqqueue{}) ->
none;
get_extra_bcc(#queue_target{target = {_Type, _Pid, ExtraBcc}}) ->
ExtraBcc.

% decorators

Expand Down Expand Up @@ -418,9 +449,10 @@ set_operator_policy(#amqqueue{} = Queue, Policy) ->

% name

-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
-spec get_name(amqqueue() | target()) -> rabbit_amqqueue:name().

get_name(#amqqueue{name = Name}) -> Name.
get_name(#amqqueue{name = Name}) -> Name;
get_name(#queue_target{name = Name}) -> Name.

-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().

Expand All @@ -429,9 +461,10 @@ set_name(#amqqueue{} = Queue, Name) ->

% pid

-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
-spec get_pid(amqqueue_v2() | target()) -> pid() | ra_server_id() | none.

get_pid(#amqqueue{pid = Pid}) -> Pid.
get_pid(#amqqueue{pid = Pid}) -> Pid;
get_pid(#queue_target{target = {_Type, Pid, _ExtraBcc}}) -> Pid.

-spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2().

Expand Down Expand Up @@ -488,9 +521,10 @@ set_state(#amqqueue{} = Queue, State) ->

%% New in v2.

-spec get_type(amqqueue()) -> atom().
-spec get_type(amqqueue() | target()) -> atom().

get_type(#amqqueue{type = Type}) -> Type.
get_type(#amqqueue{type = Type}) -> Type;
get_type(#queue_target{target = {Type, _Pid, _ExtraBcc}}) -> Type.

-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.

Expand Down Expand Up @@ -629,8 +663,6 @@ to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
<<"virtual_host">> => VHost,
<<"type">> => Type}.

% private

macros() ->
io:format(
"-define(is_~ts(Q), is_record(Q, amqqueue, ~b)).~n~n",
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2528,7 +2528,7 @@ incoming_link_transfer(
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Qs0 = rabbit_amqqueue:lookup_many(QNames),
Qs0 = rabbit_db_queue:get_targets(QNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
Mc = ensure_mc_cluster_compat(Mc2),
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
Expand Down Expand Up @@ -2674,8 +2674,8 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, U) ->
Disposition = released(DeliveryId),
{U, [Disposition]};
process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) ->
QNames = rabbit_amqqueue:queue_names(Qs),
false = maps:is_key(DeliveryId, U0),
QNames = rabbit_amqqueue:queue_names(Qs),
Map = maps:from_keys(QNames, ok),
U = U0#{DeliveryId => {Map, SenderSettles, false}},
rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)),
Expand Down
87 changes: 34 additions & 53 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
-export([pseudo_queue/2, pseudo_queue/3]).
-export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1,
-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1,
not_found_or_absent_dirty/1,
with/2, with/3, with_or_die/2,
assert_equivalence/5,
Expand Down Expand Up @@ -367,14 +367,6 @@ lookup(Name) when is_record(Name, resource) ->
lookup_durable_queue(QName) ->
rabbit_db_queue:get_durable(QName).

-spec lookup_many(rabbit_exchange:route_return()) ->
[amqqueue:amqqueue() | {amqqueue:amqqueue(), route_infos()}].
lookup_many([]) ->
%% optimisation
[];
lookup_many(Names) when is_list(Names) ->
rabbit_db_queue:get_many(Names).

-spec lookup(binary(), binary()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
rabbit_types:error('not_found').
Expand Down Expand Up @@ -2051,68 +2043,57 @@ get_quorum_nodes(Q) ->
end.

-spec prepend_extra_bcc(Qs) ->
Qs when Qs :: [amqqueue:amqqueue() |
{amqqueue:amqqueue(), route_infos()}].
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].
prepend_extra_bcc([]) ->
[];
prepend_extra_bcc([Q0] = Qs) ->
Q = queue(Q0),
case amqqueue:get_options(Q) of
#{extra_bcc := BCCName} ->
case get_bcc_queue(Q, BCCName) of
{ok, BCCQueue} ->
[BCCQueue | Qs];
{error, not_found} ->
Qs
end;
_ ->
Qs
case amqqueue:get_extra_bcc(Q) of
none ->
Qs;
Name ->
lookup_extra_bcc(Q, Name) ++ Qs
end;
prepend_extra_bcc(Qs) ->
BCCQueues =
lists:filtermap(
fun(Q0) ->
Q = queue(Q0),
case amqqueue:get_options(Q) of
#{extra_bcc := BCCName} ->
case get_bcc_queue(Q, BCCName) of
{ok, BCCQ} ->
{true, BCCQ};
{error, not_found} ->
false
end;
_ ->
false
end
end, Qs),
lists:usort(BCCQueues) ++ Qs.
ExtraQs = lists:filtermap(
fun(Q0) ->
Q = queue(Q0),
case amqqueue:get_extra_bcc(Q) of
none ->
false;
Name ->
case lookup_extra_bcc(Q, Name) of
[ExtraQ] ->
{true, ExtraQ};
[] ->
false
end
end
end, Qs),
lists:usort(ExtraQs) ++ Qs.

-spec queue(Q | {Q, route_infos()}) ->
Q when Q :: amqqueue:amqqueue().
queue(Q)
when ?is_amqqueue(Q) ->
Q when Q :: amqqueue:target().
queue({Q, RouteInfos}) when is_map(RouteInfos) ->
Q;
queue({Q, RouteInfos})
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
queue(Q) ->
Q.

-spec queue_names([Q | {Q, route_infos()}]) ->
[name()] when Q :: amqqueue:amqqueue().
queue_names(Queues)
when is_list(Queues) ->
lists:map(fun(Q) when ?is_amqqueue(Q) ->
[name()] when Q :: amqqueue:target().
queue_names(Queues) ->
lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) ->
amqqueue:get_name(Q);
({Q, RouteInfos})
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
(Q) ->
amqqueue:get_name(Q)
end, Queues).

-spec get_bcc_queue(amqqueue:amqqueue(), binary()) ->
{ok, amqqueue:amqqueue()} | {error, not_found}.
get_bcc_queue(Q, BCCName) ->
-spec lookup_extra_bcc(amqqueue:target(), binary()) ->
[amqqueue:target()].
lookup_extra_bcc(Q, BCCName) ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
lookup(BCCQueueName).
rabbit_db_queue:get_targets([BCCQueueName]).

is_queue_args_combination_permitted(Q) ->
Durable = amqqueue:is_durable(Q),
Expand Down
9 changes: 7 additions & 2 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Message0, User),
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
Queues = rabbit_amqqueue:lookup_many(QNames),
Queues = rabbit_db_queue:get_targets(QNames),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
%% TODO: call delivery_to_queues with plain args
Expand Down Expand Up @@ -2126,7 +2126,12 @@ deliver_to_queues(XName,
rabbit_misc:protocol_error(
resource_error,
"Stream coordinator unavailable for ~ts",
[rabbit_misc:rs(Resource)])
[rabbit_misc:rs(Resource)]);
{error, Reason} ->
rabbit_misc:protocol_error(
resource_error,
"failed to deliver message: ~tp",
[Reason])
end.

process_routing_mandatory(_Mandatory = true,
Expand Down
8 changes: 4 additions & 4 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ init(Q) when ?amqqueue_is_classic(Q) ->
close(_State) ->
ok.

-spec update(amqqueue:amqqueue(), state()) -> state().
update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state().
update(Q, #?STATE{pid = Pid} = State) ->
case amqqueue:get_pid(Q) of
Pid ->
State;
Expand Down Expand Up @@ -473,10 +473,10 @@ settlement_action(Type, QRef, MsgSeqs, Acc) ->

supports_stateful_delivery() -> true.

-spec deliver([{amqqueue:amqqueue(), state()}],
-spec deliver([{amqqueue:target(), state()}],
Delivery :: mc:state(),
rabbit_queue_type:delivery_options()) ->
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
{[{amqqueue:target(), state()}], rabbit_queue_type:actions()}.
deliver(Qs0, Msg0, Options) ->
%% add guid to content here instead of in rabbit_basic:message/3,
%% as classic queues are the only ones that need it
Expand Down
Loading
Loading