Skip to content

Commit 61e8fc0

Browse files
committed
Support ifEmpty flag in quorum queues deletion
1 parent 04b9c50 commit 61e8fc0

File tree

2 files changed

+80
-57
lines changed

2 files changed

+80
-57
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -856,69 +856,75 @@ restart_server({_, _} = Ref) ->
856856
-spec delete(amqqueue:amqqueue(),
857857
boolean(), boolean(),
858858
rabbit_types:username()) ->
859-
{ok, QLen :: non_neg_integer()} |
860-
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
859+
{ok, QLen :: non_neg_integer()} |
860+
rabbit_types:error('not_empty') |
861+
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
861862
delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) ->
862863
{protocol_error, not_implemented,
863864
"cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues",
864865
[rabbit_misc:rs(amqqueue:get_name(Q))]};
865-
delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) ->
866-
{protocol_error, not_implemented,
867-
"cannot delete ~ts. queue.delete operations with if-empty flag set are not supported by quorum queues",
868-
[rabbit_misc:rs(amqqueue:get_name(Q))]};
869-
delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
866+
delete(Q, _IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
867+
do_delete(Q, IfEmpty, ActingUser).
868+
869+
do_delete(Q, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
870870
{Name, _} = amqqueue:get_pid(Q),
871871
QName = amqqueue:get_name(Q),
872872
QNodes = get_nodes(Q),
873873
%% TODO Quorum queue needs to support consumer tracking for IfUnused
874874
Timeout = ?DELETE_TIMEOUT,
875-
{ok, ReadyMsgs, _} = stat(Q),
876-
Servers = [{Name, Node} || Node <- QNodes],
877-
case ra:delete_cluster(Servers, Timeout) of
878-
{ok, {_, LeaderNode} = Leader} ->
879-
MRef = erlang:monitor(process, Leader),
880-
receive
881-
{'DOWN', MRef, process, _, _} ->
882-
%% leader is down,
883-
%% force delete remaining members
884-
ok = force_delete_queue(lists:delete(Leader, Servers)),
885-
ok
886-
after Timeout ->
887-
erlang:demonitor(MRef, [flush]),
888-
ok = force_delete_queue(Servers)
889-
end,
890-
notify_decorators(QName, shutdown),
891-
case delete_queue_data(Q, ActingUser) of
892-
ok ->
893-
_ = erpc_call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
894-
?RPC_TIMEOUT),
895-
{ok, ReadyMsgs};
896-
{error, timeout} = Err ->
897-
Err
898-
end;
899-
{error, {no_more_servers_to_try, Errs}} ->
900-
case lists:all(fun({{error, noproc}, _}) -> true;
901-
(_) -> false
902-
end, Errs) of
903-
true ->
904-
%% If all ra nodes were already down, the delete
905-
%% has succeed
906-
ok;
907-
false ->
908-
%% attempt forced deletion of all servers
909-
?LOG_WARNING(
910-
"Could not delete quorum '~ts', not enough nodes "
911-
" online to reach a quorum: ~255p."
912-
" Attempting force delete.",
913-
[rabbit_misc:rs(QName), Errs]),
914-
ok = force_delete_queue(Servers),
915-
notify_decorators(QName, shutdown)
916-
end,
917-
case delete_queue_data(Q, ActingUser) of
918-
ok ->
919-
{ok, ReadyMsgs};
920-
{error, timeout} = Err ->
921-
Err
875+
{ok, ReadyMsgs, UnackedMsgs} = stat(Q),
876+
%% Early return if IfEmpty flag is set and queue is not empty
877+
case IfEmpty of
878+
true when ReadyMsgs > 0 orelse UnackedMsgs > 0 ->
879+
{error, not_empty};
880+
_ ->
881+
Servers = [{Name, Node} || Node <- QNodes],
882+
case ra:delete_cluster(Servers, Timeout) of
883+
{ok, {_, LeaderNode} = Leader} ->
884+
MRef = erlang:monitor(process, Leader),
885+
receive
886+
{'DOWN', MRef, process, _, _} ->
887+
%% leader is down,
888+
%% force delete remaining members
889+
ok = force_delete_queue(lists:delete(Leader, Servers)),
890+
ok
891+
after Timeout ->
892+
erlang:demonitor(MRef, [flush]),
893+
ok = force_delete_queue(Servers)
894+
end,
895+
notify_decorators(QName, shutdown),
896+
case delete_queue_data(Q, ActingUser) of
897+
ok ->
898+
_ = erpc_call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
899+
?RPC_TIMEOUT),
900+
{ok, ReadyMsgs};
901+
{error, timeout} = Err ->
902+
Err
903+
end;
904+
{error, {no_more_servers_to_try, Errs}} ->
905+
case lists:all(fun({{error, noproc}, _}) -> true;
906+
(_) -> false
907+
end, Errs) of
908+
true ->
909+
%% If all ra nodes were already down, the delete
910+
%% has succeed
911+
ok;
912+
false ->
913+
%% attempt forced deletion of all servers
914+
?LOG_WARNING(
915+
"Could not delete quorum '~ts', not enough nodes "
916+
" online to reach a quorum: ~255p."
917+
" Attempting force delete.",
918+
[rabbit_misc:rs(QName), Errs]),
919+
ok = force_delete_queue(Servers),
920+
notify_decorators(QName, shutdown)
921+
end,
922+
case delete_queue_data(Q, ActingUser) of
923+
ok ->
924+
{ok, ReadyMsgs};
925+
{error, timeout} = Err ->
926+
Err
927+
end
922928
end
923929
end.
924930

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4157,12 +4157,29 @@ delete_if_empty(Config) ->
41574157
QQ = ?config(queue_name, Config),
41584158
?assertEqual({'queue.declare_ok', QQ, 0, 0},
41594159
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4160+
4161+
%% Test 1: Delete fails when queue has messages (error 406 PRECONDITION_FAILED)
41604162
publish(Ch, QQ),
41614163
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
4162-
%% Try to delete the quorum queue
4163-
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
4164+
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 406, _}}}, _},
41644165
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
4165-
if_empty = true})).
4166+
if_empty = true})),
4167+
4168+
%% Test 2: Delete succeeds when queue is empty
4169+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
4170+
%% Consume the message to empty the queue
4171+
subscribe(Ch2, QQ, false),
4172+
receive
4173+
{#'basic.deliver'{delivery_tag = Tag}, _} ->
4174+
amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = Tag})
4175+
after 5000 ->
4176+
ct:fail("Timeout waiting for message")
4177+
end,
4178+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
4179+
%% Now delete should succeed
4180+
?assertMatch(#'queue.delete_ok'{},
4181+
amqp_channel:call(Ch2, #'queue.delete'{queue = QQ,
4182+
if_empty = true})).
41664183

41674184
delete_if_unused(Config) ->
41684185
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

0 commit comments

Comments
 (0)