diff --git a/deps/rabbit/include/amqqueue.hrl b/deps/rabbit/include/amqqueue.hrl
index 30c3917d48a9..8e0aaa3f578b 100644
--- a/deps/rabbit/include/amqqueue.hrl
+++ b/deps/rabbit/include/amqqueue.hrl
@@ -48,6 +48,10 @@
(?is_amqqueue_v2(Q) andalso
?amqqueue_v2_field_type(Q) =:= Type)).
+-define(amqqueue_type(Q),
+ (?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_type(Q))).
+
-define(amqqueue_has_valid_pid(Q),
(?is_amqqueue_v2(Q) andalso
is_pid(?amqqueue_v2_field_pid(Q)))).
diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl
index 2d416582ceb6..d3d878231fce 100644
--- a/deps/rabbit/src/amqqueue.erl
+++ b/deps/rabbit/src/amqqueue.erl
@@ -61,6 +61,10 @@
is_exclusive/1,
is_classic/1,
is_quorum/1,
+ is_internal/1,
+ internal_owner/1,
+ make_internal/1,
+ make_internal/2,
pattern_match_all/0,
pattern_match_on_name/1,
pattern_match_on_type/1,
@@ -76,6 +80,8 @@
-define(is_backwards_compat_classic(T),
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
+-type amqqueue_options() :: map() | ets:match_pattern().
+
-record(amqqueue, {
%% immutable
name :: rabbit_amqqueue:name() | ets:match_pattern(),
@@ -106,7 +112,7 @@
slave_pids_pending_shutdown = [], %% reserved
%% secondary index
vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(),
- options = #{} :: map() | ets:match_pattern(),
+ options = #{} :: amqqueue_options(),
type = ?amqqueue_v1_type :: module() | ets:match_pattern(),
type_state = #{} :: map() | ets:match_pattern()
}).
@@ -349,6 +355,19 @@ get_arguments(#amqqueue{arguments = Args}) ->
set_arguments(#amqqueue{} = Queue, Args) ->
Queue#amqqueue{arguments = Args}.
+% options
+
+-spec get_options(amqqueue()) -> amqqueue_options().
+
+get_options(#amqqueue{options = Options}) ->
+ Options.
+
+-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
+
+set_options(#amqqueue{} = Queue, Options) ->
+ Queue#amqqueue{options = Options}.
+
+
% decorators
-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
@@ -368,6 +387,7 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
-spec get_leader(amqqueue_v2()) -> node().
+%% TODO: not only qqs can have leaders, dispatch via queue type
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
% operator_policy
@@ -393,15 +413,6 @@ get_name(#amqqueue{name = Name}) -> Name.
set_name(#amqqueue{} = Queue, Name) ->
Queue#amqqueue{name = Name}.
--spec get_options(amqqueue()) -> map().
-
-get_options(#amqqueue{options = Options}) -> Options.
-
--spec set_options(amqqueue(), map()) -> amqqueue().
-
-set_options(#amqqueue{} = Queue, Options) ->
- Queue#amqqueue{options = Options}.
-
% pid
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
@@ -495,6 +506,27 @@ is_classic(Queue) ->
is_quorum(Queue) ->
get_type(Queue) =:= rabbit_quorum_queue.
+-spec is_internal(amqqueue()) -> boolean().
+
+is_internal(#amqqueue{options = #{internal := true}}) -> true;
+is_internal(#amqqueue{}) -> false.
+
+-spec internal_owner(amqqueue()) -> #resource{}.
+
+internal_owner(#amqqueue{options = #{internal := true,
+ internal_owner := IOwner}}) ->
+ IOwner;
+internal_owner(#amqqueue{}) ->
+ undefined.
+
+make_internal(Q = #amqqueue{options = Options}) when is_map(Options) ->
+ Q#amqqueue{options = maps:merge(Options, #{internal => true,
+ internal_owner => undefined})}.
+make_internal(Q = #amqqueue{options = Options}, Owner)
+ when is_map(Options) andalso is_record(Owner, resource) ->
+ Q#amqqueue{options = maps:merge(Options, #{internal => true,
+ interna_owner => Owner})}.
+
fields() ->
fields(?record_version).
diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl
index 65e9603495d0..b12b3544bc0d 100644
--- a/deps/rabbit/src/rabbit_amqp_management.erl
+++ b/deps/rabbit/src/rabbit_amqp_management.erl
@@ -461,32 +461,8 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
-spec queue_topology(amqqueue:amqqueue()) ->
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
queue_topology(Q) ->
- case amqqueue:get_type(Q) of
- rabbit_quorum_queue ->
- [{leader, Leader0},
- {members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
- Leader = case Leader0 of
- '' -> undefined;
- _ -> Leader0
- end,
- {Leader, Members};
- rabbit_stream_queue ->
- #{name := StreamId} = amqqueue:get_type_state(Q),
- case rabbit_stream_coordinator:members(StreamId) of
- {ok, Members} ->
- maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
- {Node, [Node | Replicas]};
- (Node, {_Pid, replica}, {Writer, Replicas}) ->
- {Writer, [Node | Replicas]}
- end, {undefined, []}, Members);
- {error, _} ->
- {undefined, undefined}
- end;
- _ ->
- Pid = amqqueue:get_pid(Q),
- Node = node(Pid),
- {Node, [Node]}
- end.
+ Type = amqqueue:get_type(Q),
+ Type:queue_topology(Q).
decode_exchange({map, KVList}) ->
M = lists:foldl(
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index effecec8954b..b905d074ef45 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -150,11 +150,7 @@ filter_pid_per_type(QPids) ->
-spec stop(rabbit_types:vhost()) -> 'ok'.
stop(VHost) ->
- %% Classic queues
- ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
- {ok, BQ} = application:get_env(rabbit, backing_queue_module),
- ok = BQ:stop(VHost),
- rabbit_quorum_queue:stop(VHost).
+ rabbit_queue_type:stop(VHost).
-spec start([amqqueue:amqqueue()]) -> 'ok'.
@@ -424,6 +420,8 @@ rebalance(Type, VhostSpec, QueueSpec) ->
%% We have not yet acquired the rebalance_queues global lock.
maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
+%% TODO: classic queues do not support rebalancing, it looks like they are simply
+%% filtered out with is_replicated(Q). Maybe error instead?
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
[Type, VhostSpec, QueueSpec]),
@@ -459,10 +457,15 @@ filter_per_type(stream, Q) ->
filter_per_type(classic, Q) ->
?amqqueue_is_classic(Q).
-rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
- rabbit_quorum_queue;
-rebalance_module(Q) when ?amqqueue_is_stream(Q) ->
- rabbit_stream_queue.
+%% TODO: note that it can return {error, not_supported}.
+%% this will result in a badmatch. However that's fine
+%% for now because the original function will fail with
+%% bad clause if called with classical queue.
+%% The assumption is all non-replicated queues
+%% are filtered before calling this with is_replicated/0
+rebalance_module(Q) ->
+ TypeModule = ?amqqueue_type(Q),
+ TypeModule:rebalance_module().
get_resource_name(#resource{name = Name}) ->
Name.
@@ -487,13 +490,19 @@ iterative_rebalance(ByNode, MaxQueuesDesired) ->
maybe_migrate(ByNode, MaxQueuesDesired) ->
maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
+%% TODO: unfortunate part - UI bits mixed deep inside logic.
+%% I will not be moving this inside queue type. Instead
+%% an attempt to generate something more readable than
+%% Other made.
column_name(rabbit_classic_queue) -> <<"Number of replicated classic queues">>;
column_name(rabbit_quorum_queue) -> <<"Number of quorum queues">>;
column_name(rabbit_stream_queue) -> <<"Number of streams">>;
-column_name(Other) -> Other.
+column_name(TypeModule) ->
+ Alias = rabbit_queue_type:short_alias_of(TypeModule),
+ <<"Number of \"", Alias/binary, "\" queues">>.
maybe_migrate(ByNode, _, []) ->
- ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_v2_field_type(Q)) end, Queues) end, ByNode),
+ ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_type(Q)) end, Queues) end, ByNode),
CountByNodeAndType = maps:map(fun(_Node, Type) -> maps:map(fun (_, Qs)-> length(Qs) end, Type) end, ByNodeAndType),
{ok, maps:values(maps:map(fun(Node,Counts) -> [{<<"Node name">>, Node} | maps:to_list(Counts)] end, CountByNodeAndType))};
maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
@@ -811,6 +820,35 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
"match that of the original declaration.",
[rabbit_misc:rs(QueueName)]).
+-spec check_internal(amqqueue:amqqueue(), rabbit_types:username()) ->
+ 'ok' | rabbit_types:channel_exit().
+check_internal(Q, Username) ->
+ case amqqueue:is_internal(Q) of
+ true ->
+ case Username of
+ %% note cli delete command uses "cli_user"
+ ?INTERNAL_USER ->
+ ok;
+ _ ->
+ QueueName = amqqueue:get_name(Q),
+ case amqqueue:internal_owner(Q) of
+ undefined ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "Cannot delete protected ~ts.",
+ [rabbit_misc:rs(QueueName)]);
+ IOwner ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "Cannot delete protected ~ts. It was "
+ "declared as an protected and can be deleted only by deleting the owner entity: ~ts",
+ [rabbit_misc:rs(QueueName), rabbit_misc:rs(IOwner)])
+ end
+ end;
+ false ->
+ ok
+ end.
+
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
A | rabbit_types:channel_exit().
with_exclusive_access_or_die(Name, ReaderPid, F) ->
@@ -1252,14 +1290,12 @@ list_durable() ->
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
-list_by_type(classic) -> list_by_type(rabbit_classic_queue);
-list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
-list_by_type(stream) -> list_by_type(rabbit_stream_queue);
-list_by_type(Type) ->
- rabbit_db_queue:get_all_durable_by_type(Type).
+list_by_type(TypeDescriptor) ->
+ TypeModule = rabbit_queue_type:discover(TypeDescriptor),
+ rabbit_db_queue:get_all_durable_by_type(TypeModule).
+%% TODO: looks unused
-spec list_local_quorum_queue_names() -> [name()].
-
list_local_quorum_queue_names() ->
[ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
amqqueue:get_state(Q) =/= crashed,
@@ -1296,6 +1332,7 @@ list_local_followers() ->
rabbit_quorum_queue:is_recoverable(Q)
].
+%% TODO: looks unused
-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
list_local_quorum_queues_with_name_matching(Pattern) ->
[ Q || Q <- list_by_type(quorum),
@@ -1681,6 +1718,7 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
case with(
QueueName,
fun (Q) ->
+ ok = check_internal(Q, Username),
if CheckExclusive ->
check_exclusive_access(Q, ConnPid);
true ->
@@ -1882,11 +1920,9 @@ run_backing_queue(QPid, Mod, Fun) ->
-spec is_replicated(amqqueue:amqqueue()) -> boolean().
-is_replicated(Q) when ?amqqueue_is_classic(Q) ->
- false;
-is_replicated(_Q) ->
- %% streams and quorum queues are all replicated
- true.
+is_replicated(Q) ->
+ TypeModule = ?amqqueue_type(Q),
+ TypeModule:is_replicated().
is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
diff --git a/deps/rabbit/src/rabbit_boot_steps.erl b/deps/rabbit/src/rabbit_boot_steps.erl
index 701dbcea3a30..e4116ffa886e 100644
--- a/deps/rabbit/src/rabbit_boot_steps.erl
+++ b/deps/rabbit/src/rabbit_boot_steps.erl
@@ -4,6 +4,8 @@
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
+%% README: https://github.com/rabbitmq/internals/blob/master/rabbit_boot_process.md
+%%
-module(rabbit_boot_steps).
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 28eef707dc65..be830fee2646 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1265,11 +1265,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
{error, {unsupported, single_active_consumer}} ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain access to locked ~ts. basic.get operations "
- "are not supported by quorum queues with single active consumer",
- [rabbit_misc:rs(QueueName)]);
+ rabbit_amqqueue:with_or_die(QueueName, fun unsupported_single_active_consumer_error/1);
{error, Reason} ->
%% TODO add queue type to error message
rabbit_misc:protocol_error(internal_error,
@@ -2005,6 +2001,7 @@ foreach_per_queue(_F, [], Acc) ->
foreach_per_queue(F, [#pending_ack{tag = CTag,
queue = QName,
msg_id = MsgId}], Acc) ->
+ %% TODO: fix this abstraction leak
%% quorum queue, needs the consumer tag
F({QName, CTag}, [MsgId], Acc);
foreach_per_queue(F, UAL, Acc) ->
@@ -2032,6 +2029,7 @@ notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_active(Limiter) of
false -> ok;
true -> case lists:foldl(fun (#pending_ack{tag = CTag}, Acc) when is_integer(CTag) ->
+ %% TODO: fix absctraction leak
%% Quorum queues use integer CTags
%% classic queues use binaries
%% Quorum queues do not interact
@@ -2792,3 +2790,12 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
is_global_qos_permitted() ->
rabbit_deprecated_features:is_permitted(global_qos).
+
+-spec unsupported_single_active_consumer_error(amqqueue:amqqueue()) -> no_return().
+unsupported_single_active_consumer_error(Q) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain access to locked ~ts. basic.get operations "
+ "are not supported by ~p queues with single active consumer",
+ [rabbit_misc:rs(amqqueue:get_name(Q)),
+ rabbit_queue_type:short_alias_of(amqqueue:get_type(Q))]).
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
index 0f92f863bf6f..2ec469bf5a1c 100644
--- a/deps/rabbit/src/rabbit_classic_queue.erl
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -64,8 +64,32 @@
send_drained_credit_api_v1/4,
send_credit_reply/7]).
+-export([queue_topology/1,
+ feature_flag_name/0,
+ policy_apply_to_name/0,
+ can_redeliver/0,
+ stop/1,
+ is_replicated/0,
+ rebalance_module/0,
+ list_with_minimum_quorum/0,
+ drain/1,
+ revive/0,
+ queue_vm_stats_sups/0,
+ queue_vm_ets/0,
+ dir_base/0]).
+
-export([validate_policy/1]).
+-rabbit_boot_step(
+ {rabbit_classic_queue_type,
+ [{description, "Classic queue: queue type"},
+ {mfa, {rabbit_registry, register,
+ [queue, <<"classic">>, ?MODULE]}},
+ {cleanup, {rabbit_registry, unregister,
+ [queue, <<"classic">>]}},
+ {requires, rabbit_registry},
+ {enables, ?MODULE}]}).
+
-rabbit_boot_step(
{?MODULE,
[{description, "Deprecated queue-master-locator support."
@@ -74,7 +98,7 @@
[policy_validator, <<"queue-master-locator">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[operator_policy_validator, <<"queue-master-locator">>, ?MODULE]}},
- {requires, rabbit_registry},
+ {requires, [rabbit_classic_queue_type]},
{enables, recovery}]}).
validate_policy(Args) ->
@@ -674,3 +698,56 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
send_queue_event(Pid, QName, Event) ->
gen_server:cast(Pid, {queue_event, QName, Event}).
+
+-spec queue_topology(amqqueue:amqqueue()) ->
+ {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
+queue_topology(Q) ->
+ Pid = amqqueue:get_pid(Q),
+ Node = node(Pid),
+ {Node, [Node]}.
+
+feature_flag_name() ->
+ undefined.
+
+policy_apply_to_name() ->
+ <<"classic_queues">>.
+
+can_redeliver() ->
+ true.
+
+stop(VHost) ->
+ ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+ ok = BQ:stop(VHost).
+
+is_replicated() ->
+ false.
+
+rebalance_module() ->
+ {error, not_supported}.
+
+list_with_minimum_quorum() ->
+ [].
+
+drain(_TransferCandidates) ->
+ ok.
+
+revive() ->
+ ok.
+
+queue_vm_stats_sups() ->
+ {[queue_procs], [rabbit_vm:all_vhosts_children(rabbit_amqqueue_sup_sup)]}.
+
+%% return nothing because of this line in rabbit_vm:
+%% {msg_index, MsgIndexETS + MsgIndexProc},
+%% it mixes procs and ets,
+%% TODO: maybe instead of separating sups and ets
+%% I need vm_memory callback that just
+%% returns proplist? And rabbit_vm calculates
+%% Other as usual by substraction.
+queue_vm_ets() ->
+ {[],
+ []}.
+
+dir_base() ->
+ [rabbit_vhost:msg_store_dir_base()].
diff --git a/deps/rabbit/src/rabbit_core_metrics_gc.erl b/deps/rabbit/src/rabbit_core_metrics_gc.erl
index 0e639b3aa188..b1e411e35b74 100644
--- a/deps/rabbit/src/rabbit_core_metrics_gc.erl
+++ b/deps/rabbit/src/rabbit_core_metrics_gc.erl
@@ -74,22 +74,15 @@ gc_local_queues() ->
GbSetDown = gb_sets:from_list(QueuesDown),
gc_queue_metrics(GbSet, GbSetDown),
gc_entity(queue_coarse_metrics, GbSet),
- Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]),
- gc_leader_data(Followers).
-
-gc_leader_data(Followers) ->
- ets:foldl(fun({Id, _, _, _, _}, none) ->
- gc_leader_data(Id, queue_coarse_metrics, Followers)
- end, none, queue_coarse_metrics).
-
-gc_leader_data(Id, Table, GbSet) ->
- case gb_sets:is_member(Id, GbSet) of
- true ->
- ets:delete(Table, Id),
- none;
- false ->
- none
- end.
+ %% remove coarse metrics for quorum queues without local leader
+ gc_leader_data().
+
+gc_leader_data() ->
+ _ = [begin
+ QName = amqqueue:get_name(Q),
+ rabbit_core_metrics:delete_queue_coarse_metrics(QName)
+ end || Q <- rabbit_amqqueue:list_local_followers()],
+ ok.
gc_global_queues() ->
GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()),
diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl
index 0f69b3ddf424..9027a6417342 100644
--- a/deps/rabbit/src/rabbit_definitions.erl
+++ b/deps/rabbit/src/rabbit_definitions.erl
@@ -1045,16 +1045,11 @@ list_queues() ->
queue_definition(Q) ->
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
- Type = case amqqueue:get_type(Q) of
- rabbit_classic_queue -> classic;
- rabbit_quorum_queue -> quorum;
- rabbit_stream_queue -> stream;
- T -> T
- end,
+ TypeModule = amqqueue:get_type(Q),
#{
<<"vhost">> => VHost,
<<"name">> => Name,
- <<"type">> => Type,
+ <<"type">> => rabbit_registry:lookup_type_name(queue, TypeModule),
<<"durable">> => amqqueue:is_durable(Q),
<<"auto_delete">> => amqqueue:is_auto_delete(Q),
<<"arguments">> => rabbit_misc:amqp_table(amqqueue:get_arguments(Q))
diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl
index f30ff09408d5..301496f0ed9b 100644
--- a/deps/rabbit/src/rabbit_exchange.erl
+++ b/deps/rabbit/src/rabbit_exchange.erl
@@ -471,11 +471,13 @@ delete(XName, IfUnused, Username) ->
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
XName#resource.name, true, Username),
case rabbit_db_exchange:delete(XName, IfUnused) of
- {deleted, #exchange{name = XName} = X, Bs, Deletions} ->
+ {deleted, #exchange{name = XName, type = XType} = X, Bs, Deletions} ->
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bs, Deletions),
ok = rabbit_binding:process_deletions(Deletions1),
ok = rabbit_binding:notify_deletions(Deletions1, Username),
+ {ok, XTypeModule} = rabbit_registry:lookup_type_module(exchange, XType),
+ ok = XTypeModule:delete(0, X),
ok;
{error, _} = Err ->
Err
diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl
index 647317a35618..6fcf03d37d89 100644
--- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl
+++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl
@@ -538,14 +538,8 @@ redeliver0(#pending{delivery = Msg0,
clients_redeliver(Qs, QTypeState) ->
lists:filter(fun(Q) ->
case rabbit_queue_type:module(Q, QTypeState) of
- {ok, rabbit_quorum_queue} ->
- % If #enqueue{} Raft command does not get applied
- % rabbit_fifo_client will resend.
- true;
- {ok, rabbit_stream_queue} ->
- true;
- _ ->
- false
+ {ok, TypeModule} -> TypeModule:can_redeliver();
+ _ -> false
end
end, Qs).
diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl
index e1aba4ca0455..49fc9a06fe53 100644
--- a/deps/rabbit/src/rabbit_global_counters.erl
+++ b/deps/rabbit/src/rabbit_global_counters.erl
@@ -266,8 +266,8 @@ messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
end,
counters:add(fetch(QueueType, DeadLetterStrategy), Index, Num).
-messages_dead_lettered_confirmed(rabbit_quorum_queue, at_least_once, Num) ->
- counters:add(fetch(rabbit_quorum_queue, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).
+messages_dead_lettered_confirmed(QTypeModule, at_least_once, Num) ->
+ counters:add(fetch(QTypeModule, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).
fetch(Protocol) ->
persistent_term:get({?MODULE, Protocol}).
diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl
index f6ee1f340287..873bc8453d85 100644
--- a/deps/rabbit/src/rabbit_maintenance.erl
+++ b/deps/rabbit/src/rabbit_maintenance.erl
@@ -33,7 +33,6 @@
close_all_client_connections/0,
primary_replica_transfer_candidate_nodes/0,
random_primary_replica_transfer_candidate_node/2,
- transfer_leadership_of_quorum_queues/1,
table_definitions/0
]).
@@ -78,13 +77,7 @@ drain() ->
TransferCandidates = primary_replica_transfer_candidate_nodes(),
%% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues
%% in the cluster, unlike with CMQs.
- transfer_leadership_of_quorum_queues(TransferCandidates),
- stop_local_quorum_queue_followers(),
-
- case whereis(rabbit_stream_coordinator) of
- undefined -> ok;
- _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates)
- end,
+ rabbit_queue_type:drain(TransferCandidates),
transfer_leadership_of_metadata_store(TransferCandidates),
@@ -99,7 +92,7 @@ drain() ->
-spec revive() -> ok.
revive() ->
rabbit_log:info("This node is being revived from maintenance (drain) mode"),
- revive_local_quorum_queue_replicas(),
+ rabbit_queue_type:revive(),
rabbit_log:info("Resumed all listeners and will accept client connections again"),
_ = resume_all_client_listeners(),
rabbit_log:info("Resumed all listeners and will accept client connections again"),
@@ -186,32 +179,6 @@ close_all_client_connections() ->
rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"),
{ok, length(Pids)}.
--spec transfer_leadership_of_quorum_queues([node()]) -> ok.
-transfer_leadership_of_quorum_queues([]) ->
- rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
- "(online, not under maintenance) nodes to transfer to!");
-transfer_leadership_of_quorum_queues(_TransferCandidates) ->
- %% we only transfer leadership for QQs that have local leaders
- Queues = rabbit_amqqueue:list_local_leaders(),
- rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
- [length(Queues)]),
- [begin
- Name = amqqueue:get_name(Q),
- rabbit_log:debug("Will trigger a leader election for local quorum queue ~ts",
- [rabbit_misc:rs(Name)]),
- %% we trigger an election and exclude this node from the list of candidates
- %% by simply shutting its local QQ replica (Ra server)
- RaLeader = amqqueue:get_pid(Q),
- rabbit_log:debug("Will stop Ra server ~tp", [RaLeader]),
- case rabbit_quorum_queue:stop_server(RaLeader) of
- ok ->
- rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]);
- {error, nodedown} ->
- rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
- end
- end || Q <- Queues],
- rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").
-
transfer_leadership_of_metadata_store(TransferCandidates) ->
rabbit_log:info("Will transfer leadership of metadata store with current leader on this node",
[]),
@@ -224,47 +191,6 @@ transfer_leadership_of_metadata_store(TransferCandidates) ->
rabbit_log:warning("Skipping leadership transfer of metadata store: ~p", [Error])
end.
--spec transfer_leadership_of_stream_coordinator([node()]) -> ok.
-transfer_leadership_of_stream_coordinator([]) ->
- rabbit_log:warning("Skipping leadership transfer of stream coordinator: no candidate "
- "(online, not under maintenance) nodes to transfer to!");
-transfer_leadership_of_stream_coordinator(TransferCandidates) ->
- % try to transfer to the node with the lowest uptime; the assumption is that
- % nodes are usually restarted in a rolling fashion, in a consistent order;
- % therefore, the youngest node has already been restarted or (if we are draining the first node)
- % that it will be restarted last. either way, this way we limit the number of transfers
- Uptimes = rabbit_misc:append_rpc_all_nodes(TransferCandidates, erlang, statistics, [wall_clock]),
- Candidates = lists:zipwith(fun(N, {U, _}) -> {N, U} end, TransferCandidates, Uptimes),
- BestCandidate = element(1, hd(lists:keysort(2, Candidates))),
- case rabbit_stream_coordinator:transfer_leadership([BestCandidate]) of
- {ok, Node} ->
- rabbit_log:info("Leadership transfer for stream coordinator completed. The new leader is ~p", [Node]);
- Error ->
- rabbit_log:warning("Skipping leadership transfer of stream coordinator: ~p", [Error])
- end.
-
--spec stop_local_quorum_queue_followers() -> ok.
-stop_local_quorum_queue_followers() ->
- Queues = rabbit_amqqueue:list_local_followers(),
- rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
- [length(Queues)]),
- [begin
- Name = amqqueue:get_name(Q),
- rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts",
- [rabbit_misc:rs(Name)]),
- %% shut down Ra nodes so that they are not considered for leader election
- {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
- RaNode = {RegisteredName, node()},
- rabbit_log:debug("Will stop Ra server ~tp", [RaNode]),
- case rabbit_quorum_queue:stop_server(RaNode) of
- ok ->
- rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]);
- {error, nodedown} ->
- rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
- end
- end || Q <- Queues],
- rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
-
-spec primary_replica_transfer_candidate_nodes() -> [node()].
primary_replica_transfer_candidate_nodes() ->
filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]).
@@ -289,24 +215,6 @@ random_nth(Nodes) ->
Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)),
lists:nth(Nth + 1, Nodes).
-revive_local_quorum_queue_replicas() ->
- Queues = rabbit_amqqueue:list_local_followers(),
- %% NB: this function ignores the first argument so we can just pass the
- %% empty binary as the vhost name.
- {Recovered, Failed} = rabbit_quorum_queue:recover(<<>>, Queues),
- rabbit_log:debug("Successfully revived ~b quorum queue replicas",
- [length(Recovered)]),
- case length(Failed) of
- 0 ->
- ok;
- NumFailed ->
- rabbit_log:error("Failed to revive ~b quorum queue replicas",
- [NumFailed])
- end,
-
- rabbit_log:info("Restart of local quorum queue replicas is complete"),
- ok.
-
%%
%% Implementation
%%
diff --git a/deps/rabbit/src/rabbit_observer_cli.erl b/deps/rabbit/src/rabbit_observer_cli.erl
index 77c102d1f6e3..432426d8932b 100644
--- a/deps/rabbit/src/rabbit_observer_cli.erl
+++ b/deps/rabbit/src/rabbit_observer_cli.erl
@@ -7,10 +7,21 @@
-module(rabbit_observer_cli).
--export([init/0]).
+-export([init/0, add_plugin/1]).
init() ->
application:set_env(observer_cli, plugins, [
rabbit_observer_cli_classic_queues:plugin_info(),
rabbit_observer_cli_quorum_queues:plugin_info()
]).
+
+%% must be executed after observer_cli boot_step
+add_plugin(PluginInfo) ->
+ case application:get_env(observer_cli, plugins, undefined) of
+ undefined -> %% shouldn't be there, die
+ exit({rabbit_observer_cli_step_not_there, "Can't add observer_cli plugin, required boot_step wasn't executed"});
+ Plugins when is_list(Plugins) ->
+ application:set_env(observer_cli, plugins, Plugins ++ [PluginInfo]);
+ _ ->
+ exit({rabbit_observer_cli_plugins_error, "Can't add observer_cli plugin, existing entry is not a list"})
+ end.
diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl
index 381927f36df7..f18b8cfc7569 100644
--- a/deps/rabbit/src/rabbit_policy.erl
+++ b/deps/rabbit/src/rabbit_policy.erl
@@ -493,10 +493,13 @@ matches_type(_, _) -> false.
matches_queue_type(queue, _, <<"all">>) -> true;
matches_queue_type(queue, _, <<"queues">>) -> true;
-matches_queue_type(queue, rabbit_classic_queue, <<"classic_queues">>) -> true;
-matches_queue_type(queue, rabbit_quorum_queue, <<"quorum_queues">>) -> true;
-matches_queue_type(queue, rabbit_stream_queue, <<"streams">>) -> true;
-matches_queue_type(queue, _, _) -> false.
+matches_queue_type(queue, TypeModule, Term) ->
+ %% we assume here TypeModule comes from queue struct,
+ %% therefore it is used and loaded - no need to check
+ %% with registry.
+ %% we also assume here and elsewhere that queue type
+ %% module developer implemented all needed callbacks
+ TypeModule:policy_apply_to_name() == Term.
priority_comparator(A, B) -> pget(priority, A) >= pget(priority, B).
@@ -578,9 +581,20 @@ is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
apply_to_validation(_Name, <<"all">>) -> ok;
apply_to_validation(_Name, <<"exchanges">>) -> ok;
apply_to_validation(_Name, <<"queues">>) -> ok;
-apply_to_validation(_Name, <<"classic_queues">>) -> ok;
-apply_to_validation(_Name, <<"quorum_queues">>) -> ok;
apply_to_validation(_Name, <<"streams">>) -> ok;
apply_to_validation(_Name, Term) ->
- {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', "
- " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}.
+ %% as a last restort go to queue types registry
+ %% and try to find something here
+ case maybe_apply_to_queue_type(Term) of
+ true -> ok;
+ false ->
+ %% TODO: get recognized queue terms from queue types from queue type.
+ {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', "
+ " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}
+ end.
+
+maybe_apply_to_queue_type(Term) ->
+ [] =/= lists:filter(fun({_TypeName, TypeModule}) ->
+ TypeModule:policy_apply_to_name() == Term
+ end,
+ rabbit_registry:lookup_all(queue)).
diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl
index 4c7dfe7ea0b9..0f204f97347e 100644
--- a/deps/rabbit/src/rabbit_queue_location.erl
+++ b/deps/rabbit/src/rabbit_queue_location.erl
@@ -45,7 +45,7 @@ queue_leader_locators() ->
-spec select_leader_and_followers(amqqueue:amqqueue(), pos_integer()) ->
{Leader :: node(), Followers :: [node()]}.
select_leader_and_followers(Q, Size)
- when (?amqqueue_is_quorum(Q) orelse ?amqqueue_is_stream(Q) orelse ?amqqueue_is_classic(Q)) andalso is_integer(Size) ->
+ when (?is_amqqueue_v2(Q)) andalso is_integer(Size) ->
LeaderLocator = leader_locator(Q),
QueueType = amqqueue:get_type(Q),
do_select_leader_and_followers(Size, QueueType, LeaderLocator).
@@ -109,6 +109,7 @@ leader_locator0(_) ->
%% default
<<"client-local">>.
+%% TODO: allow dispatching by queue type
-spec select_members(pos_integer(), rabbit_queue_type:queue_type(), [node(),...], [node(),...],
non_neg_integer(), non_neg_integer(), function()) ->
{[node(),...], function()}.
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index de7754442bb5..930fa824f367 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -62,7 +62,14 @@
arguments/1,
arguments/2,
notify_decorators/1,
- publish_at_most_once/2
+ publish_at_most_once/2,
+ can_redeliver/2,
+ stop/1,
+ endangered_queues/0,
+ drain/1,
+ revive/0,
+ queue_vm_stats_sups/0,
+ queue_vm_ets/0
]).
-export([
@@ -77,7 +84,7 @@
%% sequence number typically
-type correlation() :: term().
-type arguments() :: queue_arguments | consumer_arguments.
--type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
+-type queue_type() :: module().
%% see AMQP 1.0 §2.6.7
-type delivery_count() :: sequence_no().
-type credit() :: uint().
@@ -86,10 +93,6 @@
-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, type, state]).
-%% TODO resolve all registered queue types from registry
--define(QUEUE_MODULES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]).
--define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]).
-
-type credit_reply_action() :: {credit_reply, rabbit_types:ctag(), delivery_count(), credit(),
Available :: non_neg_integer(), Drain :: boolean()}.
@@ -274,75 +277,59 @@
-callback notify_decorators(amqqueue:amqqueue()) ->
ok.
+-callback queue_topology(amqqueue:amqqueue()) ->
+ {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
+
+-callback feature_flag_name() -> atom().
+
+-callback policy_apply_to_name() -> binary().
+
+%% -callback on_node_up(node()) -> ok.
+
+%% -callback on_node_down(node()) -> ok.
+
+-callback can_redeliver() -> boolean().
+
+-callback stop(rabbit_types:vhost()) -> ok.
+
+-callback is_replicated() -> boolean().
+
+-callback rebalance_module() -> module() | {error, not_supported}.
+
+-callback list_with_minimum_quorum() -> [amqqueue:amqqueue()].
+
+-callback drain([node()]) -> ok.
+
+-callback revive() -> ok.
+
+%% used by rabbit_vm to emit queue process
+%% (currently memory and binary) stats
+-callback queue_vm_stats_sups() -> {StatsKeys :: [atom()], SupsNames:: [[atom()]]}.
+
-spec discover(binary() | atom()) -> queue_type().
discover(<<"undefined">>) ->
fallback();
discover(undefined) ->
fallback();
-%% TODO: should this use a registry that's populated on boot?
-discover(<<"quorum">>) ->
- rabbit_quorum_queue;
-discover(rabbit_quorum_queue) ->
- rabbit_quorum_queue;
-discover(<<"classic">>) ->
- rabbit_classic_queue;
-discover(rabbit_classic_queue) ->
- rabbit_classic_queue;
-discover(rabbit_stream_queue) ->
- rabbit_stream_queue;
-discover(<<"stream">>) ->
- rabbit_stream_queue;
-discover(Other) when is_atom(Other) ->
- discover(rabbit_data_coercion:to_binary(Other));
-discover(Other) when is_binary(Other) ->
- T = rabbit_registry:binary_to_type(Other),
- rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
- {ok, Mod} = rabbit_registry:lookup_module(queue, T),
- Mod.
-
--spec short_alias_of(queue_type()) -> binary().
-%% The opposite of discover/1: returns a short alias given a module name
-short_alias_of(<<"rabbit_quorum_queue">>) ->
- <<"quorum">>;
-short_alias_of(rabbit_quorum_queue) ->
- <<"quorum">>;
-%% AMQP 1.0 management client
-short_alias_of({utf8, <<"quorum">>}) ->
- <<"quorum">>;
-short_alias_of(<<"rabbit_classic_queue">>) ->
- <<"classic">>;
-short_alias_of(rabbit_classic_queue) ->
- <<"classic">>;
-%% AMQP 1.0 management client
-short_alias_of({utf8, <<"classic">>}) ->
- <<"classic">>;
-short_alias_of(<<"rabbit_stream_queue">>) ->
- <<"stream">>;
-short_alias_of(rabbit_stream_queue) ->
- <<"stream">>;
-%% AMQP 1.0 management client
-short_alias_of({utf8, <<"stream">>}) ->
- <<"stream">>;
-%% for cases where this function is used for
-%% formatting of values that already might use these
-%% short aliases
-short_alias_of(<<"quorum">>) ->
- <<"quorum">>;
-short_alias_of(<<"classic">>) ->
- <<"classic">>;
-short_alias_of(<<"stream">>) ->
- <<"stream">>;
-short_alias_of(_Other) ->
- undefined.
-
-feature_flag_name(<<"quorum">>) ->
- quorum_queue;
-feature_flag_name(<<"classic">>) ->
- undefined;
-feature_flag_name(<<"stream">>) ->
- stream_queue;
-feature_flag_name(_) ->
- undefined.
+discover(TypeDescriptor) ->
+ {ok, TypeModule} = rabbit_registry:lookup_type_module(queue, TypeDescriptor),
+ TypeModule.
+
+-spec short_alias_of(TypeDescriptor) -> Ret when
+ TypeDescriptor :: atom() | binary(),
+ Ret :: binary().
+short_alias_of(TypeDescriptor) ->
+ case rabbit_registry:lookup_type_name(queue, TypeDescriptor) of
+ {ok, TypeName} -> TypeName;
+ _ -> undefined
+ end.
+
+feature_flag_name(TypeDescriptor) ->
+ case rabbit_registry:lookup_type_module(queue, TypeDescriptor) of
+ {ok, TypeModule} ->
+ TypeModule:feature_flag_name();
+ _ -> undefined
+ end.
%% If the client does not specify the type, the virtual host does not have any
%% metadata default, and rabbit.default_queue_type is not set in the application env,
@@ -362,15 +349,15 @@ default() ->
default_alias() ->
short_alias_of(default()).
+%% used for example like this
+%% {{utf8, <<"type">>}, {utf8, rabbit_queue_type:to_binary(QType)}},
+%% so not just any binary but a type name
-spec to_binary(module()) -> binary().
-to_binary(rabbit_classic_queue) ->
- <<"classic">>;
-to_binary(rabbit_quorum_queue) ->
- <<"quorum">>;
-to_binary(rabbit_stream_queue) ->
- <<"stream">>;
-to_binary(Other) ->
- atom_to_binary(Other).
+to_binary(TypeModule) ->
+ case rabbit_registry:lookup_type_name(queue, TypeModule) of
+ {ok, TypeName} -> TypeName;
+ _ -> undefined
+ end.
%% is a specific queue type implementation enabled
-spec is_enabled(module()) -> boolean().
@@ -849,14 +836,13 @@ qref(Q) when ?is_amqqueue(Q) ->
known_queue_type_modules() ->
Registered = rabbit_registry:lookup_all(queue),
{_, Modules} = lists:unzip(Registered),
- ?QUEUE_MODULES ++ Modules.
+ Modules.
-spec known_queue_type_names() -> [binary()].
known_queue_type_names() ->
Registered = rabbit_registry:lookup_all(queue),
{QueueTypes, _} = lists:unzip(Registered),
- QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes),
- ?KNOWN_QUEUE_TYPES ++ QTypeBins.
+ lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes).
inject_dqt(VHost) when ?is_vhost(VHost) ->
inject_dqt(vhost:to_map(VHost));
@@ -920,3 +906,46 @@ check_cluster_queue_limit(Q) ->
queue_limit_error(Reason, ReasonArgs) ->
{error, queue_limit_exceeded, Reason, ReasonArgs}.
+
+-spec can_redeliver(queue_name(), state()) ->
+ {ok, module()} | {error, not_found}.
+can_redeliver(Q, State) ->
+ case module(Q, State) of
+ {ok, TypeModule} ->
+ TypeModule:can_redeliver();
+ _ -> false
+ end.
+
+-spec stop(rabbit_types:vhost()) -> ok.
+stop(VHost) ->
+ %% original rabbit_amqqueue:stop doesn't do any catches or try after
+ _ = [TypeModule:stop(VHost) || {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)],
+ ok.
+
+endangered_queues() ->
+ lists:append([TypeModule:list_with_minimum_quorum()
+ || {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)]).
+
+drain(TransferCandidates) ->
+ _ = [TypeModule:drain(TransferCandidates) ||
+ {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)],
+ ok.
+
+revive() ->
+ _ = [TypeModule:revive() ||
+ {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)],
+ ok.
+
+queue_vm_stats_sups() ->
+ lists:foldl(fun({_TypeName, TypeModule}, {KeysAcc, SupsAcc}) ->
+ {Keys, Sups} = TypeModule:queue_vm_stats_sups(),
+ {KeysAcc ++ Keys, SupsAcc ++ Sups}
+ end,
+ {[], []}, rabbit_registry:lookup_all(queue)).
+
+queue_vm_ets() ->
+ lists:foldl(fun({_TypeName, TypeModule}, {KeysAcc, SupsAcc}) ->
+ {Keys, Tables} = TypeModule:queue_vm_ets(),
+ {KeysAcc ++ Keys, SupsAcc ++ Tables}
+ end,
+ {[], []}, rabbit_registry:lookup_all(queue)).
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index c9fb877b38dc..312e20c02533 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -77,6 +77,17 @@
force_vhost_queues_shrink_member_to_current_member/1,
force_all_queues_shrink_member_to_current_member/0]).
+-export([queue_topology/1,
+ feature_flag_name/0,
+ policy_apply_to_name/0,
+ can_redeliver/0,
+ is_replicated/0,
+ rebalance_module/0,
+ drain/1,
+ revive/0,
+ queue_vm_stats_sups/0,
+ queue_vm_ets/0]).
+
%% for backwards compatibility
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0,
@@ -95,6 +106,15 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
+-rabbit_boot_step(
+ {rabbit_quorum_queue_type,
+ [{description, "Quorum queue: queue type"},
+ {mfa, {rabbit_registry, register,
+ [queue, <<"quorum">>, ?MODULE]}},
+ {cleanup, {rabbit_registry, unregister,
+ [queue, <<"quorum">>]}},
+ {requires, rabbit_registry}]}).
+
-type msg_id() :: non_neg_integer().
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(),
mc:state()}.
@@ -160,7 +180,7 @@
[operator_policy_validator, <<"target-group-size">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_merge_strategy, <<"target-group-size">>, ?MODULE]}},
- {requires, rabbit_registry},
+ {requires, [rabbit_registry]},
{enables, recovery}]}).
validate_policy(Args) ->
@@ -2145,3 +2165,113 @@ file_handle_other_reservation() ->
file_handle_release_reservation() ->
ok.
+-spec queue_topology(amqqueue:amqqueue()) ->
+ {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
+queue_topology(Q) ->
+ [{leader, Leader0},
+ {members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
+ Leader = case Leader0 of
+ '' -> undefined;
+ _ -> Leader0
+ end,
+ {Leader, Members}.
+
+feature_flag_name() ->
+ quorum_queue.
+
+policy_apply_to_name() ->
+ <<"quorum_queues">>.
+
+can_redeliver() ->
+ true.
+
+is_replicated() ->
+ true.
+
+rebalance_module() ->
+ ?MODULE.
+
+-spec drain([node()]) -> ok.
+drain(TransferCandidates) ->
+ _ = transfer_leadership(TransferCandidates),
+ _ = stop_local_quorum_queue_followers(),
+ ok.
+
+transfer_leadership([]) ->
+ rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
+ "(online, not under maintenance) nodes to transfer to!");
+transfer_leadership(_TransferCandidates) ->
+ %% we only transfer leadership for QQs that have local leaders
+ Queues = rabbit_amqqueue:list_local_leaders(),
+ rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
+ [length(Queues)]),
+ [begin
+ Name = amqqueue:get_name(Q),
+ rabbit_log:debug("Will trigger a leader election for local quorum queue ~ts",
+ [rabbit_misc:rs(Name)]),
+ %% we trigger an election and exclude this node from the list of candidates
+ %% by simply shutting its local QQ replica (Ra server)
+ RaLeader = amqqueue:get_pid(Q),
+ rabbit_log:debug("Will stop Ra server ~tp", [RaLeader]),
+ case rabbit_quorum_queue:stop_server(RaLeader) of
+ ok ->
+ rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]);
+ {error, nodedown} ->
+ rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").
+
+%% TODO: I just copied it over, it looks like was always called inside maintenance so...
+-spec stop_local_quorum_queue_followers() -> ok.
+stop_local_quorum_queue_followers() ->
+ Queues = rabbit_amqqueue:list_local_followers(),
+ rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
+ [length(Queues)]),
+ [begin
+ Name = amqqueue:get_name(Q),
+ rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts",
+ [rabbit_misc:rs(Name)]),
+ %% shut down Ra nodes so that they are not considered for leader election
+ {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
+ RaNode = {RegisteredName, node()},
+ rabbit_log:debug("Will stop Ra server ~tp", [RaNode]),
+ case rabbit_quorum_queue:stop_server(RaNode) of
+ ok ->
+ rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]);
+ {error, nodedown} ->
+ rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down")
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
+
+revive() ->
+ revive_local_queue_replicas().
+
+revive_local_queue_replicas() ->
+ Queues = rabbit_amqqueue:list_local_followers(),
+ %% NB: this function ignores the first argument so we can just pass the
+ %% empty binary as the vhost name.
+ {Recovered, Failed} = rabbit_quorum_queue:recover(<<>>, Queues),
+ rabbit_log:debug("Successfully revived ~b quorum queue replicas",
+ [length(Recovered)]),
+ case length(Failed) of
+ 0 ->
+ ok;
+ NumFailed ->
+ rabbit_log:error("Failed to revive ~b quorum queue replicas",
+ [NumFailed])
+ end,
+
+ rabbit_log:info("Restart of local quorum queue replicas is complete"),
+ ok.
+
+queue_vm_stats_sups() ->
+ {[quorum_queue_procs,
+ quorum_queue_dlx_procs],
+ [[ra_server_sup_sup],
+ [rabbit_fifo_dlx_sup]]}.
+
+queue_vm_ets() ->
+ {[quorum_ets],
+ [[ra_log_ets]]}.
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 5c34b653b5da..8307a23a41e3 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -59,6 +59,18 @@
-export([check_max_segment_size_bytes/1]).
+-export([queue_topology/1,
+ feature_flag_name/0,
+ policy_apply_to_name/0,
+ can_redeliver/0,
+ stop/1,
+ is_replicated/0,
+ rebalance_module/0,
+ drain/1,
+ revive/0,
+ queue_vm_stats_sups/0,
+ queue_vm_ets/0]).
+
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
@@ -103,6 +115,17 @@
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
-import(rabbit_misc, [queue_resource/2]).
+-rabbit_boot_step(
+ {?MODULE,
+ [{description, "Stream queue: queue type"},
+ {mfa, {rabbit_registry, register,
+ [queue, <<"stream">>, ?MODULE]}},
+ %% {cleanup, {rabbit_registry, unregister,
+ %% [queue, <<"stream">>]}},
+ {requires, rabbit_registry}%%,
+ %% {enables, rabbit_stream_queue_type}
+ ]}).
+
-type client() :: #stream_client{}.
-spec is_enabled() -> boolean().
@@ -832,10 +855,6 @@ status(Vhost, QueueName) ->
%% Handle not found queues
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- {error, quorum_queue_not_supported};
{ok, Q} when ?amqqueue_is_stream(Q) ->
[begin
[get_key(role, C),
@@ -847,6 +866,8 @@ status(Vhost, QueueName) ->
get_key(readers, C),
get_key(segments, C)]
end || C <- get_counters(Q)];
+ {ok, _Q} ->
+ {error, not_supported};
{error, not_found} = E ->
E
end.
@@ -905,10 +926,6 @@ tracking_status(Vhost, QueueName) ->
%% Handle not found queues
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- {error, quorum_queue_not_supported};
{ok, Q} when ?amqqueue_is_stream(Q) ->
Leader = amqqueue:get_pid(Q),
Map = osiris:read_tracking(Leader),
@@ -921,6 +938,8 @@ tracking_status(Vhost, QueueName) ->
{value, TrkData}] | Acc0]
end, [], Trackings) ++ Acc
end, [], Map);
+ {ok, Q} ->
+ {error, {queue_not_supported, ?amqqueue_type(Q)}};
{error, not_found} = E->
E
end.
@@ -1021,10 +1040,6 @@ restart_stream(VHost, Queue, Options)
add_replica(VHost, Name, Node) ->
QName = queue_resource(VHost, Name),
case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- {error, quorum_queue_not_supported};
{ok, Q} when ?amqqueue_is_stream(Q) ->
case lists:member(Node, rabbit_nodes:list_running()) of
false ->
@@ -1032,6 +1047,8 @@ add_replica(VHost, Name, Node) ->
true ->
rabbit_stream_coordinator:add_replica(Q, Node)
end;
+ {ok, Q} ->
+ {error, {queue_not_supported, ?amqqueue_type(Q)}};
E ->
E
end.
@@ -1039,14 +1056,12 @@ add_replica(VHost, Name, Node) ->
delete_replica(VHost, Name, Node) ->
QName = queue_resource(VHost, Name),
case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- {error, quorum_queue_not_supported};
{ok, Q} when ?amqqueue_is_stream(Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q),
{ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node),
Reply;
+ {ok, Q} ->
+ {error, {queue_not_supported, ?amqqueue_type(Q)}};
E ->
E
end.
@@ -1393,3 +1408,76 @@ delivery_count_add(none, _) ->
none;
delivery_count_add(Count, N) ->
serial_number:add(Count, N).
+
+-spec queue_topology(amqqueue:amqqueue()) ->
+ {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
+queue_topology(Q) ->
+ #{name := StreamId} = amqqueue:get_type_state(Q),
+ case rabbit_stream_coordinator:members(StreamId) of
+ {ok, Members} ->
+ maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
+ {Node, [Node | Replicas]};
+ (Node, {_Pid, replica}, {Writer, Replicas}) ->
+ {Writer, [Node | Replicas]}
+ end, {undefined, []}, Members);
+ {error, _} ->
+ {undefined, undefined}
+ end.
+
+feature_flag_name() ->
+ stream_queue.
+
+policy_apply_to_name() ->
+ <<"streams">>.
+
+can_redeliver() ->
+ true.
+
+stop(_VHost) ->
+ ok.
+
+is_replicated() ->
+ true.
+
+rebalance_module() ->
+ ?MODULE.
+
+drain(TransferCandidates) ->
+ case whereis(rabbit_stream_coordinator) of
+ undefined -> ok;
+ _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates)
+ end.
+
+revive() ->
+ ok.
+
+-spec transfer_leadership_of_stream_coordinator([node()]) -> ok.
+transfer_leadership_of_stream_coordinator([]) ->
+ rabbit_log:warning("Skipping leadership transfer of stream coordinator: no candidate "
+ "(online, not under maintenance) nodes to transfer to!");
+transfer_leadership_of_stream_coordinator(TransferCandidates) ->
+ % try to transfer to the node with the lowest uptime; the assumption is that
+ % nodes are usually restarted in a rolling fashion, in a consistent order;
+ % therefore, the youngest node has already been restarted or (if we are draining the first node)
+ % that it will be restarted last. either way, this way we limit the number of transfers
+ Uptimes = rabbit_misc:append_rpc_all_nodes(TransferCandidates, erlang, statistics, [wall_clock]),
+ Candidates = lists:zipwith(fun(N, {U, _}) -> {N, U} end, TransferCandidates, Uptimes),
+ BestCandidate = element(1, hd(lists:keysort(2, Candidates))),
+ case rabbit_stream_coordinator:transfer_leadership([BestCandidate]) of
+ {ok, Node} ->
+ rabbit_log:info("Leadership transfer for stream coordinator completed. The new leader is ~p", [Node]);
+ Error ->
+ rabbit_log:warning("Skipping leadership transfer of stream coordinator: ~p", [Error])
+ end.
+
+queue_vm_stats_sups() ->
+ {[stream_queue_procs,
+ stream_queue_replica_reader_procs,
+ stream_queue_coordinator_procs],
+ [[osiris_server_sup],
+ [osiris_replica_reader_sup],
+ [rabbit_stream_coordinator]]}.
+
+queue_vm_ets() ->
+ {[],
+ []}.
diff --git a/deps/rabbit/src/rabbit_upgrade_preparation.erl b/deps/rabbit/src/rabbit_upgrade_preparation.erl
index ad398eba0094..2f349b6fab7c 100644
--- a/deps/rabbit/src/rabbit_upgrade_preparation.erl
+++ b/deps/rabbit/src/rabbit_upgrade_preparation.erl
@@ -56,9 +56,7 @@ endangered_critical_components() ->
do_await_safe_online_quorum(0) ->
false;
do_await_safe_online_quorum(IterationsLeft) ->
- EndangeredQueues = lists:append(
- rabbit_quorum_queue:list_with_minimum_quorum(),
- rabbit_stream_queue:list_with_minimum_quorum()),
+ EndangeredQueues = rabbit_queue_type:endangered_queues(),
case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of
true -> true;
false ->
@@ -83,9 +81,7 @@ do_await_safe_online_quorum(IterationsLeft) ->
-spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}].
list_with_minimum_quorum_for_cli() ->
- EndangeredQueues = lists:append(
- rabbit_quorum_queue:list_with_minimum_quorum(),
- rabbit_stream_queue:list_with_minimum_quorum()),
+ EndangeredQueues = rabbit_queue_type:endangered_queues(),
[amqqueue:to_printable(Q) || Q <- EndangeredQueues] ++
[#{
<<"readable_name">> => C,
diff --git a/deps/rabbit/src/rabbit_vm.erl b/deps/rabbit/src/rabbit_vm.erl
index 451f11688505..ebcc51692667 100644
--- a/deps/rabbit/src/rabbit_vm.erl
+++ b/deps/rabbit/src/rabbit_vm.erl
@@ -7,7 +7,7 @@
-module(rabbit_vm).
--export([memory/0, binary/0, ets_tables_memory/1]).
+-export([memory/0, binary/0, ets_tables_memory/1, all_vhosts_children/1]).
-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]).
@@ -16,19 +16,37 @@
-spec memory() -> rabbit_types:infos().
memory() ->
- All = interesting_sups(),
+ %% this whole aggregation pipeline preserves sups order
+ %% [{info_key, [SupName...]}...] i.e. flattened list of
+ %% info key, sups list pairs for each queue type
+ %% example for existing info keys:
+ %% [{queue_procs, queue_sups()},
+ %% {quorum_queue_procs, [ra_server_sup_sup]},
+ %% {quorum_queue_dlx_procs, [rabbit_fifo_dlx_sup]},
+ %% {stream_queue_procs, [osiris_server_sup]},
+ %% {stream_queue_replica_reader_procs, [osiris_replica_reader_sup]},
+ %% {stream_queue_coordinator_procs, [rabbit_stream_coordinator]}]
+ {QueueSupsStatsKeys, QueueStatsSups} = rabbit_queue_type:queue_vm_stats_sups(),
+
+ %% we keep order and that means this variable queues part
+ %% has to be matched somehow - | Rest is the best.
+ All = interesting_sups() ++ QueueStatsSups,
{Sums, _Other} = sum_processes(
lists:append(All), distinguishers(), [memory]),
- [Qs, Qqs, DlxWorkers, Ssqs, Srqs, SCoor, ConnsReader, ConnsWriter, ConnsChannel,
- ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] =
+ [ConnsReader, ConnsWriter, ConnsChannel,
+ ConnsOther, MsgIndexProc, MgmtDbProc, Plugins | QueueSupsStats] =
[aggregate(Names, Sums, memory, fun (X) -> X end)
- || Names <- distinguished_interesting_sups()],
+ || Names <- distinguished_interesting_sups() ++ QueueStatsSups],
+
+
+ {QueuesEtsStatsKeys, QueueStatsEtsNames} = rabbit_queue_type:queue_vm_ets(),
+
+ QueuesEtsStats = lists:map(fun ets_memory/1, QueueStatsEtsNames),
MnesiaETS = mnesia_memory(),
MsgIndexETS = ets_memory(msg_stores()),
MetricsETS = ets_memory([rabbit_metrics]),
- QuorumETS = ets_memory([ra_log_ets]),
MetricsProc = try
[{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
M
@@ -63,23 +81,20 @@ memory() ->
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- - Qs - Qqs - DlxWorkers - Ssqs - Srqs - SCoor - MsgIndexProc - Plugins
+ - lists:sum(QueueSupsStats) - MsgIndexProc - Plugins
- MgmtDbProc - MetricsProc - MetadataStoreProc,
+
[
%% Connections
{connection_readers, ConnsReader},
{connection_writers, ConnsWriter},
{connection_channels, ConnsChannel},
- {connection_other, ConnsOther},
+ {connection_other, ConnsOther}] ++
%% Queues
- {queue_procs, Qs},
- {quorum_queue_procs, Qqs},
- {quorum_queue_dlx_procs, DlxWorkers},
- {stream_queue_procs, Ssqs},
- {stream_queue_replica_reader_procs, Srqs},
- {stream_queue_coordinator_procs, SCoor},
+ lists:zip(QueueSupsStatsKeys, QueueSupsStats) ++
+ [
%% Processes
{plugins, Plugins},
{metadata_store, MetadataStoreProc},
@@ -87,13 +102,16 @@ memory() ->
%% Metrics
{metrics, MetricsETS + MetricsProc},
- {mgmt_db, MgmtDbETS + MgmtDbProc},
+ {mgmt_db, MgmtDbETS + MgmtDbProc}] ++
%% ETS
+ %% queues
+ lists:zip(QueuesEtsStatsKeys, QueuesEtsStats) ++
+
+ [
{mnesia, MnesiaETS},
- {quorum_ets, QuorumETS},
{metadata_store_ets, MetadataStoreETS},
- {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - QuorumETS - MetadataStoreETS},
+ {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - MetadataStoreETS - lists:sum(QueuesEtsStats)},
%% Messages (mostly, some binaries are not messages)
{binary, Bin},
@@ -110,6 +128,7 @@ memory() ->
{rss, Rss},
{allocated, Allocated}]}
].
+
%% [1] - erlang:memory(processes) can be less than the sum of its
%% parts. Rather than display something nonsensical, just silence any
%% claims about negative memory. See
@@ -118,7 +137,9 @@ memory() ->
-spec binary() -> rabbit_types:infos().
binary() ->
- All = interesting_sups(),
+ {QueueSupsStatsKeys, QueueStatsSups} = rabbit_queue_type:queue_vm_stats_sups(),
+
+ All = interesting_sups() ++ QueueStatsSups,
{Sums, Rest} =
sum_processes(
lists:append(All),
@@ -127,10 +148,10 @@ binary() ->
sets:add_element({Ptr, Sz}, Acc0)
end, Acc, Info)
end, distinguishers(), [{binary, sets:new()}]),
- [Other, Qs, Qqs, DlxWorkers, Ssqs, Srqs, Scoor, ConnsReader, ConnsWriter,
- ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] =
+ [Other, ConnsReader, ConnsWriter,
+ ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins | QueueSupsStats] =
[aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1)
- || Names <- [[other] | distinguished_interesting_sups()]],
+ || Names <- [[other] | distinguished_interesting_sups()] ++ QueueStatsSups],
MetadataStoreProc = try
[{_, B}] = process_info(whereis(rabbit_khepri:get_ra_cluster_name()), [binary]),
lists:foldl(fun({_, Sz, _}, Acc) ->
@@ -143,13 +164,10 @@ binary() ->
[{connection_readers, ConnsReader},
{connection_writers, ConnsWriter},
{connection_channels, ConnsChannel},
- {connection_other, ConnsOther},
- {queue_procs, Qs},
- {quorum_queue_procs, Qqs},
- {quorum_queue_dlx_procs, DlxWorkers},
- {stream_queue_procs, Ssqs},
- {stream_queue_replica_reader_procs, Srqs},
- {stream_queue_coordinator_procs, Scoor},
+ {connection_other, ConnsOther}] ++
+ %% Queues
+ lists:zip(QueueSupsStatsKeys, QueueSupsStats) ++
+ [
{metadata_store, MetadataStoreProc},
{plugins, Plugins},
{mgmt_db, MgmtDbProc},
@@ -194,19 +212,7 @@ bytes(Words) -> try
end.
interesting_sups() ->
- [queue_sups(), quorum_sups(), dlx_sups(),
- stream_server_sups(), stream_reader_sups(), stream_coordinator(),
- conn_sups() | interesting_sups0()].
-
-queue_sups() ->
- all_vhosts_children(rabbit_amqqueue_sup_sup).
-
-quorum_sups() -> [ra_server_sup_sup].
-
-dlx_sups() -> [rabbit_fifo_dlx_sup].
-stream_server_sups() -> [osiris_server_sup].
-stream_reader_sups() -> [osiris_replica_reader_sup].
-stream_coordinator() -> [rabbit_stream_coordinator].
+ [conn_sups() | interesting_sups0()].
msg_stores() ->
all_vhosts_children(msg_store_transient)
@@ -256,12 +262,6 @@ distinguishers() -> with(conn_sups(), fun conn_type/1).
distinguished_interesting_sups() ->
[
- queue_sups(),
- quorum_sups(),
- dlx_sups(),
- stream_server_sups(),
- stream_reader_sups(),
- stream_coordinator(),
with(conn_sups(), reader),
with(conn_sups(), writer),
with(conn_sups(), channel),
diff --git a/deps/rabbit/test/queue_utils.erl b/deps/rabbit/test/queue_utils.erl
index cbd3d1555a93..15e274686c8a 100644
--- a/deps/rabbit/test/queue_utils.erl
+++ b/deps/rabbit/test/queue_utils.erl
@@ -2,6 +2,8 @@
-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
-export([
wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index d9ff47230b6c..419625c1986f 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -550,10 +550,10 @@ add_replica(Config) ->
?assertEqual({error, node_not_running},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server1])),
- ?assertEqual({error, classic_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QClassic, Server1])),
- ?assertEqual({error, quorum_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QQuorum, Server1])),
@@ -561,10 +561,10 @@ add_replica(Config) ->
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
timer:sleep(1000),
- ?assertEqual({error, classic_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QClassic, Server1])),
- ?assertEqual({error, quorum_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, QQuorum, Server1])),
?assertEqual(ok,
@@ -748,10 +748,10 @@ delete_classic_replica(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
%% Not a member of the cluster, what would happen?
- ?assertEqual({error, classic_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, 'zen@rabbit'])),
- ?assertEqual({error, classic_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_classic_queue}},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@@ -763,10 +763,10 @@ delete_quorum_replica(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
%% Not a member of the cluster, what would happen?
- ?assertEqual({error, quorum_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, 'zen@rabbit'])),
- ?assertEqual({error, quorum_queue_not_supported},
+ ?assertEqual({error, {queue_not_supported, rabbit_quorum_queue}},
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl
index e6a6ed12b8e0..781d7743af23 100644
--- a/deps/rabbit_common/src/rabbit_core_metrics.erl
+++ b/deps/rabbit_common/src/rabbit_core_metrics.erl
@@ -36,7 +36,10 @@
queue_declared/1,
queue_created/1,
queue_deleted/1,
- queues_deleted/1]).
+ queues_deleted/1,
+ %% used by ra-based queues to cleanup follower metrics,
+ %% see rabbit_core_metrics_gc for an example
+ delete_queue_coarse_metrics/1]).
-export([node_stats/2]).
@@ -321,10 +324,14 @@ partition_queues(Queues) ->
[Queues].
delete_queue_metrics(Queue) ->
- ets:delete(queue_coarse_metrics, Queue),
+ delete_queue_coarse_metrics(Queue),
ets:update_element(queue_metrics, Queue, {3, 1}),
ok.
+delete_queue_coarse_metrics(Queue) ->
+ ets:delete(queue_coarse_metrics, Queue),
+ ok.
+
delete_channel_queue_exchange_metrics(MatchSpecCondition) ->
ChannelQueueExchangeMetricsToUpdate = ets:select(
channel_queue_exchange_metrics,
diff --git a/deps/rabbit_common/src/rabbit_registry.erl b/deps/rabbit_common/src/rabbit_registry.erl
index 7cdc23dd87a8..4699fddc2019 100644
--- a/deps/rabbit_common/src/rabbit_registry.erl
+++ b/deps/rabbit_common/src/rabbit_registry.erl
@@ -15,7 +15,7 @@
code_change/3]).
-export([register/3, unregister/2,
- binary_to_type/1, lookup_module/2, lookup_all/1]).
+ binary_to_type/1, lookup_module/2, lookup_type_module/2, lookup_type_name/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -61,6 +61,61 @@ lookup_module(Class, T) when is_atom(T) ->
{error, not_found}
end.
+
+-spec lookup_type_module(Class, TypeDescriptor) ->
+ Ret when
+ Class :: atom(),
+ TypeDescriptor :: atom() | %% can be TypeModule or Type
+ binary(), %% or whati currently called "alias" - a TypeName
+ Ret :: {ok, TypeModule} | {error, not_found},
+ TypeModule :: atom().
+lookup_type_module(Class, TypeDescriptor) ->
+ case lookup_type(Class, TypeDescriptor) of
+ {error, _} = Error ->
+ Error;
+ {ok, {_TypeName, TypeModule}} ->
+ {ok, TypeModule}
+ end.
+
+-spec lookup_type_name(Class, TypeDescriptor) ->
+ Ret when
+ Class :: atom(),
+ TypeDescriptor :: atom() | %% either full typemodule or atomized typename
+ binary(), %% typename pr typemodule in binary
+ Ret :: {ok, binary()} | {error, not_found}.
+lookup_type_name(Class, TypeDescriptor) ->
+ case lookup_type(Class, TypeDescriptor) of
+ {error, _} = Error ->
+ Error;
+ {ok, {TypeName, _TypeModule}} ->
+ {ok, atom_to_binary(TypeName)}
+ end.
+
+lookup_type(Class, TypeDescriptor)
+ when is_atom(TypeDescriptor) ->
+ case ets:lookup(?ETS_NAME, {Class, TypeDescriptor}) of
+ [{_, Module}] ->
+ {ok, {TypeDescriptor, Module}};
+ [] ->
+ %% In principle it is enough to do the same sanity check
+ %% we do when registring a type.
+ %% This however will return false positives for loaded
+ %% but unregistered modules.
+ TMMatch = ets:match(?ETS_NAME, {{Class, '$1'}, TypeDescriptor}),
+ case TMMatch of
+ [[TypeName]] -> {ok, {TypeName, TypeDescriptor}};
+ [] ->
+ {error, not_found}
+ end
+ end;
+lookup_type(Class, TypeDescriptor)
+ when is_binary(TypeDescriptor) ->
+ %% when we register a type we convert
+ %% typename to atom so we can lookup
+ %% only existing atoms.
+ lookup_type(Class, binary_to_existing_atom(TypeDescriptor)).
+
+
lookup_all(Class) ->
[{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})].
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex
index f8cdb87603a4..ccc4ba55f715 100644
--- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex
@@ -9,13 +9,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
@behaviour RabbitMQ.CLI.CommandBehaviour
- def switches(), do: [if_empty: :boolean, if_unused: :boolean, timeout: :integer]
+ def switches(), do: [if_empty: :boolean, if_unused: :boolean, force: :boolean, timeout: :integer]
def aliases(), do: [e: :if_empty, u: :if_unused, t: :timeout]
def merge_defaults(args, opts) do
{
args,
- Map.merge(%{if_empty: false, if_unused: false, vhost: "/"}, opts)
+ Map.merge(%{if_empty: false, if_unused: false, force: false, vhost: "/"}, opts)
}
end
@@ -46,27 +46,39 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
vhost: vhost,
if_empty: if_empty,
if_unused: if_unused,
+ force: force,
timeout: timeout
}) do
## Generate queue resource name from queue name and vhost
queue_resource = :rabbit_misc.r(vhost, :queue, qname)
+ user = if force, do: RabbitMQ.CLI.Common.internal_user, else: "cli_user"
## Lookup a queue on broker node using resource name
case :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :lookup, [queue_resource]) do
{:ok, queue} ->
## Delete queue
- :rabbit_misc.rpc_call(
- node,
- :rabbit_amqqueue,
- :delete_with,
- [queue, if_unused, if_empty, "cli_user"],
- timeout
- )
+ case :rabbit_misc.rpc_call(node,
+ :rabbit_amqqueue,
+ :delete_with,
+ [queue, if_unused, if_empty, user],
+ timeout
+ ) do
+ {:ok, _} = ok -> ok
+
+ {:badrpc, {:EXIT, {:amqp_error, :resource_locked, _, :none}}} ->
+ {:error, :protected}
+
+ other_error -> other_error
+ end
{:error, _} = error ->
error
end
end
+ def output({:error, :protected}, _options) do
+ {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue is protected from deletion"}
+ end
+
def output({:error, :not_found}, _options) do
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue not found"}
end
@@ -103,14 +115,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
Enum.join(Enum.concat([if_empty_str, if_unused_str]), "and ") <> "..."
end
- def usage(), do: "delete_queue [--vhost Message counts. Note that some messages can be in memory and on disk at the same time.',
'queue-messages-stream':
- ' Approximate message counts. Note that streams store some entries that are not user messages such as offset tracking data which is included in this count. Thus this value will never be completely correct. Approximate message counts. Note that streams store some entries that are not user messages such as offset tracking data which is included in this count. Thus this value will never be completely correct. The sum total of the sizes of the message bodies in this queue. This only counts message bodies; it does not include message properties (including headers) or metadata used by the queue. Note that some messages can be in memory and on disk at the same time. For classic queues, if a message larger than \
Each channel can have a prefetch count. The prefetch is the number of messages that will be held \
by the client. Setting a value of 0 will result in an unlimited prefetch. \
@@ -314,7 +317,7 @@ var HELP = {
',
'file-descriptors':
- ' File descriptor count and limit, as reported by the operating \
+ ' File descriptor count and limit, as reported by the operating \
system. The count includes network sockets and file handles. To optimize disk access RabbitMQ uses as many file descriptors as \
needed, so the limit must be high enough for safe operation.
(Sets the "alternate-exchange" argument.)',
+ 'If messages to this exchange cannot otherwise be routed, send them to the alternate exchange named here.
(Sets the "alternate-exchange" argument.)',
'queue-message-ttl':
'How long a message published to a queue can live before it is discarded (milliseconds).
(Sets the "x-message-ttl" argument.)',
@@ -199,76 +199,79 @@ var HELP = {
'If a consumer does not ack its delivery for more than the timeout value (30 minutes by default), its channel will be closed with a PRECONDITION_FAILED channel exception.',
'queue-expires':
- 'How long a queue can be unused for before it is automatically deleted (milliseconds).
(Sets the "x-expires" argument.)',
+ 'How long a queue can be unused for before it is automatically deleted (milliseconds).
(Sets the "x-expires" argument.)',
'queue-max-length':
- 'How many (ready) messages a queue can contain before it starts to drop them from its head.
(Sets the "x-max-length" argument.)',
+ 'How many (ready) messages a queue can contain before it starts to drop them from its head.
(Sets the "x-max-length" argument.)',
'queue-max-length-bytes':
- 'Total body size for ready messages a queue can contain before it starts to drop them from its head.
(Sets the "x-max-length-bytes" argument.)',
+ 'Total body size for ready messages a queue can contain before it starts to drop them from its head.
(Sets the "x-max-length-bytes" argument.)',
'queue-max-age':
- 'How long a message published to a stream queue can live before it is discarded.',
+ 'How long a message published to a stream queue can live before it is discarded.',
'queue-stream-filter-size-bytes':
- 'Size of the filter data attached to each stream chunk.
(Sets the x-stream-filter-size-bytes argument.)',
+ 'Size of the filter data attached to each stream chunk.
(Sets the x-stream-filter-size-bytes argument.)',
'queue-auto-delete':
- 'If yes, the queue will delete itself after at least one consumer has connected, and then all consumers have disconnected.',
+ 'If yes, the queue will delete itself after at least one consumer has connected, and then all consumers have disconnected.',
'queue-dead-letter-exchange':
- 'Optional name of an exchange to which messages will be republished if they are rejected or expire.
(Sets the "x-dead-letter-exchange" argument.)',
+ 'Optional name of an exchange to which messages will be republished if they are rejected or expire.
(Sets the "x-dead-letter-exchange" argument.)',
'queue-dead-letter-routing-key':
- 'Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message\'s original routing key will be used.
(Sets the "x-dead-letter-routing-key" argument.)',
+ 'Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message\'s original routing key will be used.
(Sets the "x-dead-letter-routing-key" argument.)',
'queue-dead-letter-strategy':
- 'Valid values are at-most-once or at-least-once. It defaults to at-most-once. This setting is understood only by quorum queues. If at-least-once is set, Overflow behaviour must be set to reject-publish. Otherwise, dead letter strategy will fall back to at-most-once.',
+ 'Valid values are at-most-once or at-least-once. It defaults to at-most-once. If at-least-once is set, Overflow behaviour must be set to reject-publish. Otherwise, dead letter strategy will fall back to at-most-once.',
'queue-single-active-consumer':
- 'If set, makes sure only one consumer at a time consumes from the queue and fails over to another registered consumer in case the active one is cancelled or dies.
(Sets the "x-single-active-consumer" argument.)',
+ 'If set, makes sure only one consumer at a time consumes from the queue and fails over to another registered consumer in case the active one is cancelled or dies.
(Sets the "x-single-active-consumer" argument.)',
'queue-max-priority':
- 'Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.
(Sets the "x-max-priority" argument.)',
+ 'Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.
(Sets the "x-max-priority" argument.)',
'queue-max-age':
- 'Sets the data retention for stream queues in time units (Y=Years, M=Months, D=Days, h=hours, m=minutes, s=seconds).
E.g. "1h" configures the stream to only keep the last 1 hour of received messages.(Sets the x-max-age argument.)',
+ 'Sets the data retention for stream queues in time units (Y=Years, M=Months, D=Days, h=hours, m=minutes, s=seconds).
E.g. "1h" configures the stream to only keep the last 1 hour of received messages.(Sets the x-max-age argument.)',
'queue-overflow':
- 'Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head, reject-publish or reject-publish-dlx. The quorum queue type only supports drop-head and reject-publish.',
+ 'Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head, reject-publish or reject-publish-dlx',
+
+ 'quorum-queue-overflow':
+ ' Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values for quorum queues are drop-head and reject-publish.',
'queue-master-locator':
- 'Deprecated: please use `queue-leader-locator` instead. Controls which node the queue will be running on.',
+ 'Deprecated: please use `queue-leader-locator` instead. Controls which node the queue will be running on.',
'queue-leader-locator':
- 'Set the rule by which the queue leader is located when declared on a cluster of nodes. Valid values are client-local (default) and balanced.',
+ 'Set the rule by which the queue leader is located when declared on a cluster of nodes. Valid values are client-local (default) and balanced.',
'queue-initial-cluster-size':
- 'Set the queue initial cluster size.',
+ 'Set the queue initial cluster size.',
'queue-type':
- 'Set the queue type, determining the type of queue to use: raft-based high availability or classic queue. Valid values are quorum or classic. It defaults to classic.
',
+ 'Set the queue type, determining the type of queue to use: raft-based high availability or classic queue. Valid values are quorum or classic. It defaults to classic.
',
'queue-messages':
'at-least-once.',
+ 'Applies to messages dead-lettered with dead-letter-strategy at-least-once.',
'queue-delivery-limit':
- 'The number of times a message can be returned to this queue before it is dead-lettered (if configured) or dropped.',
+ 'The number of times a message can be returned to this queue before it is dead-lettered (if configured) or dropped.',
'queue-message-body-bytes':
'queue_index_embed_msgs_below (4KB by default) is routed to multiple queues, its body will be stored only once and shared between queues. The value shown here does not take this optimization into account. \
',
'internal-users-only':
- 'Only users within the internal RabbitMQ database are shown here. Other users (e.g. those authenticated over LDAP) will not appear.',
+ 'Only users within the internal RabbitMQ database are shown here. Other users (e.g. those authenticated over LDAP) will not appear.',
'export-definitions':
'The definitions consist of users, virtual hosts, permissions, parameters, exchanges, queues, policies and bindings. They do not include the contents of queues. Exclusive queues will not be exported.',
@@ -285,19 +288,19 @@ var HELP = {
'The definitions exported for a single virtual host consist of exchanges, queues, bindings and policies.',
'import-definitions':
- 'The definitions that are imported will be merged with the current definitions. If an error occurs during import, any changes made will not be rolled back.',
+ 'The definitions that are imported will be merged with the current definitions. If an error occurs during import, any changes made will not be rolled back.',
'import-definitions-vhost':
'For a single virtual host, only exchanges, queues, bindings and policies are imported.',
'exchange-rates-incoming':
- 'The incoming rate is the rate at which messages are published directly to this exchange.',
+ 'The incoming rate is the rate at which messages are published directly to this exchange.',
'exchange-rates-outgoing':
- 'The outgoing rate is the rate at which messages enter queues, having been published directly to this exchange.',
+ 'The outgoing rate is the rate at which messages enter queues, having been published directly to this exchange.',
'channel-mode':
- 'Channel guarantee mode. Can be one of the following, or neither:
\
+ 'Channel guarantee mode. Can be one of the following, or neither:
\
\
',
'channel-prefetch':
- 'Channel prefetch count.\
+ 'Channel prefetch count.\
The memory \
+ ' The memory \
alarm for this node has gone off. It will block \
incoming network traffic until the memory usage drops below \
the watermark.
Clicking "Get Message(s)" will consume messages from the queue. \ + '
Clicking "Get Message(s)" will consume messages from the queue. \ If requeue is set the message will be put back into the queue in place, \ but "redelivered" will be set on the message.
\If requeue is not set messages will be removed from the queue.
\Furthermore, message payloads will be truncated to 50000 bytes.
', 'message-publish-headers': - 'Headers can have any name. Only long string headers can be set here.', + 'Headers can have any name. Only long string headers can be set here.', 'message-publish-properties': - 'You can set other message properties here (delivery mode and headers \ + '
You can set other message properties here (delivery mode and headers \ are pulled out as the most common cases).
\Invalid properties will be ignored. Valid properties are:
\- Binary statistics not available. -
++ Binary statistics not available. +
<% } else { %> -<% - var sections = {'queue_procs' : ['classic', 'Classic queues'], - 'quorum_queue_procs' : ['quorum', 'Quorum queues'], - 'quorum_queue_dlx_procs' : ['quorum', 'Dead letter workers'], - 'stream_queue_procs' : ['stream', 'Stream queues'], - 'stream_queue_replica_reader_procs' : ['stream', 'Stream queues (replica reader)'], - 'stream_queue_coordinator_procs' : ['stream', 'Stream queues (coordinator)'], - 'connection_readers' : ['conn', 'Connection readers'], - 'connection_writers' : ['conn', 'Connection writers'], - 'connection_channels' : ['conn', 'Connection channels'], - 'connection_other' : ['conn', 'Connections (other)'], - 'msg_index' : ['table', 'Message store index'], - 'mgmt_db' : ['table', 'Management database'], - 'plugins' : ['proc', 'Plugins'], - 'metadata_store' : ['metadata_store', 'Metadata store'], - 'other' : ['system', 'Other binary references']}; - var total_out = []; -%> -<%= format('memory-bar', {sections: sections, memory: binary, total_out: total_out}) %> - -| State | +<%= fmt_object_state(queue) %> | +
|---|---|
| Consumers | +<%= fmt_string(queue.consumers) %> | +
| Consumers | +<%= fmt_string(queue.consumer_details.length) %> | +
| Consumer capacity | +<%= fmt_percent(queue.consumer_capacity) %> | +
| Publishers | +<%= fmt_string(queue.publishers) %> | +
| + | Total | +Ready | +Unacked | +In memory | +Persistent | +Transient, Paged Out | +
|---|---|---|---|---|---|---|
| + Messages + + | ++ <%= fmt_num_thousands(queue.messages) %> + | ++ <%= fmt_num_thousands(queue.messages_ready) %> + | ++ <%= fmt_num_thousands(queue.messages_unacknowledged) %> + | ++ <%= fmt_num_thousands(queue.messages_ram) %> + | ++ <%= fmt_num_thousands(queue.messages_persistent) %> + | ++ <%= fmt_num_thousands(queue.messages_paged_out) %> + | +
| + Message body bytes + + | ++ <%= fmt_bytes(queue.message_bytes) %> + | ++ <%= fmt_bytes(queue.message_bytes_ready) %> + | ++ <%= fmt_bytes(queue.message_bytes_unacknowledged) %> + | ++ <%= fmt_bytes(queue.message_bytes_ram) %> + | ++ <%= fmt_bytes(queue.message_bytes_persistent) %> + | ++ <%= fmt_bytes(queue.message_bytes_paged_out) %> + | +
| + Process memory + + | +<%= fmt_bytes(queue.memory) %> | +
- Memory statistics not available. -
++ Memory statistics not available. +
<% } else { %> -<% - var sections = {'queue_procs' : ['classic', 'Classic queues'], - 'quorum_queue_procs' : ['quorum', 'Quorum queues'], - 'quorum_queue_dlx_procs' : ['quorum', 'Dead letter workers'], - 'stream_queue_procs' : ['stream', 'Stream queues'], - 'stream_queue_replica_reader_procs' : ['stream', 'Stream queues (replica reader)'], - 'stream_queue_coordinator_procs' : ['stream', 'Stream queues (coordinator)'], - 'binary' : ['binary', 'Binaries'], - 'connection_readers' : ['conn', 'Connection readers'], - 'connection_writers' : ['conn', 'Connection writers'], - 'connection_channels' : ['conn', 'Connection channels'], - 'connection_other' : ['conn', 'Connections (other)'], - 'mnesia' : ['table', 'Mnesia'], - 'msg_index' : ['table', 'Message store index'], - 'mgmt_db' : ['table', 'Management database'], - 'quorum_ets' : ['table', 'Quorum queue ETS tables'], - 'other_ets' : ['table', 'Other ETS tables'], - 'plugins' : ['proc', 'Plugins'], - 'other_proc' : ['proc', 'Other process memory'], - 'code' : ['system', 'Code'], - 'atom' : ['system', 'Atoms'], - 'other_system' : ['system', 'Other system'], - 'allocated_unused' : ['unused', 'Allocated unused'], - 'reserved_unallocated': ['unused', 'Unallocated reserved by the OS']}; -%> -<%= format('memory-bar', {sections: sections, memory: memory, total_out: []}) %> + <%= format('memory-bar', {sections: MEMORY_STATISTICS.sections, memory: memory, total_out: []}) %>| Queues [Classic] | -- Auto expire | - Max length | - Max length bytes | - Message TTL | - | - Length limit overflow behaviour - | -||
| Queues [Quorum] | -
- Delivery limit
- |
- Auto expire |
- Max in-memory bytes |
- Max in-memory length - Max length | - Max length bytes | - Message TTL - | - Target group size | - Length limit overflow behaviour - |
- ||
| Queues [Streams] | Max length bytes diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs index c605b8b68019..9cbdcc0e412d 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs @@ -37,212 +37,12 @@ | ||
| Leader | - <% } else { %> -Node | - <% } %> - <% if (queue.leader) { %> -<%= fmt_node(queue.leader) %> | - <% } else { %> -<%= fmt_node(queue.node) %> | - <% } %> -
|---|---|---|---|
| Online | -
- <%
- for (var i in queue.online) {
- %>
- <%= fmt_node(queue.online[i]) %>
- - <% } %> - |
- Members | -
- <%
- for (var i in queue.members) {
- %>
- <%= fmt_node(queue.members[i]) %>
- - <% } %> - |
-
- <% } %>
+ <%= format(QUEUE_TYPE(queue).tmpl.node_details, {queue: queue}) %>
<% } %>
| State | -<%= fmt_object_state(queue) %> | -
|---|---|
| Consumers | -<%= fmt_string(queue.consumers) %> | -
| Consumers | -<%= fmt_string(queue.consumer_details.length) %> | -
| Consumer capacity | -<%= fmt_percent(queue.consumer_capacity) %> | -
| Publishers | -<%= fmt_string(queue.publishers) %> | -
| Open files | -<%= fmt_table_short(queue.open_files) %> | -
| Delivery limit | -<%= fmt_string(queue.delivery_limit) %> | -
| Readers | -<%= fmt_table_short(queue.readers) %> | -
| Segments | -<%= fmt_string(queue.segments) %> | -
| - | Total | - <% if (!is_stream(queue)) { %> -Ready | -Unacked | - <% } %> - <% if (is_quorum(queue)) { %> -High priority | -Normal priority | -Returned | -Dead-lettered - - | - <% } %> - <% if (is_classic(queue)) { %> -In memory | -Persistent | -Transient | - <% } %> -
|---|---|---|---|---|---|---|---|---|---|---|
| - Messages - <% if (is_stream(queue)) { %> - - <% } else { %> - - <% } %> - | -- <%= fmt_num_thousands(queue.messages) %> - | - <% if (!is_stream(queue)) { %> -- <%= fmt_num_thousands(queue.messages_ready) %> - | -- <%= fmt_num_thousands(queue.messages_unacknowledged) %> - | - <% } %> - <% if (is_quorum(queue)) { %> -- <%= fmt_num_thousands(queue.messages_ready_high) %> - | -- <%= fmt_num_thousands(queue.messages_ready_normal) %> - | -- <%= fmt_num_thousands(queue.messages_ready_returned) %> - | -- <%= fmt_num_thousands(queue.messages_dlx) %> - | - <% } %> - <% if (is_classic(queue)) { %> -- <%= fmt_num_thousands(queue.messages_ram) %> - | -- <%= fmt_num_thousands(queue.messages_persistent) %> - | -- <%= fmt_num_thousands(queue.messages_paged_out) %> - | - <% } %> -
| - Message body bytes - - | -- <%= fmt_bytes(queue.message_bytes) %> - | -- <%= fmt_bytes(queue.message_bytes_ready) %> - | -- <%= fmt_bytes(queue.message_bytes_unacknowledged) %> - | - <% } %> - <% if (is_quorum(queue)) { %> -- | -- | -- | -- <%= fmt_bytes(queue.message_bytes_dlx) %> - | - <% } %> - <% if (is_classic(queue)) { %> -- <%= fmt_bytes(queue.message_bytes_ram) %> - | -- <%= fmt_bytes(queue.message_bytes_persistent) %> - | -- <%= fmt_bytes(queue.message_bytes_paged_out) %> - | - <% } %> -
| - Process memory - - | -<%= fmt_bytes(queue.memory) %> | -
| State | +<%= fmt_object_state(queue) %> | +
|---|---|
| Consumers | +<%= fmt_string(queue.consumers) %> | +
| Consumers | +<%= fmt_string(queue.consumer_details.length) %> | +
| Publishers | +<%= fmt_string(queue.publishers) %> | +
| Open files | +<%= fmt_table_short(queue.open_files) %> | +
| Delivery limit | +<%= fmt_string(queue.delivery_limit) %> | +
| + | Total | +Ready | +Unacked | +High priority | +Normal priority | +Returned | +Dead-lettered + + | +
|---|---|---|---|---|---|---|---|
| + Messages + + | ++ <%= fmt_num_thousands(queue.messages) %> + | ++ <%= fmt_num_thousands(queue.messages_ready) %> + | ++ <%= fmt_num_thousands(queue.messages_unacknowledged) %> + | ++ <%= fmt_num_thousands(queue.messages_ready_high) %> + | ++ <%= fmt_num_thousands(queue.messages_ready_normal) %> + | ++ <%= fmt_num_thousands(queue.messages_ready_returned) %> + | ++ <%= fmt_num_thousands(queue.messages_dlx) %> + | +
| + Message body bytes + + | ++ <%= fmt_bytes(queue.message_bytes) %> + | ++ <%= fmt_bytes(queue.message_bytes_ready) %> + | ++ <%= fmt_bytes(queue.message_bytes_unacknowledged) %> + | ++ | ++ | ++ | ++ <%= fmt_bytes(queue.message_bytes_dlx) %> + | +
| + Process memory + + | +<%= fmt_bytes(queue.memory) %> | +
| State | +<%= fmt_object_state(queue) %> | +
|---|---|
| Consumers | +<%= fmt_string(queue.consumers) %> | +
| Consumers | +<%= fmt_string(queue.consumer_details.length) %> | +
| Publishers | +<%= fmt_string(queue.publishers) %> | +
| Readers | +<%= fmt_table_short(queue.readers) %> | +
| Segments | +<%= fmt_string(queue.segments) %> | +
| + | Total | +
|---|---|
| + Messages + + | ++ <%= fmt_num_thousands(queue.messages) %> + | +
| + Process memory + + | +<%= fmt_bytes(queue.memory) %> | +