Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ consume(Q, Spec, #stream_client{} = QState0)
ok_msg := OkMsg,
acting_user := ActingUser} = Spec,
?LOG_DEBUG("~s:~s Local pid resolved ~0p",
[?MODULE, ?FUNCTION_NAME, LocalPid]),
[?MODULE, ?FUNCTION_NAME, LocalPid]),
case parse_offset_arg(
rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
{ok, OffsetSpec} ->
Expand Down
45 changes: 30 additions & 15 deletions deps/rabbit/src/rabbit_volatile_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ new(#resource{virtual_host = Vhost,
new0(Name, self(), Vhost);
new(#resource{virtual_host = Vhost,
name = NameBin} = Name) ->
case pid_from_name(NameBin, nodes_with_hashes()) of
case pid_from_name(NameBin) of
{ok, Pid} when is_pid(Pid) ->
new0(Name, Pid, Vhost);
_ ->
Expand All @@ -104,19 +104,31 @@ is(Name) when is_binary(Name) ->
init(Q) ->
{ok, #?STATE{name = amqqueue:get_name(Q)}}.

consume(_Q, Spec, State) ->
#{no_ack := true,
consumer_tag := Ctag,
mode := Mode} = Spec,
{DeliveryCount, Credit} = case Mode of
{credited, InitialDC} ->
{InitialDC, 0};
{simple_prefetch, 0} ->
{undefined, undefined}
end,
{ok, State#?STATE{ctag = Ctag,
delivery_count = DeliveryCount,
credit = Credit}}.
consume(Q, Spec, #?STATE{name = Name,
ctag = undefined} = State) ->
case amqqueue:get_pid(Q) of
Pid when Pid =:= self() ->
#{no_ack := true,
consumer_tag := Ctag,
mode := Mode} = Spec,
{DeliveryCount, Credit} = case Mode of
{credited, InitialDC} ->
{InitialDC, 0};
{simple_prefetch, 0} ->
{undefined, undefined}
end,
{ok, State#?STATE{ctag = Ctag,
delivery_count = DeliveryCount,
credit = Credit}};
_ ->
{error, precondition_failed,
"only creator channel may consume from ~ts",
[rabbit_misc:rs(Name)]}
end;
consume(_Q, _Spec, #?STATE{name = Name}) ->
{error, precondition_failed,
"multiple consumers are unsupported for ~ts",
[rabbit_misc:rs(Name)]}.

declare(Q, _Node) ->
#resource{name = NameBin} = Name = amqqueue:get_name(Q),
Expand All @@ -135,7 +147,7 @@ declare(Q, _Node) ->
-spec exists(rabbit_amqqueue:name()) -> boolean().
exists(#resource{kind = queue,
name = QNameBin} = QName) ->
case pid_from_name(QNameBin, nodes_with_hashes()) of
case pid_from_name(QNameBin) of
{ok, Pid} when is_pid(Pid) ->
case ff_enabled() of
true ->
Expand Down Expand Up @@ -363,6 +375,9 @@ encode_pid(Pid) ->
PidParts0),
base64:encode(rabbit_pid_codec:recompose_to_binary(PidParts)).

pid_from_name(Name) ->
pid_from_name(Name, nodes_with_hashes()).

-spec pid_from_name(rabbit_misc:resource_name(),
#{non_neg_integer() => node()}) ->
{ok, pid()} | error.
Expand Down
61 changes: 60 additions & 1 deletion deps/rabbit/test/direct_reply_to_amqp_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ groups() ->
[
responder_attaches_queue_target,
many_replies,
many_volatile_queues_same_session
many_volatile_queues_same_session,
failure_multiple_consumers_same_session_same_queue,
failure_multiple_consumers_different_session_same_queue
]},
{cluster_size_3, [shuffle],
[
Expand Down Expand Up @@ -495,6 +497,63 @@ many_volatile_queues_same_session(Config) ->
ok = close_connection_sync(ConnResponder),
ok = close_connection_sync(ConnRequester).

%% Test that there can't be multiple consumers (from one session) on one volatile queue.
failure_multiple_consumers_same_session_same_queue(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{notify_with_performative => true},
{ok, Conn} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Conn),
{ok, Receiver1} = amqp10_client:attach_link(Session, attach_args(<<"receiver-1">>)),
AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, Addr}}} = Attach,
Addr
after 9000 -> ct:fail({missing_event, ?LINE})
end,

{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver-2">>, AddrVolQ, settled),
receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{}}}} ->
ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,

ok = end_session_sync(Session),
ok = close_connection_sync(Conn).

%% Test that there can't be multiple consumers (from different sessions) on one volatile queue.
failure_multiple_consumers_different_session_same_queue(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{notify_with_performative => true},
{ok, Conn} = amqp10_client:open_connection(OpnConf),
{ok, Session1} = amqp10_client:begin_session_sync(Conn),
{ok, Session2} = amqp10_client:begin_session_sync(Conn),

{ok, Receiver1} = amqp10_client:attach_link(Session1, attach_args(<<"receiver-1">>)),
AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, Addr}}} = Attach,
Addr
after 9000 -> ct:fail({missing_event, ?LINE})
end,

{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session2, <<"receiver-2">>, AddrVolQ, settled),
receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{error = Error}}}} ->
?assertMatch(
#'v1_0.error'{
description = {utf8, <<"only creator channel may consume from "
"queue 'amq.rabbitmq.reply-to.", _/binary>>}},
Error)
after 9000 -> ct:fail({missing_event, ?LINE})
end,

ok = end_session_sync(Session1),
ok = end_session_sync(Session2),
ok = close_connection_sync(Conn).

%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
rpc_new_to_old_node(Config) ->
rpc(0, 1, Config).
Expand Down
Loading