Skip to content

Commit 5b9f98a

Browse files
authored
Merge pull request #14528 from rabbitmq/volatile-consumers
Disallow multiple consumers on one volatile queue
2 parents 439ea52 + 0843704 commit 5b9f98a

File tree

3 files changed

+91
-17
lines changed

3 files changed

+91
-17
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ consume(Q, Spec, #stream_client{} = QState0)
341341
ok_msg := OkMsg,
342342
acting_user := ActingUser} = Spec,
343343
?LOG_DEBUG("~s:~s Local pid resolved ~0p",
344-
[?MODULE, ?FUNCTION_NAME, LocalPid]),
344+
[?MODULE, ?FUNCTION_NAME, LocalPid]),
345345
case parse_offset_arg(
346346
rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
347347
{ok, OffsetSpec} ->

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ new(#resource{virtual_host = Vhost,
8484
new0(Name, self(), Vhost);
8585
new(#resource{virtual_host = Vhost,
8686
name = NameBin} = Name) ->
87-
case pid_from_name(NameBin, nodes_with_hashes()) of
87+
case pid_from_name(NameBin) of
8888
{ok, Pid} when is_pid(Pid) ->
8989
new0(Name, Pid, Vhost);
9090
_ ->
@@ -104,19 +104,31 @@ is(Name) when is_binary(Name) ->
104104
init(Q) ->
105105
{ok, #?STATE{name = amqqueue:get_name(Q)}}.
106106

107-
consume(_Q, Spec, State) ->
108-
#{no_ack := true,
109-
consumer_tag := Ctag,
110-
mode := Mode} = Spec,
111-
{DeliveryCount, Credit} = case Mode of
112-
{credited, InitialDC} ->
113-
{InitialDC, 0};
114-
{simple_prefetch, 0} ->
115-
{undefined, undefined}
116-
end,
117-
{ok, State#?STATE{ctag = Ctag,
118-
delivery_count = DeliveryCount,
119-
credit = Credit}}.
107+
consume(Q, Spec, #?STATE{name = Name,
108+
ctag = undefined} = State) ->
109+
case amqqueue:get_pid(Q) of
110+
Pid when Pid =:= self() ->
111+
#{no_ack := true,
112+
consumer_tag := Ctag,
113+
mode := Mode} = Spec,
114+
{DeliveryCount, Credit} = case Mode of
115+
{credited, InitialDC} ->
116+
{InitialDC, 0};
117+
{simple_prefetch, 0} ->
118+
{undefined, undefined}
119+
end,
120+
{ok, State#?STATE{ctag = Ctag,
121+
delivery_count = DeliveryCount,
122+
credit = Credit}};
123+
_ ->
124+
{error, precondition_failed,
125+
"only creator channel may consume from ~ts",
126+
[rabbit_misc:rs(Name)]}
127+
end;
128+
consume(_Q, _Spec, #?STATE{name = Name}) ->
129+
{error, precondition_failed,
130+
"multiple consumers are unsupported for ~ts",
131+
[rabbit_misc:rs(Name)]}.
120132

121133
declare(Q, _Node) ->
122134
#resource{name = NameBin} = Name = amqqueue:get_name(Q),
@@ -135,7 +147,7 @@ declare(Q, _Node) ->
135147
-spec exists(rabbit_amqqueue:name()) -> boolean().
136148
exists(#resource{kind = queue,
137149
name = QNameBin} = QName) ->
138-
case pid_from_name(QNameBin, nodes_with_hashes()) of
150+
case pid_from_name(QNameBin) of
139151
{ok, Pid} when is_pid(Pid) ->
140152
case ff_enabled() of
141153
true ->
@@ -363,6 +375,9 @@ encode_pid(Pid) ->
363375
PidParts0),
364376
base64:encode(rabbit_pid_codec:recompose_to_binary(PidParts)).
365377

378+
pid_from_name(Name) ->
379+
pid_from_name(Name, nodes_with_hashes()).
380+
366381
-spec pid_from_name(rabbit_misc:resource_name(),
367382
#{non_neg_integer() => node()}) ->
368383
{ok, pid()} | error.

deps/rabbit/test/direct_reply_to_amqp_SUITE.erl

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ groups() ->
3838
[
3939
responder_attaches_queue_target,
4040
many_replies,
41-
many_volatile_queues_same_session
41+
many_volatile_queues_same_session,
42+
failure_multiple_consumers_same_session_same_queue,
43+
failure_multiple_consumers_different_session_same_queue
4244
]},
4345
{cluster_size_3, [shuffle],
4446
[
@@ -495,6 +497,63 @@ many_volatile_queues_same_session(Config) ->
495497
ok = close_connection_sync(ConnResponder),
496498
ok = close_connection_sync(ConnRequester).
497499

500+
%% Test that there can't be multiple consumers (from one session) on one volatile queue.
501+
failure_multiple_consumers_same_session_same_queue(Config) ->
502+
OpnConf0 = connection_config(Config),
503+
OpnConf = OpnConf0#{notify_with_performative => true},
504+
{ok, Conn} = amqp10_client:open_connection(OpnConf),
505+
{ok, Session} = amqp10_client:begin_session_sync(Conn),
506+
{ok, Receiver1} = amqp10_client:attach_link(Session, attach_args(<<"receiver-1">>)),
507+
AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} ->
508+
#'v1_0.attach'{
509+
source = #'v1_0.source'{
510+
address = {utf8, Addr}}} = Attach,
511+
Addr
512+
after 9000 -> ct:fail({missing_event, ?LINE})
513+
end,
514+
515+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
516+
Session, <<"receiver-2">>, AddrVolQ, settled),
517+
receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{}}}} ->
518+
ok
519+
after 9000 -> ct:fail({missing_event, ?LINE})
520+
end,
521+
522+
ok = end_session_sync(Session),
523+
ok = close_connection_sync(Conn).
524+
525+
%% Test that there can't be multiple consumers (from different sessions) on one volatile queue.
526+
failure_multiple_consumers_different_session_same_queue(Config) ->
527+
OpnConf0 = connection_config(Config),
528+
OpnConf = OpnConf0#{notify_with_performative => true},
529+
{ok, Conn} = amqp10_client:open_connection(OpnConf),
530+
{ok, Session1} = amqp10_client:begin_session_sync(Conn),
531+
{ok, Session2} = amqp10_client:begin_session_sync(Conn),
532+
533+
{ok, Receiver1} = amqp10_client:attach_link(Session1, attach_args(<<"receiver-1">>)),
534+
AddrVolQ = receive {amqp10_event, {link, Receiver1, {attached, Attach}}} ->
535+
#'v1_0.attach'{
536+
source = #'v1_0.source'{
537+
address = {utf8, Addr}}} = Attach,
538+
Addr
539+
after 9000 -> ct:fail({missing_event, ?LINE})
540+
end,
541+
542+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
543+
Session2, <<"receiver-2">>, AddrVolQ, settled),
544+
receive {amqp10_event, {link, Receiver2, {detached, #'v1_0.detach'{error = Error}}}} ->
545+
?assertMatch(
546+
#'v1_0.error'{
547+
description = {utf8, <<"only creator channel may consume from "
548+
"queue 'amq.rabbitmq.reply-to.", _/binary>>}},
549+
Error)
550+
after 9000 -> ct:fail({missing_event, ?LINE})
551+
end,
552+
553+
ok = end_session_sync(Session1),
554+
ok = end_session_sync(Session2),
555+
ok = close_connection_sync(Conn).
556+
498557
%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
499558
rpc_new_to_old_node(Config) ->
500559
rpc(0, 1, Config).

0 commit comments

Comments
 (0)