diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 205f42fe1ef..925d1d5eb57 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -857,68 +857,72 @@ restart_server({_, _} = Ref) -> boolean(), boolean(), rabbit_types:username()) -> {ok, QLen :: non_neg_integer()} | + rabbit_types:error('not_empty') | + rabbit_types:error('in_use') | {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. -delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) -> - {protocol_error, not_implemented, - "cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues", - [rabbit_misc:rs(amqqueue:get_name(Q))]}; -delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) -> - {protocol_error, not_implemented, - "cannot delete ~ts. queue.delete operations with if-empty flag set are not supported by quorum queues", - [rabbit_misc:rs(amqqueue:get_name(Q))]}; -delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> +delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> + do_delete(Q, IfUnused, IfEmpty, ActingUser). + +do_delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> {Name, _} = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), QNodes = get_nodes(Q), - %% TODO Quorum queue needs to support consumer tracking for IfUnused Timeout = ?DELETE_TIMEOUT, - {ok, ReadyMsgs, _} = stat(Q), - Servers = [{Name, Node} || Node <- QNodes], - case ra:delete_cluster(Servers, Timeout) of - {ok, {_, LeaderNode} = Leader} -> - MRef = erlang:monitor(process, Leader), - receive - {'DOWN', MRef, process, _, _} -> - %% leader is down, - %% force delete remaining members - ok = force_delete_queue(lists:delete(Leader, Servers)), - ok - after Timeout -> - erlang:demonitor(MRef, [flush]), - ok = force_delete_queue(Servers) - end, - notify_decorators(QName, shutdown), - case delete_queue_data(Q, ActingUser) of - ok -> - _ = erpc_call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], - ?RPC_TIMEOUT), - {ok, ReadyMsgs}; - {error, timeout} = Err -> - Err - end; - {error, {no_more_servers_to_try, Errs}} -> - case lists:all(fun({{error, noproc}, _}) -> true; - (_) -> false - end, Errs) of - true -> - %% If all ra nodes were already down, the delete - %% has succeed - ok; - false -> - %% attempt forced deletion of all servers - ?LOG_WARNING( - "Could not delete quorum '~ts', not enough nodes " - " online to reach a quorum: ~255p." - " Attempting force delete.", - [rabbit_misc:rs(QName), Errs]), - ok = force_delete_queue(Servers), - notify_decorators(QName, shutdown) - end, - case delete_queue_data(Q, ActingUser) of - ok -> - {ok, ReadyMsgs}; - {error, timeout} = Err -> - Err + {ok, ReadyMsgs, ConsumerCount} = stat(Q), + IsEmpty = ReadyMsgs == 0, + IsUnused = ConsumerCount == 0, + %% Check preconditions, matching classic queue behavior + if + IfEmpty andalso not IsEmpty -> {error, not_empty}; + IfUnused andalso not IsUnused -> {error, in_use}; + true -> + Servers = [{Name, Node} || Node <- QNodes], + case ra:delete_cluster(Servers, Timeout) of + {ok, {_, LeaderNode} = Leader} -> + MRef = erlang:monitor(process, Leader), + receive + {'DOWN', MRef, process, _, _} -> + %% leader is down, + %% force delete remaining members + ok = force_delete_queue(lists:delete(Leader, Servers)), + ok + after Timeout -> + erlang:demonitor(MRef, [flush]), + ok = force_delete_queue(Servers) + end, + notify_decorators(QName, shutdown), + case delete_queue_data(Q, ActingUser) of + ok -> + _ = erpc_call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], + ?RPC_TIMEOUT), + {ok, ReadyMsgs}; + {error, timeout} = Err -> + Err + end; + {error, {no_more_servers_to_try, Errs}} -> + case lists:all(fun({{error, noproc}, _}) -> true; + (_) -> false + end, Errs) of + true -> + %% If all ra nodes were already down, the delete + %% has succeed + ok; + false -> + %% attempt forced deletion of all servers + ?LOG_WARNING( + "Could not delete quorum '~ts', not enough nodes " + " online to reach a quorum: ~255p." + " Attempting force delete.", + [rabbit_misc:rs(QName), Errs]), + ok = force_delete_queue(Servers), + notify_decorators(QName, shutdown) + end, + case delete_queue_data(Q, ActingUser) of + ok -> + {ok, ReadyMsgs}; + {error, timeout} = Err -> + Err + end end end. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 16ca4f76293..9e9ef23e12f 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -180,6 +180,7 @@ all_tests() -> pre_existing_invalid_policy, delete_if_empty, delete_if_unused, + delete_if_empty_and_unused, queue_ttl, peek, oldest_entry_timestamp, @@ -4157,12 +4158,31 @@ delete_if_empty(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Test 1: Delete fails when queue has messages (error 406 PRECONDITION_FAILED) publish(Ch, QQ), wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), - %% Try to delete the quorum queue - ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, amqp_channel:call(Ch, #'queue.delete'{queue = QQ, - if_empty = true})). + if_empty = true})), + + %% Test 2: Delete succeeds when queue is empty + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + %% Consume the message to empty the queue + subscribe(Ch2, QQ, false), + receive + {#'basic.deliver'{delivery_tag = Tag}, _} -> + amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = Tag}) + after 5000 -> + ct:fail("Timeout waiting for message") + end, + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + %% Cancel the subscription before trying to delete + amqp_channel:call(Ch2, #'basic.cancel'{consumer_tag = <<"ctag">>}), + %% Now delete should succeed + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch2, #'queue.delete'{queue = QQ, + if_empty = true})). delete_if_unused(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -4171,12 +4191,65 @@ delete_if_unused(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Test 1: Delete fails when queue has active consumer (error 406 PRECONDITION_FAILED) + subscribe(Ch, QQ, false), + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call(Ch, #'queue.delete'{queue = QQ, + if_unused = true})), + + %% Test 2: Delete succeeds when queue has no consumers + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + %% Publish a message (queue can have messages, just no consumers) + publish(Ch2, QQ), + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), + %% Delete with if_unused should succeed since there are no consumers + ?assertMatch(#'queue.delete_ok'{message_count = 1}, + amqp_channel:call(Ch2, #'queue.delete'{queue = QQ, + if_unused = true})). +delete_if_empty_and_unused(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = <<"test-queue-both-flags">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Test 1: Delete fails when queue has messages (if_empty check fails first) publish(Ch, QQ), wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), - %% Try to delete the quorum queue - ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, amqp_channel:call(Ch, #'queue.delete'{queue = QQ, - if_unused = true})). + if_empty = true, + if_unused = true})), + + %% Test 2: Delete fails when queue has consumers (if_unused check fails) + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + %% Consume the message to empty the queue + subscribe(Ch2, QQ, false), + receive + {#'basic.deliver'{delivery_tag = Tag2}, _} -> + amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = Tag2}) + after 5000 -> + ct:fail("Timeout waiting for message in test 3b") + end, + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + %% Queue is empty but has a consumer, should fail with in_use + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call(Ch2, #'queue.delete'{queue = QQ, + if_empty = true, + if_unused = true})), + + %% Test 3: Delete succeeds when queue is empty and has no consumers + Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ2 = <<"test-queue-both-flags-ok">>, + ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, + declare(Ch3, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + %% Now delete should succeed (no consumers, no messages) + ?assertMatch(#'queue.delete_ok'{message_count = 0}, + amqp_channel:call(Ch3, #'queue.delete'{queue = QQ2, + if_empty = true, + if_unused = true})). queue_ttl(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),