Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@

-export_type([name/0, qmsg/0, absent_reason/0]).

-type name() :: rabbit_types:r('queue').
-type name() :: rabbit_types:r('queue') | rabbit_types:r('virtual_queue').

-type qpids() :: [pid()].
-type qlen() :: rabbit_types:ok(non_neg_integer()).
Expand Down
36 changes: 26 additions & 10 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,10 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
consumer_tag = CTag0,
no_ack = NoAck,
nowait = NoWait},
_, State = #ch{reply_consumer = ReplyConsumer,
_, State = #ch{cfg = #conf{user = User,
virtual_host = VHostPath},
reply_consumer = ReplyConsumer,
queue_states = QStates0,
consumer_mapping = ConsumerMapping}) ->
case maps:find(CTag0, ConsumerMapping) of
error ->
Expand All @@ -1410,10 +1413,23 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
rabbit_guid:gen_secure(), "amq.ctag");
Other -> Other
end,
%% Precalculate both suffix and key
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()),
TmpQName = #resource{name = <<"amq.rabbitmq.reply-to">>,
kind = queue,
virtual_host = VHostPath},
Q = rabbit_virtual_queue:create_amqqueue(TmpQName),
%% suffix is pre-calculated by create_amqqueue
#{key := Key,
suffix := Suffix} = amqqueue:get_type_state(Q),
Consumer = {CTag, Suffix, Key},
State1 = State#ch{reply_consumer = Consumer},
Spec = #{no_ack => true,
channel_pid => self(),
consumer_tag => CTag,
ok_msg => undefined,
acting_user => User},
{ok, QStates1, Actions} = rabbit_queue_type:consume(Q, Spec, QStates0),
State1 = handle_queue_actions(Actions,
State#ch{reply_consumer = Consumer,
queue_states = QStates1}),
case NoWait of
true -> {noreply, State1};
false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
Expand Down Expand Up @@ -2174,15 +2190,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
Qs0 = rabbit_queue_type:lookup(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
ok = process_routing_mandatory(Mandatory, QueueNames, Message, State0),
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
Expand Down Expand Up @@ -2242,13 +2258,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
end.

process_routing_mandatory(_Mandatory = true,
_RoutedToQs = [],
_RoutedToQNames = [],
Msg, State) ->
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
ok = basic_return(Msg, State, no_route),
ok;
process_routing_mandatory(_Mandatory = false,
_RoutedToQs = [],
_RoutedToQNames = [],
#basic_message{exchange_name = ExchangeName}, State) ->
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ consume(Q, Spec, State) when ?amqqueue_is_classic(Q) ->
exclusive_consume := ExclusiveConsume,
args := Args,
ok_msg := OkMsg,
acting_user := ActingUser} = Spec,
acting_user := ActingUser} = Spec,
case delegate:invoke(QPid,
{gen_server2, call,
[{basic_consume, NoAck, ChPid, LimiterPid,
Expand Down
14 changes: 3 additions & 11 deletions deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

-module(rabbit_exchange).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-export([recover/1, policy_changed/2, callback/4, declare/7,
assert_equivalence/6, assert_args_equivalence/2, check_type/1, exists/1,
Expand Down Expand Up @@ -405,27 +404,20 @@ info_all(VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)).

-spec route(rabbit_types:exchange(), rabbit_types:delivery())
-> [rabbit_amqqueue:name()].

-spec route(rabbit_types:exchange(), rabbit_types:delivery()) ->
[rabbit_amqqueue:name()].
route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
case RName of
<<>> ->
RKsSorted = lists:usort(RKs),
[rabbit_channel:deliver_reply(RK, Delivery) ||
RK <- RKsSorted, virtual_reply_queue(RK)],
[rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted,
not virtual_reply_queue(RK)];
[rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted];
_ ->
Decs = rabbit_exchange_decorator:select(route, Decorators),
lists:usort(route1(Delivery, Decs, {[X], XName, []}))
end.

virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true;
virtual_reply_queue(_) -> false.

route1(_, _, {[], _, QNames}) ->
QNames;
route1(Delivery, Decorators,
Expand Down
30 changes: 29 additions & 1 deletion deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,18 @@
notify_decorators/1
]).

-export([
lookup/1
]).

-type queue_name() :: rabbit_types:r(queue).
-type queue_ref() :: queue_name() | atom().
-type queue_state() :: term().
-type msg_tag() :: term().
-type arguments() :: queue_arguments | consumer_arguments.
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
-type queue_type() :: rabbit_classic_queue |
rabbit_quorum_queue |
rabbit_stream_queue.

-define(STATE, ?MODULE).

Expand Down Expand Up @@ -350,6 +356,28 @@ notify_decorators(Q) ->
Mod = amqqueue:get_type(Q),
Mod:notify_decorators(Q).

-spec lookup([rabbit_amqqueue:name()]) ->
[amqqueue:amqqueue()].
lookup(QNames) when is_list(QNames) ->
lookup0(QNames, []).

lookup0([], Acc) ->
Acc;
lookup0([#resource{kind = queue} = Name | Rem], Acc) ->
case rabbit_virtual_queue:is_virtual(Name) of
false ->
case ets:lookup(rabbit_queue, Name) of
[] ->
lookup0(Rem, Acc);
[Q] ->
lookup0(Rem, [Q | Acc])
end;
true ->
%% virtual queues are not persisted,
%% create a temporary amqqueue record here
lookup0(Rem, [rabbit_virtual_queue:create_amqqueue(Name) | Acc])
end.

-spec init() -> state().
init() ->
#?STATE{}.
Expand Down
Loading