Skip to content

Commit 53e12d0

Browse files
committed
Add also support for ifUnused flag in the request
1 parent 5f1975c commit 53e12d0

File tree

2 files changed

+69
-17
lines changed

2 files changed

+69
-17
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -858,26 +858,24 @@ restart_server({_, _} = Ref) ->
858858
rabbit_types:username()) ->
859859
{ok, QLen :: non_neg_integer()} |
860860
rabbit_types:error('not_empty') |
861+
rabbit_types:error('in_use') |
861862
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
862-
delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) ->
863-
{protocol_error, not_implemented,
864-
"cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues",
865-
[rabbit_misc:rs(amqqueue:get_name(Q))]};
866-
delete(Q, _IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
867-
do_delete(Q, IfEmpty, ActingUser).
863+
delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
864+
do_delete(Q, IfUnused, IfEmpty, ActingUser).
868865

869-
do_delete(Q, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
866+
do_delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
870867
{Name, _} = amqqueue:get_pid(Q),
871868
QName = amqqueue:get_name(Q),
872869
QNodes = get_nodes(Q),
873-
%% TODO Quorum queue needs to support consumer tracking for IfUnused
874870
Timeout = ?DELETE_TIMEOUT,
875-
{ok, ReadyMsgs, UnackedMsgs} = stat(Q),
876-
%% Early return if IfEmpty flag is set and queue is not empty
877-
case IfEmpty of
878-
true when ReadyMsgs > 0 orelse UnackedMsgs > 0 ->
879-
{error, not_empty};
880-
_ ->
871+
{ok, ReadyMsgs, ConsumerCount} = stat(Q),
872+
IsEmpty = ReadyMsgs == 0,
873+
IsUnused = ConsumerCount == 0,
874+
%% Check preconditions, matching classic queue behavior
875+
if
876+
IfEmpty andalso not IsEmpty -> {error, not_empty};
877+
IfUnused andalso not IsUnused -> {error, in_use};
878+
true ->
881879
Servers = [{Name, Node} || Node <- QNodes],
882880
case ra:delete_cluster(Servers, Timeout) of
883881
{ok, {_, LeaderNode} = Leader} ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ all_tests() ->
180180
pre_existing_invalid_policy,
181181
delete_if_empty,
182182
delete_if_unused,
183+
delete_if_empty_and_unused,
183184
queue_ttl,
184185
peek,
185186
oldest_entry_timestamp,
@@ -4190,12 +4191,65 @@ delete_if_unused(Config) ->
41904191
QQ = ?config(queue_name, Config),
41914192
?assertEqual({'queue.declare_ok', QQ, 0, 0},
41924193
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4194+
4195+
%% Test 1: Delete fails when queue has active consumer (error 406 PRECONDITION_FAILED)
4196+
subscribe(Ch, QQ, false),
4197+
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
4198+
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
4199+
if_unused = true})),
4200+
4201+
%% Test 2: Delete succeeds when queue has no consumers
4202+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
4203+
%% Publish a message (queue can have messages, just no consumers)
4204+
publish(Ch2, QQ),
4205+
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
4206+
%% Delete with if_unused should succeed since there are no consumers
4207+
?assertMatch(#'queue.delete_ok'{message_count = 1},
4208+
amqp_channel:call(Ch2, #'queue.delete'{queue = QQ,
4209+
if_unused = true})).
4210+
delete_if_empty_and_unused(Config) ->
4211+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
4212+
4213+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
4214+
QQ = <<"test-queue-both-flags">>,
4215+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
4216+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4217+
4218+
%% Test 1: Delete fails when queue has messages (if_empty check fails first)
41934219
publish(Ch, QQ),
41944220
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
4195-
%% Try to delete the quorum queue
4196-
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
4221+
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
41974222
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
4198-
if_unused = true})).
4223+
if_empty = true,
4224+
if_unused = true})),
4225+
4226+
%% Test 2: Delete fails when queue has consumers (if_unused check fails)
4227+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
4228+
%% Consume the message to empty the queue
4229+
subscribe(Ch2, QQ, false),
4230+
receive
4231+
{#'basic.deliver'{delivery_tag = Tag2}, _} ->
4232+
amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = Tag2})
4233+
after 5000 ->
4234+
ct:fail("Timeout waiting for message in test 3b")
4235+
end,
4236+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
4237+
%% Queue is empty but has a consumer, should fail with in_use
4238+
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
4239+
amqp_channel:call(Ch2, #'queue.delete'{queue = QQ,
4240+
if_empty = true,
4241+
if_unused = true})),
4242+
4243+
%% Test 3: Delete succeeds when queue is empty and has no consumers
4244+
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server),
4245+
QQ2 = <<"test-queue-both-flags-ok">>,
4246+
?assertEqual({'queue.declare_ok', QQ2, 0, 0},
4247+
declare(Ch3, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4248+
%% Now delete should succeed (no consumers, no messages)
4249+
?assertMatch(#'queue.delete_ok'{message_count = 0},
4250+
amqp_channel:call(Ch3, #'queue.delete'{queue = QQ2,
4251+
if_empty = true,
4252+
if_unused = true})).
41994253

42004254
queue_ttl(Config) ->
42014255
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

0 commit comments

Comments
 (0)