diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 2ef86b0203da..411d688aa854 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -76,6 +76,9 @@ -export([internal_declare/2, internal_delete/2, run_backing_queue/3, emit_consumers_local/3, internal_delete/3]). +%% Deprecated feature callback. +-export([are_transient_nonexcl_used/1]). + -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). -include("amqqueue.hrl"). @@ -110,9 +113,19 @@ -rabbit_deprecated_feature( {transient_nonexcl_queues, #{deprecation_phase => permitted_by_default, - doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/#removal-of-transient-non-exclusive-queues" + doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/#removal-of-transient-non-exclusive-queues", + callbacks => #{is_feature_used => {?MODULE, are_transient_nonexcl_used}} }}). +are_transient_nonexcl_used(_) -> + case rabbit_db_queue:list_transient() of + {ok, Queues} -> + NonExclQueues = [Q || Q <- Queues, not is_exclusive(Q)], + length(NonExclQueues) > 0; + {error, _} -> + undefined + end. + -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, active, activity_status, arguments]). diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 30251f4d5598..1e64a7f78c08 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -46,7 +46,8 @@ %% Used by on_node_up and on_node_down. %% Can be deleted once transient entities/mnesia are removed. --export([foreach_transient/1, +-export([list_transient/0, + foreach_transient/1, delete_transient/1]). %% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called @@ -965,6 +966,40 @@ set_in_khepri(Q) -> Path = khepri_queue_path(amqqueue:get_name(Q)), rabbit_khepri:put(Path, Q). +%% ------------------------------------------------------------------- +%% list_transient(). +%% ------------------------------------------------------------------- + +-spec list_transient() -> {ok, Queues} | {error, any()} when + Queues :: [amqqueue:amqqueue()]. +%% @doc Applies `UpdateFun' to all transient queue records. +%% +%% @private + +list_transient() -> + rabbit_khepri:handle_fallback( + #{mnesia => fun() -> list_transient_in_mnesia() end, + khepri => fun() -> list_transient_in_khepri() end + }). + +list_transient_in_mnesia() -> + Pattern = amqqueue:pattern_match_all(), + AllQueues = mnesia:dirty_match_object( + ?MNESIA_TABLE, + Pattern), + {ok, AllQueues}. + +list_transient_in_khepri() -> + try + List = ets:match_object( + ?KHEPRI_PROJECTION, + amqqueue:pattern_match_on_durable(false)), + {ok, List} + catch + error:badarg -> + {error, {khepri_projection_missing, ?KHEPRI_WILDCARD_STAR}} + end. + %% ------------------------------------------------------------------- %% delete_transient(). %% -------------------------------------------------------------------