Skip to content

Commit 338c826

Browse files
ansdmergify[bot]
authored andcommitted
Disallow multiple consumers on one volatile queue
There can be at most one consumer per volatile queue instance. This consumer must also have attached on the same channel/session as the creator of the queue. Prior to this commit, it was possible for clients on other connections or sessions to attach a receiving link to an existing volatile queue name, even though no messages would be delivered. It's better for RabbitMQ to directly refuse the link at attach time. (cherry picked from commit 0843704)
1 parent 13f0e96 commit 338c826

File tree

3 files changed

+91
-17
lines changed

3 files changed

+91
-17
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ consume(Q, Spec, #stream_client{} = QState0)
341341
ok_msg := OkMsg,
342342
acting_user := ActingUser} = Spec,
343343
?LOG_DEBUG("~s:~s Local pid resolved ~0p",
344-
[?MODULE, ?FUNCTION_NAME, LocalPid]),
344+
[?MODULE, ?FUNCTION_NAME, LocalPid]),
345345
case parse_offset_arg(
346346
rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
347347
{ok, OffsetSpec} ->

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ new(#resource{virtual_host = Vhost,
8484
new0(Name, self(), Vhost);
8585
new(#resource{virtual_host = Vhost,
8686
name = NameBin} = Name) ->
87-
case pid_from_name(NameBin, nodes_with_hashes()) of
87+
case pid_from_name(NameBin) of
8888
{ok, Pid} when is_pid(Pid) ->
8989
new0(Name, Pid, Vhost);
9090
_ ->
@@ -104,19 +104,31 @@ is(Name) when is_binary(Name) ->
104104
init(Q) ->
105105
{ok, #?STATE{name = amqqueue:get_name(Q)}}.
106106

107-
consume(_Q, Spec, State) ->
108-
#{no_ack := true,
109-
consumer_tag := Ctag,
110-
mode := Mode} = Spec,
111-
{DeliveryCount, Credit} = case Mode of
112-
{credited, InitialDC} ->
113-
{InitialDC, 0};
114-
{simple_prefetch, 0} ->
115-
{undefined, undefined}
116-
end,
117-
{ok, State#?STATE{ctag = Ctag,
118-
delivery_count = DeliveryCount,
119-
credit = Credit}}.
107+
consume(Q, Spec, #?STATE{name = Name,
108+
ctag = undefined} = State) ->
109+
case amqqueue:get_pid(Q) of
110+
Pid when Pid =:= self() ->
111+
#{no_ack := true,
112+
consumer_tag := Ctag,
113+
mode := Mode} = Spec,
114+
{DeliveryCount, Credit} = case Mode of
115+
{credited, InitialDC} ->
116+
{InitialDC, 0};
117+
{simple_prefetch, 0} ->
118+
{undefined, undefined}
119+
end,
120+
{ok, State#?STATE{ctag = Ctag,
121+
delivery_count = DeliveryCount,
122+
credit = Credit}};
123+
_ ->
124+
{error, precondition_failed,
125+
"only creator channel may consume from ~ts",
126+
[rabbit_misc:rs(Name)]}
127+
end;
128+
consume(_Q, _Spec, #?STATE{name = Name}) ->
129+
{error, precondition_failed,
130+
"multiple consumers are unsupported for ~ts",
131+
[rabbit_misc:rs(Name)]}.
120132

121133
declare(Q, _Node) ->
122134
#resource{name = NameBin} = Name = amqqueue:get_name(Q),
@@ -135,7 +147,7 @@ declare(Q, _Node) ->
135147
-spec exists(rabbit_amqqueue:name()) -> boolean().
136148
exists(#resource{kind = queue,
137149
name = QNameBin} = QName) ->
138-
case pid_from_name(QNameBin, nodes_with_hashes()) of
150+
case pid_from_name(QNameBin) of
139151
{ok, Pid} when is_pid(Pid) ->
140152
case ff_enabled() of
141153
true ->
@@ -363,6 +375,9 @@ encode_pid(Pid) ->
363375
PidParts0),
364376
base64:encode(rabbit_pid_codec:recompose_to_binary(PidParts)).
365377

378+
pid_from_name(Name) ->
379+
pid_from_name(Name, nodes_with_hashes()).
380+
366381
-spec pid_from_name(rabbit_misc:resource_name(),
367382
#{non_neg_integer() => node()}) ->
368383
{ok, pid()} | error.

deps/rabbit/test/direct_reply_to_amqp_SUITE.erl

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ groups() ->
3838
[
3939
responder_attaches_queue_target,
4040
many_replies,
41-
many_volatile_queues_same_session
41+
many_volatile_queues_same_session,
42+
failure_multiple_consumers_same_session_same_queue,
43+
failure_multiple_consumers_different_session_same_queue
4244
]},
4345
{cluster_size_3, [shuffle],
4446
[
@@ -495,6 +497,63 @@ many_volatile_queues_same_session(Config) ->
495497
ok = close_connection_sync(ConnResponder),
496498
ok = close_connection_sync(ConnRequester).
497499

500+
%% Test that there can't be multiple consumers (from one session) on one volatile queue.
501+
failure_multiple_consumers_same_session_same_queue(Config) ->
502+
OpnConf0 = connection_config(Config),
503+
OpnConf = OpnConf0#{notify_with_performative => true},
504+
{ok, Conn} = amqp10_client:open_connection(OpnConf),
505+
{ok, Session} = amqp10_client:begin_session_sync(Conn),
506+
{ok, Receiver1} = amqp10_client:attach_link(Session, attach_args(<<"receiver-1">>)),
507+
AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} ->
508+
#'v1_0.attach'{
509+
source = #'v1_0.source'{
510+
address = {utf8, Addr}}} = Attach,
511+
Addr
512+
after 9000 -> ct:fail({missing_event, ?LINE})
513+
end,
514+
515+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
516+
Session, <<"receiver-2">>, AddrVolQ, settled),
517+
receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{}}}} ->
518+
ok
519+
after 9000 -> ct:fail({missing_event, ?LINE})
520+
end,
521+
522+
ok = end_session_sync(Session),
523+
ok = close_connection_sync(Conn).
524+
525+
%% Test that there can't be multiple consumers (from different sessions) on one volatile queue.
526+
failure_multiple_consumers_different_session_same_queue(Config) ->
527+
OpnConf0 = connection_config(Config),
528+
OpnConf = OpnConf0#{notify_with_performative => true},
529+
{ok, Conn} = amqp10_client:open_connection(OpnConf),
530+
{ok, Session1} = amqp10_client:begin_session_sync(Conn),
531+
{ok, Session2} = amqp10_client:begin_session_sync(Conn),
532+
533+
{ok, Receiver1} = amqp10_client:attach_link(Session1, attach_args(<<"receiver-1">>)),
534+
AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} ->
535+
#'v1_0.attach'{
536+
source = #'v1_0.source'{
537+
address = {utf8, Addr}}} = Attach,
538+
Addr
539+
after 9000 -> ct:fail({missing_event, ?LINE})
540+
end,
541+
542+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
543+
Session2, <<"receiver-2">>, AddrVolQ, settled),
544+
receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{error = Error}}}} ->
545+
?assertMatch(
546+
#'v1_0.error'{
547+
description = {utf8, <<"only creator channel may consume from "
548+
"queue 'amq.rabbitmq.reply-to.", _/binary>>}},
549+
Error)
550+
after 9000 -> ct:fail({missing_event, ?LINE})
551+
end,
552+
553+
ok = end_session_sync(Session1),
554+
ok = end_session_sync(Session2),
555+
ok = close_connection_sync(Conn).
556+
498557
%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
499558
rpc_new_to_old_node(Config) ->
500559
rpc(0, 1, Config).

0 commit comments

Comments
 (0)