From 0843704dbb4133c6f8b0eeec765bce50d076e89e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 12 Sep 2025 11:59:57 +0200 Subject: [PATCH] Disallow multiple consumers on one volatile queue There can be at most one consumer per volatile queue instance. This consumer must also have attached on the same channel/session as the creator of the queue. Prior to this commit, it was possible for clients on other connections or sessions to attach a receiving link to an existing volatile queue name, even though no messages would be delivered. It's better for RabbitMQ to directly refuse the link at attach time. --- deps/rabbit/src/rabbit_stream_queue.erl | 2 +- deps/rabbit/src/rabbit_volatile_queue.erl | 45 +++++++++----- .../test/direct_reply_to_amqp_SUITE.erl | 61 ++++++++++++++++++- 3 files changed, 91 insertions(+), 17 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 8600c6973287..6fc36681555b 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -341,7 +341,7 @@ consume(Q, Spec, #stream_client{} = QState0) ok_msg := OkMsg, acting_user := ActingUser} = Spec, ?LOG_DEBUG("~s:~s Local pid resolved ~0p", - [?MODULE, ?FUNCTION_NAME, LocalPid]), + [?MODULE, ?FUNCTION_NAME, LocalPid]), case parse_offset_arg( rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of {ok, OffsetSpec} -> diff --git a/deps/rabbit/src/rabbit_volatile_queue.erl b/deps/rabbit/src/rabbit_volatile_queue.erl index 79b82032a6eb..487422ca721c 100644 --- a/deps/rabbit/src/rabbit_volatile_queue.erl +++ b/deps/rabbit/src/rabbit_volatile_queue.erl @@ -84,7 +84,7 @@ new(#resource{virtual_host = Vhost, new0(Name, self(), Vhost); new(#resource{virtual_host = Vhost, name = NameBin} = Name) -> - case pid_from_name(NameBin, nodes_with_hashes()) of + case pid_from_name(NameBin) of {ok, Pid} when is_pid(Pid) -> new0(Name, Pid, Vhost); _ -> @@ -104,19 +104,31 @@ is(Name) when is_binary(Name) -> init(Q) -> {ok, #?STATE{name = amqqueue:get_name(Q)}}. -consume(_Q, Spec, State) -> - #{no_ack := true, - consumer_tag := Ctag, - mode := Mode} = Spec, - {DeliveryCount, Credit} = case Mode of - {credited, InitialDC} -> - {InitialDC, 0}; - {simple_prefetch, 0} -> - {undefined, undefined} - end, - {ok, State#?STATE{ctag = Ctag, - delivery_count = DeliveryCount, - credit = Credit}}. +consume(Q, Spec, #?STATE{name = Name, + ctag = undefined} = State) -> + case amqqueue:get_pid(Q) of + Pid when Pid =:= self() -> + #{no_ack := true, + consumer_tag := Ctag, + mode := Mode} = Spec, + {DeliveryCount, Credit} = case Mode of + {credited, InitialDC} -> + {InitialDC, 0}; + {simple_prefetch, 0} -> + {undefined, undefined} + end, + {ok, State#?STATE{ctag = Ctag, + delivery_count = DeliveryCount, + credit = Credit}}; + _ -> + {error, precondition_failed, + "only creator channel may consume from ~ts", + [rabbit_misc:rs(Name)]} + end; +consume(_Q, _Spec, #?STATE{name = Name}) -> + {error, precondition_failed, + "multiple consumers are unsupported for ~ts", + [rabbit_misc:rs(Name)]}. declare(Q, _Node) -> #resource{name = NameBin} = Name = amqqueue:get_name(Q), @@ -135,7 +147,7 @@ declare(Q, _Node) -> -spec exists(rabbit_amqqueue:name()) -> boolean(). exists(#resource{kind = queue, name = QNameBin} = QName) -> - case pid_from_name(QNameBin, nodes_with_hashes()) of + case pid_from_name(QNameBin) of {ok, Pid} when is_pid(Pid) -> case ff_enabled() of true -> @@ -363,6 +375,9 @@ encode_pid(Pid) -> PidParts0), base64:encode(rabbit_pid_codec:recompose_to_binary(PidParts)). +pid_from_name(Name) -> + pid_from_name(Name, nodes_with_hashes()). + -spec pid_from_name(rabbit_misc:resource_name(), #{non_neg_integer() => node()}) -> {ok, pid()} | error. diff --git a/deps/rabbit/test/direct_reply_to_amqp_SUITE.erl b/deps/rabbit/test/direct_reply_to_amqp_SUITE.erl index bc4c18290e4f..8ac8037fa17a 100644 --- a/deps/rabbit/test/direct_reply_to_amqp_SUITE.erl +++ b/deps/rabbit/test/direct_reply_to_amqp_SUITE.erl @@ -38,7 +38,9 @@ groups() -> [ responder_attaches_queue_target, many_replies, - many_volatile_queues_same_session + many_volatile_queues_same_session, + failure_multiple_consumers_same_session_same_queue, + failure_multiple_consumers_different_session_same_queue ]}, {cluster_size_3, [shuffle], [ @@ -495,6 +497,63 @@ many_volatile_queues_same_session(Config) -> ok = close_connection_sync(ConnResponder), ok = close_connection_sync(ConnRequester). +%% Test that there can't be multiple consumers (from one session) on one volatile queue. +failure_multiple_consumers_same_session_same_queue(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, + {ok, Conn} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Conn), + {ok, Receiver1} = amqp10_client:attach_link(Session, attach_args(<<"receiver-1">>)), + AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, Addr}}} = Attach, + Addr + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver-2">>, AddrVolQ, settled), + receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{}}}} -> + ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = end_session_sync(Session), + ok = close_connection_sync(Conn). + +%% Test that there can't be multiple consumers (from different sessions) on one volatile queue. +failure_multiple_consumers_different_session_same_queue(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, + {ok, Conn} = amqp10_client:open_connection(OpnConf), + {ok, Session1} = amqp10_client:begin_session_sync(Conn), + {ok, Session2} = amqp10_client:begin_session_sync(Conn), + + {ok, Receiver1} = amqp10_client:attach_link(Session1, attach_args(<<"receiver-1">>)), + AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, Addr}}} = Attach, + Addr + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session2, <<"receiver-2">>, AddrVolQ, settled), + receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{error = Error}}}} -> + ?assertMatch( + #'v1_0.error'{ + description = {utf8, <<"only creator channel may consume from " + "queue 'amq.rabbitmq.reply-to.", _/binary>>}}, + Error) + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = end_session_sync(Session1), + ok = end_session_sync(Session2), + ok = close_connection_sync(Conn). + %% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests. rpc_new_to_old_node(Config) -> rpc(0, 1, Config).