Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 61 additions & 57 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -857,68 +857,72 @@ restart_server({_, _} = Ref) ->
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()} |
rabbit_types:error('not_empty') |
rabbit_types:error('in_use') |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) ->
{protocol_error, not_implemented,
"cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) ->
{protocol_error, not_implemented,
"cannot delete ~ts. queue.delete operations with if-empty flag set are not supported by quorum queues",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
do_delete(Q, IfUnused, IfEmpty, ActingUser).

do_delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
{Name, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
QNodes = get_nodes(Q),
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = ?DELETE_TIMEOUT,
{ok, ReadyMsgs, _} = stat(Q),
Servers = [{Name, Node} || Node <- QNodes],
case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
%% leader is down,
%% force delete remaining members
ok = force_delete_queue(lists:delete(Leader, Servers)),
ok
after Timeout ->
erlang:demonitor(MRef, [flush]),
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
case delete_queue_data(Q, ActingUser) of
ok ->
_ = erpc_call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, timeout} = Err ->
Err
end;
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
ok;
false ->
%% attempt forced deletion of all servers
?LOG_WARNING(
"Could not delete quorum '~ts', not enough nodes "
" online to reach a quorum: ~255p."
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown)
end,
case delete_queue_data(Q, ActingUser) of
ok ->
{ok, ReadyMsgs};
{error, timeout} = Err ->
Err
{ok, ReadyMsgs, ConsumerCount} = stat(Q),
IsEmpty = ReadyMsgs == 0,
IsUnused = ConsumerCount == 0,
%% Check preconditions, matching classic queue behavior
if
IfEmpty andalso not IsEmpty -> {error, not_empty};
IfUnused andalso not IsUnused -> {error, in_use};
true ->
Servers = [{Name, Node} || Node <- QNodes],
case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
%% leader is down,
%% force delete remaining members
ok = force_delete_queue(lists:delete(Leader, Servers)),
ok
after Timeout ->
erlang:demonitor(MRef, [flush]),
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
case delete_queue_data(Q, ActingUser) of
ok ->
_ = erpc_call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, timeout} = Err ->
Err
end;
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
ok;
false ->
%% attempt forced deletion of all servers
?LOG_WARNING(
"Could not delete quorum '~ts', not enough nodes "
" online to reach a quorum: ~255p."
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown)
end,
case delete_queue_data(Q, ActingUser) of
ok ->
{ok, ReadyMsgs};
{error, timeout} = Err ->
Err
end
end
end.

Expand Down
85 changes: 79 additions & 6 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ all_tests() ->
pre_existing_invalid_policy,
delete_if_empty,
delete_if_unused,
delete_if_empty_and_unused,
queue_ttl,
peek,
oldest_entry_timestamp,
Expand Down Expand Up @@ -4157,12 +4158,31 @@ delete_if_empty(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

%% Test 1: Delete fails when queue has messages (error 406 PRECONDITION_FAILED)
publish(Ch, QQ),
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
%% Try to delete the quorum queue
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
if_empty = true})).
if_empty = true})),

%% Test 2: Delete succeeds when queue is empty
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
%% Consume the message to empty the queue
subscribe(Ch2, QQ, false),
receive
{#'basic.deliver'{delivery_tag = Tag}, _} ->
amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = Tag})
after 5000 ->
ct:fail("Timeout waiting for message")
end,
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
%% Cancel the subscription before trying to delete
amqp_channel:call(Ch2, #'basic.cancel'{consumer_tag = <<"ctag">>}),
%% Now delete should succeed
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch2, #'queue.delete'{queue = QQ,
if_empty = true})).

delete_if_unused(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Expand All @@ -4171,12 +4191,65 @@ delete_if_unused(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

%% Test 1: Delete fails when queue has active consumer (error 406 PRECONDITION_FAILED)
subscribe(Ch, QQ, false),
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
if_unused = true})),

%% Test 2: Delete succeeds when queue has no consumers
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
%% Publish a message (queue can have messages, just no consumers)
publish(Ch2, QQ),
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
%% Delete with if_unused should succeed since there are no consumers
?assertMatch(#'queue.delete_ok'{message_count = 1},
amqp_channel:call(Ch2, #'queue.delete'{queue = QQ,
if_unused = true})).
delete_if_empty_and_unused(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = <<"test-queue-both-flags">>,
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

%% Test 1: Delete fails when queue has messages (if_empty check fails first)
publish(Ch, QQ),
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
%% Try to delete the quorum queue
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
if_unused = true})).
if_empty = true,
if_unused = true})),

%% Test 2: Delete fails when queue has consumers (if_unused check fails)
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
%% Consume the message to empty the queue
subscribe(Ch2, QQ, false),
receive
{#'basic.deliver'{delivery_tag = Tag2}, _} ->
amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = Tag2})
after 5000 ->
ct:fail("Timeout waiting for message in test 3b")
end,
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
%% Queue is empty but has a consumer, should fail with in_use
?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
amqp_channel:call(Ch2, #'queue.delete'{queue = QQ,
if_empty = true,
if_unused = true})),

%% Test 3: Delete succeeds when queue is empty and has no consumers
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ2 = <<"test-queue-both-flags-ok">>,
?assertEqual({'queue.declare_ok', QQ2, 0, 0},
declare(Ch3, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
%% Now delete should succeed (no consumers, no messages)
?assertMatch(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch3, #'queue.delete'{queue = QQ2,
if_empty = true,
if_unused = true})).

queue_ttl(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Expand Down
Loading