diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index 4d95dc81908e..38c9065c657d 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -28,7 +28,8 @@ set_decorators/2, % exclusive_owner get_exclusive_owner/1, - get_leader/1, + get_leader_node/1, + get_nodes/1, % name (#resource) get_name/1, set_name/2, @@ -387,9 +388,21 @@ set_decorators(#amqqueue{} = Queue, Decorators) -> get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner. --spec get_leader(amqqueue_v2()) -> node(). +-spec get_leader_node(amqqueue_v2()) -> node() | none. -get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader. +get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader; +get_leader_node(#amqqueue{pid = none}) -> none; +get_leader_node(#amqqueue{pid = Pid}) -> node(Pid). + +-spec get_nodes(amqqueue_v2()) -> [node(),...]. + +get_nodes(Q) -> + case amqqueue:get_type_state(Q) of + #{nodes := Nodes} -> + Nodes; + _ -> + [get_leader_node(Q)] + end. % operator_policy diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index cc02a704939f..7e7fb84da6fa 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -463,20 +463,8 @@ encode_queue(Q, NumMsgs, NumConsumers) -> -spec queue_topology(amqqueue:amqqueue()) -> {Leader :: node() | none, Replicas :: [node(),...]}. queue_topology(Q) -> - Leader = case amqqueue:get_pid(Q) of - {_RaName, Node} -> - Node; - none -> - none; - Pid -> - node(Pid) - end, - Replicas = case amqqueue:get_type_state(Q) of - #{nodes := Nodes} -> - Nodes; - _ -> - [Leader] - end, + Leader = amqqueue:get_leader_node(Q), + Replicas = amqqueue:get_nodes(Q), {Leader, Replicas}. 
decode_exchange({map, KVList}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index b6e9ede763f7..27830791281e 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -37,7 +37,7 @@ -export([update/2, store_queue/1, update_decorators/2, policy_changed/2]). -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([is_match/2, is_in_virtual_host/2]). --export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). +-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). -export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, list_local_stream_queues/0, list_stream_queues_on/1, list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, @@ -150,11 +150,7 @@ filter_pid_per_type(QPids) -> -spec stop(rabbit_types:vhost()) -> 'ok'. stop(VHost) -> - %% Classic queues - ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), - {ok, BQ} = application:get_env(rabbit, backing_queue_module), - ok = BQ:stop(VHost), - rabbit_quorum_queue:stop(VHost). + rabbit_queue_type:stop(VHost). -spec start([amqqueue:amqqueue()]) -> 'ok'. @@ -424,6 +420,8 @@ rebalance(Type, VhostSpec, QueueSpec) -> %% We have not yet acquired the rebalance_queues global lock. maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec). +%% TODO: classic queues do not support rebalancing, it looks like they are simply +%% filtered out with is_replicable(Q). Maybe error instead? 
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'", [Type, VhostSpec, QueueSpec]), @@ -431,7 +429,7 @@ maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> NumRunning = length(Running), ToRebalance = [Q || Q <- list(), filter_per_type(Type, Q), - is_replicated(Q), + is_replicable(Q), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], NumToRebalance = length(ToRebalance), @@ -459,10 +457,20 @@ filter_per_type(stream, Q) -> filter_per_type(classic, Q) -> ?amqqueue_is_classic(Q). -rebalance_module(Q) when ?amqqueue_is_quorum(Q) -> - rabbit_quorum_queue; -rebalance_module(Q) when ?amqqueue_is_stream(Q) -> - rabbit_stream_queue. +%% TODO: note that it can return {error, not_supported}. +%% this will result in a badmatch. However that's fine +%% for now because the original function will fail with +%% function_clause if called with a classic queue. +%% The assumption is all non-replicated queues +%% are filtered before calling this with is_replicable/1 +rebalance_module(Q) -> + case rabbit_queue_type:rebalance_module(Q) of + undefined -> + rabbit_log:error("Undefined rebalance module for queue type: ~s", [amqqueue:get_type(Q)]), + {error, not_supported}; + RBModule -> + RBModule + end. get_resource_name(#resource{name = Name}) -> Name. @@ -487,13 +495,19 @@ iterative_rebalance(ByNode, MaxQueuesDesired) -> maybe_migrate(ByNode, MaxQueuesDesired) -> maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). +%% TODO: unfortunate part - UI bits are mixed deep inside the logic. +%% I will not be moving this inside queue type. Instead, +%% an attempt is made to generate something more readable than +%% the raw Other term.
column_name(rabbit_classic_queue) -> <<"Number of replicated classic queues">>; column_name(rabbit_quorum_queue) -> <<"Number of quorum queues">>; column_name(rabbit_stream_queue) -> <<"Number of streams">>; -column_name(Other) -> Other. +column_name(TypeModule) -> + Alias = rabbit_queue_type:short_alias_of(TypeModule), + <<"Number of \"", Alias/binary, "\" queues">>. maybe_migrate(ByNode, _, []) -> - ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_v2_field_type(Q)) end, Queues) end, ByNode), + ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(amqqueue:get_type(Q)) end, Queues) end, ByNode), CountByNodeAndType = maps:map(fun(_Node, Type) -> maps:map(fun (_, Qs)-> length(Qs) end, Type) end, ByNodeAndType), {ok, maps:values(maps:map(fun(Node,Counts) -> [{<<"Node name">>, Node} | maps:to_list(Counts)] end, CountByNodeAndType))}; maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> @@ -1281,14 +1295,12 @@ list_durable() -> -spec list_by_type(atom()) -> [amqqueue:amqqueue()]. -list_by_type(classic) -> list_by_type(rabbit_classic_queue); -list_by_type(quorum) -> list_by_type(rabbit_quorum_queue); -list_by_type(stream) -> list_by_type(rabbit_stream_queue); -list_by_type(Type) -> - rabbit_db_queue:get_all_durable_by_type(Type). +list_by_type(TypeDescriptor) -> + TypeModule = rabbit_queue_type:discover(TypeDescriptor), + rabbit_db_queue:get_all_durable_by_type(TypeModule). +%% TODO: looks unused -spec list_local_quorum_queue_names() -> [name()]. - list_local_quorum_queue_names() -> [ amqqueue:get_name(Q) || Q <- list_by_type(quorum), amqqueue:get_state(Q) =/= crashed, @@ -1313,18 +1325,19 @@ list_stream_queues_on(Node) when is_atom(Node) -> list_local_leaders() -> [ Q || Q <- list(), amqqueue:is_quorum(Q), - amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()]. + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader_node(Q) =:= node()]. 
-spec list_local_followers() -> [amqqueue:amqqueue()]. list_local_followers() -> [Q || Q <- list(), amqqueue:is_quorum(Q), - amqqueue:get_leader(Q) =/= node(), + amqqueue:get_leader_node(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q)), rabbit_quorum_queue:is_recoverable(Q) ]. +%% TODO: looks unused -spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()]. list_local_quorum_queues_with_name_matching(Pattern) -> [ Q || Q <- list_by_type(quorum), @@ -1909,13 +1922,10 @@ forget_node_for_queue(Q) -> run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). --spec is_replicated(amqqueue:amqqueue()) -> boolean(). +-spec is_replicable(amqqueue:amqqueue()) -> boolean(). -is_replicated(Q) when ?amqqueue_is_classic(Q) -> - false; -is_replicated(_Q) -> - %% streams and quorum queues are all replicated - true. +is_replicable(Q) -> + rabbit_queue_type:is_replicable(Q). is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> false; @@ -1985,7 +1995,7 @@ filter_transient_queues_to_delete(Node) -> amqqueue:qnode(Q) == Node andalso not rabbit_process:is_process_alive(amqqueue:get_pid(Q)) andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q)) - andalso (not is_replicated(Q) + andalso (not is_replicable(Q) orelse is_dead_exclusive(Q)) andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue end. diff --git a/deps/rabbit/src/rabbit_boot_steps.erl b/deps/rabbit/src/rabbit_boot_steps.erl index 701dbcea3a30..e4116ffa886e 100644 --- a/deps/rabbit/src/rabbit_boot_steps.erl +++ b/deps/rabbit/src/rabbit_boot_steps.erl @@ -4,6 +4,8 @@ %% %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% +%% README: https://github.com/rabbitmq/internals/blob/master/rabbit_boot_process.md +%% -module(rabbit_boot_steps). 
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 38614fc4de72..d28072d01438 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1279,11 +1279,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, ?INCR_STATS(queue_stats, QueueName, 1, get_empty, State), {reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}}; {error, {unsupported, single_active_consumer}} -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain access to locked ~ts. basic.get operations " - "are not supported by quorum queues with single active consumer", - [rabbit_misc:rs(QueueName)]); + rabbit_amqqueue:with_or_die(QueueName, fun unsupported_single_active_consumer_error/1); {error, Reason} -> %% TODO add queue type to error message rabbit_misc:protocol_error(internal_error, @@ -1996,6 +1992,7 @@ foreach_per_queue(_F, [], Acc) -> foreach_per_queue(F, [#pending_ack{tag = CTag, queue = QName, msg_id = MsgId}], Acc) -> + %% TODO: fix this abstraction leak %% quorum queue, needs the consumer tag F({QName, CTag}, [MsgId], Acc); foreach_per_queue(F, UAL, Acc) -> @@ -2023,6 +2020,7 @@ notify_limiter(Limiter, Acked) -> case rabbit_limiter:is_active(Limiter) of false -> ok; true -> case lists:foldl(fun (#pending_ack{tag = CTag}, Acc) when is_integer(CTag) -> + %% TODO: fix abstraction leak %% Quorum queues use integer CTags %% classic queues use binaries %% Quorum queues do not interact @@ -2787,3 +2785,12 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) -> is_global_qos_permitted() -> rabbit_deprecated_features:is_permitted(global_qos). + +-spec unsupported_single_active_consumer_error(amqqueue:amqqueue()) -> no_return(). +unsupported_single_active_consumer_error(Q) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain access to locked ~ts. 
basic.get operations " + "are not supported by ~p queues with single active consumer", + [rabbit_misc:rs(amqqueue:get_name(Q)), + rabbit_queue_type:short_alias_of(amqqueue:get_type(Q))]). diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 5c79b6804615..3a0d72863245 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -64,8 +64,26 @@ send_drained_credit_api_v1/4, send_credit_reply/7]). +-export([policy_apply_to_name/0, + stop/1, + list_with_minimum_quorum/0, + drain/1, + revive/0, + queue_vm_stats_sups/0, + queue_vm_ets/0]). + -export([validate_policy/1]). +-rabbit_boot_step( + {rabbit_classic_queue_type, + [{description, "Classic queue: queue type"}, + {mfa, {rabbit_registry, register, + [queue, <<"classic">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [queue, <<"classic">>]}}, + {requires, rabbit_registry}, + {enables, ?MODULE}]}). + -rabbit_boot_step( {?MODULE, [{description, "Deprecated queue-master-locator support." @@ -74,7 +92,7 @@ [policy_validator, <<"queue-master-locator">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [operator_policy_validator, <<"queue-master-locator">>, ?MODULE]}}, - {requires, rabbit_registry}, + {requires, [rabbit_classic_queue_type]}, {enables, recovery}]}). validate_policy(Args) -> @@ -590,7 +608,11 @@ capabilities() -> false -> [] end, consumer_arguments => [<<"x-priority">>], - server_named => true}. + server_named => true, + rebalance_module => undefined, + can_redeliver => false, + is_replicable => false + }. notify_decorators(Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), @@ -678,3 +700,33 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) -> send_queue_event(Pid, QName, Event) -> gen_server:cast(Pid, {queue_event, QName, Event}). + +policy_apply_to_name() -> + <<"classic_queues">>. 
+ +stop(VHost) -> + ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), + {ok, BQ} = application:get_env(rabbit, backing_queue_module), + ok = BQ:stop(VHost). + +list_with_minimum_quorum() -> + []. + +drain(_TransferCandidates) -> + ok. + +revive() -> + ok. + +queue_vm_stats_sups() -> + {[queue_procs], [rabbit_vm:all_vhosts_children(rabbit_amqqueue_sup_sup)]}. + +%% return nothing because of this line in rabbit_vm: +%% {msg_index, MsgIndexETS + MsgIndexProc}, +%% it mixes procs and ets, +%% TODO: maybe instead of separating sups and ets +%% I need a vm_memory callback that just +%% returns a proplist? And rabbit_vm calculates +%% Other as usual by subtraction. +queue_vm_ets() -> + {[], []}. diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl index 257f76232e10..884466a81787 100644 --- a/deps/rabbit/src/rabbit_definitions.erl +++ b/deps/rabbit/src/rabbit_definitions.erl @@ -1045,16 +1045,11 @@ list_queues() -> queue_definition(Q) -> #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q), - Type = case amqqueue:get_type(Q) of - rabbit_classic_queue -> classic; - rabbit_quorum_queue -> quorum; - rabbit_stream_queue -> stream; - T -> T - end, + TypeModule = amqqueue:get_type(Q), #{ <<"vhost">> => VHost, <<"name">> => Name, - <<"type">> => Type, + <<"type">> => rabbit_registry:lookup_type_name(queue, TypeModule), <<"durable">> => amqqueue:is_durable(Q), <<"auto_delete">> => amqqueue:is_auto_delete(Q), <<"arguments">> => rabbit_misc:amqp_table(amqqueue:get_arguments(Q)) diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index 647317a35618..6dc08d9f66bc 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -537,16 +537,7 @@ redeliver0(#pending{delivery = Msg0, [rabbit_amqqueue:name()]. 
clients_redeliver(Qs, QTypeState) -> lists:filter(fun(Q) -> - case rabbit_queue_type:module(Q, QTypeState) of - {ok, rabbit_quorum_queue} -> - % If #enqueue{} Raft command does not get applied - % rabbit_fifo_client will resend. - true; - {ok, rabbit_stream_queue} -> - true; - _ -> - false - end + rabbit_queue_type:can_redeliver(Q, QTypeState) end, Qs). maybe_set_timer(#state{timer = TRef} = State) diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index e1aba4ca0455..49fc9a06fe53 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -266,8 +266,8 @@ messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) -> end, counters:add(fetch(QueueType, DeadLetterStrategy), Index, Num). -messages_dead_lettered_confirmed(rabbit_quorum_queue, at_least_once, Num) -> - counters:add(fetch(rabbit_quorum_queue, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num). +messages_dead_lettered_confirmed(QTypeModule, at_least_once, Num) -> + counters:add(fetch(QTypeModule, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num). fetch(Protocol) -> persistent_term:get({?MODULE, Protocol}). diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl index f6ee1f340287..873bc8453d85 100644 --- a/deps/rabbit/src/rabbit_maintenance.erl +++ b/deps/rabbit/src/rabbit_maintenance.erl @@ -33,7 +33,6 @@ close_all_client_connections/0, primary_replica_transfer_candidate_nodes/0, random_primary_replica_transfer_candidate_node/2, - transfer_leadership_of_quorum_queues/1, table_definitions/0 ]). @@ -78,13 +77,7 @@ drain() -> TransferCandidates = primary_replica_transfer_candidate_nodes(), %% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues %% in the cluster, unlike with CMQs. 
- transfer_leadership_of_quorum_queues(TransferCandidates), - stop_local_quorum_queue_followers(), - - case whereis(rabbit_stream_coordinator) of - undefined -> ok; - _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates) - end, + rabbit_queue_type:drain(TransferCandidates), transfer_leadership_of_metadata_store(TransferCandidates), @@ -99,7 +92,7 @@ drain() -> -spec revive() -> ok. revive() -> rabbit_log:info("This node is being revived from maintenance (drain) mode"), - revive_local_quorum_queue_replicas(), + rabbit_queue_type:revive(), rabbit_log:info("Resumed all listeners and will accept client connections again"), _ = resume_all_client_listeners(), rabbit_log:info("Resumed all listeners and will accept client connections again"), @@ -186,32 +179,6 @@ close_all_client_connections() -> rabbit_networking:close_connections(Pids, "Node was put into maintenance mode"), {ok, length(Pids)}. --spec transfer_leadership_of_quorum_queues([node()]) -> ok. -transfer_leadership_of_quorum_queues([]) -> - rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate " - "(online, not under maintenance) nodes to transfer to!"); -transfer_leadership_of_quorum_queues(_TransferCandidates) -> - %% we only transfer leadership for QQs that have local leaders - Queues = rabbit_amqqueue:list_local_leaders(), - rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node", - [length(Queues)]), - [begin - Name = amqqueue:get_name(Q), - rabbit_log:debug("Will trigger a leader election for local quorum queue ~ts", - [rabbit_misc:rs(Name)]), - %% we trigger an election and exclude this node from the list of candidates - %% by simply shutting its local QQ replica (Ra server) - RaLeader = amqqueue:get_pid(Q), - rabbit_log:debug("Will stop Ra server ~tp", [RaLeader]), - case rabbit_quorum_queue:stop_server(RaLeader) of - ok -> - rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]); - {error, nodedown} -> - 
rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down") - end - end || Q <- Queues], - rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated"). - transfer_leadership_of_metadata_store(TransferCandidates) -> rabbit_log:info("Will transfer leadership of metadata store with current leader on this node", []), @@ -224,47 +191,6 @@ transfer_leadership_of_metadata_store(TransferCandidates) -> rabbit_log:warning("Skipping leadership transfer of metadata store: ~p", [Error]) end. --spec transfer_leadership_of_stream_coordinator([node()]) -> ok. -transfer_leadership_of_stream_coordinator([]) -> - rabbit_log:warning("Skipping leadership transfer of stream coordinator: no candidate " - "(online, not under maintenance) nodes to transfer to!"); -transfer_leadership_of_stream_coordinator(TransferCandidates) -> - % try to transfer to the node with the lowest uptime; the assumption is that - % nodes are usually restarted in a rolling fashion, in a consistent order; - % therefore, the youngest node has already been restarted or (if we are draining the first node) - % that it will be restarted last. either way, this way we limit the number of transfers - Uptimes = rabbit_misc:append_rpc_all_nodes(TransferCandidates, erlang, statistics, [wall_clock]), - Candidates = lists:zipwith(fun(N, {U, _}) -> {N, U} end, TransferCandidates, Uptimes), - BestCandidate = element(1, hd(lists:keysort(2, Candidates))), - case rabbit_stream_coordinator:transfer_leadership([BestCandidate]) of - {ok, Node} -> - rabbit_log:info("Leadership transfer for stream coordinator completed. The new leader is ~p", [Node]); - Error -> - rabbit_log:warning("Skipping leadership transfer of stream coordinator: ~p", [Error]) - end. - --spec stop_local_quorum_queue_followers() -> ok. 
-stop_local_quorum_queue_followers() -> - Queues = rabbit_amqqueue:list_local_followers(), - rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node", - [length(Queues)]), - [begin - Name = amqqueue:get_name(Q), - rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts", - [rabbit_misc:rs(Name)]), - %% shut down Ra nodes so that they are not considered for leader election - {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q), - RaNode = {RegisteredName, node()}, - rabbit_log:debug("Will stop Ra server ~tp", [RaNode]), - case rabbit_quorum_queue:stop_server(RaNode) of - ok -> - rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]); - {error, nodedown} -> - rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down") - end - end || Q <- Queues], - rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). - -spec primary_replica_transfer_candidate_nodes() -> [node()]. primary_replica_transfer_candidate_nodes() -> filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]). @@ -289,24 +215,6 @@ random_nth(Nodes) -> Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)), lists:nth(Nth + 1, Nodes). -revive_local_quorum_queue_replicas() -> - Queues = rabbit_amqqueue:list_local_followers(), - %% NB: this function ignores the first argument so we can just pass the - %% empty binary as the vhost name. - {Recovered, Failed} = rabbit_quorum_queue:recover(<<>>, Queues), - rabbit_log:debug("Successfully revived ~b quorum queue replicas", - [length(Recovered)]), - case length(Failed) of - 0 -> - ok; - NumFailed -> - rabbit_log:error("Failed to revive ~b quorum queue replicas", - [NumFailed]) - end, - - rabbit_log:info("Restart of local quorum queue replicas is complete"), - ok. 
- %% %% Implementation %% diff --git a/deps/rabbit/src/rabbit_observer_cli.erl b/deps/rabbit/src/rabbit_observer_cli.erl index 77c102d1f6e3..432426d8932b 100644 --- a/deps/rabbit/src/rabbit_observer_cli.erl +++ b/deps/rabbit/src/rabbit_observer_cli.erl @@ -7,10 +7,21 @@ -module(rabbit_observer_cli). --export([init/0]). +-export([init/0, add_plugin/1]). init() -> application:set_env(observer_cli, plugins, [ rabbit_observer_cli_classic_queues:plugin_info(), rabbit_observer_cli_quorum_queues:plugin_info() ]). + +%% must be executed after observer_cli boot_step +add_plugin(PluginInfo) -> + case application:get_env(observer_cli, plugins, undefined) of + undefined -> %% shouldn't be there, die + exit({rabbit_observer_cli_step_not_there, "Can't add observer_cli plugin, required boot_step wasn't executed"}); + Plugins when is_list(Plugins) -> + application:set_env(observer_cli, plugins, Plugins ++ [PluginInfo]); + _ -> + exit({rabbit_observer_cli_plugins_error, "Can't add observer_cli plugin, existing entry is not a list"}) + end. diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl index 381927f36df7..72706a2a1c72 100644 --- a/deps/rabbit/src/rabbit_policy.erl +++ b/deps/rabbit/src/rabbit_policy.erl @@ -493,10 +493,13 @@ matches_type(_, _) -> false. matches_queue_type(queue, _, <<"all">>) -> true; matches_queue_type(queue, _, <<"queues">>) -> true; -matches_queue_type(queue, rabbit_classic_queue, <<"classic_queues">>) -> true; -matches_queue_type(queue, rabbit_quorum_queue, <<"quorum_queues">>) -> true; -matches_queue_type(queue, rabbit_stream_queue, <<"streams">>) -> true; -matches_queue_type(queue, _, _) -> false. +matches_queue_type(queue, TypeModule, Term) -> + %% we assume here TypeModule comes from queue struct, + %% therefore it is used and loaded - no need to check + %% with registry. 
+ %% we also assume here and elsewhere that queue type + %% module developer implemented all needed callbacks + TypeModule:policy_apply_to_name() == Term. priority_comparator(A, B) -> pget(priority, A) >= pget(priority, B). @@ -578,9 +581,19 @@ is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]). apply_to_validation(_Name, <<"all">>) -> ok; apply_to_validation(_Name, <<"exchanges">>) -> ok; apply_to_validation(_Name, <<"queues">>) -> ok; -apply_to_validation(_Name, <<"classic_queues">>) -> ok; -apply_to_validation(_Name, <<"quorum_queues">>) -> ok; -apply_to_validation(_Name, <<"streams">>) -> ok; apply_to_validation(_Name, Term) -> - {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', " - " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}. + %% as a last resort go to queue types registry + %% and try to find something there + case maybe_apply_to_queue_type(Term) of + true -> ok; + false -> + %% TODO: get the recognized apply-to terms from the registered queue types. + {error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', " + " 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]} + end. + +maybe_apply_to_queue_type(Term) -> + lists:any(fun({_TypeName, TypeModule}) -> + TypeModule:policy_apply_to_name() == Term + end, + rabbit_registry:lookup_all(queue)). diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl index 4c7dfe7ea0b9..0f204f97347e 100644 --- a/deps/rabbit/src/rabbit_queue_location.erl +++ b/deps/rabbit/src/rabbit_queue_location.erl @@ -45,7 +45,7 @@ queue_leader_locators() -> -spec select_leader_and_followers(amqqueue:amqqueue(), pos_integer()) -> {Leader :: node(), Followers :: [node()]}. 
select_leader_and_followers(Q, Size) - when (?amqqueue_is_quorum(Q) orelse ?amqqueue_is_stream(Q) orelse ?amqqueue_is_classic(Q)) andalso is_integer(Size) -> + when (?is_amqqueue_v2(Q)) andalso is_integer(Size) -> LeaderLocator = leader_locator(Q), QueueType = amqqueue:get_type(Q), do_select_leader_and_followers(Size, QueueType, LeaderLocator). @@ -109,6 +109,7 @@ leader_locator0(_) -> %% default <<"client-local">>. +%% TODO: allow dispatching by queue type -spec select_members(pos_integer(), rabbit_queue_type:queue_type(), [node(),...], [node(),...], non_neg_integer(), non_neg_integer(), function()) -> {[node(),...], function()}. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 4ddf31780538..d11b1ec14fa8 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -60,7 +60,16 @@ arguments/1, arguments/2, notify_decorators/1, - publish_at_most_once/2 + publish_at_most_once/2, + can_redeliver/2, + rebalance_module/1, + is_replicable/1, + stop/1, + list_with_minimum_quorum/0, + drain/1, + revive/0, + queue_vm_stats_sups/0, + queue_vm_ets/0 ]). -export([ @@ -75,7 +84,7 @@ %% sequence number typically -type correlation() :: term(). -type arguments() :: queue_arguments | consumer_arguments. --type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module(). +-type queue_type() :: module(). %% see AMQP 1.0 §2.6.7 -type delivery_count() :: sequence_no(). -type credit() :: uint(). @@ -84,10 +93,6 @@ -define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, type, state]). -%% TODO resolve all registered queue types from registry --define(QUEUE_MODULES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]). --define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]). - -type credit_reply_action() :: {credit_reply, rabbit_types:ctag(), delivery_count(), credit(), Available :: non_neg_integer(), Drain :: boolean()}. 
@@ -265,72 +270,60 @@ -callback format(amqqueue:amqqueue(), Context :: map()) -> [{atom(), term()}]. +%% TODO: replace binary() with real types? -callback capabilities() -> - #{atom() := term()}. + #{unsupported_policies := [binary()], + queue_arguments := [binary()], + consumer_arguments := [binary()], + amqp_capabilities => [binary()], + server_named := boolean(), + rebalance_module := module() | undefined, + can_redeliver := boolean(), + is_replicable := boolean()}. -callback notify_decorators(amqqueue:amqqueue()) -> ok. +-callback policy_apply_to_name() -> binary(). + +%% -callback on_node_up(node()) -> ok. + +%% -callback on_node_down(node()) -> ok. + +-callback stop(rabbit_types:vhost()) -> ok. + +-callback list_with_minimum_quorum() -> [amqqueue:amqqueue()]. + +-callback drain([node()]) -> ok. + +-callback revive() -> ok. + +%% used by rabbit_vm to emit queue process +%% (currently memory and binary) stats +-callback queue_vm_stats_sups() -> {StatsKeys :: [atom()], SupsNames:: [[atom()]]}. + +-callback queue_vm_ets() -> {StatsKeys :: [atom()], ETSNames:: [[atom()]]}. + -spec discover(binary() | atom()) -> queue_type(). discover(<<"undefined">>) -> fallback(); discover(undefined) -> fallback(); -%% TODO: should this use a registry that's populated on boot? -discover(<<"quorum">>) -> - rabbit_quorum_queue; -discover(rabbit_quorum_queue) -> - rabbit_quorum_queue; -discover(<<"classic">>) -> - rabbit_classic_queue; -discover(rabbit_classic_queue) -> - rabbit_classic_queue; -discover(rabbit_stream_queue) -> - rabbit_stream_queue; -discover(<<"stream">>) -> - rabbit_stream_queue; -discover(Other) when is_atom(Other) -> - discover(rabbit_data_coercion:to_binary(Other)); -discover(Other) when is_binary(Other) -> - T = rabbit_registry:binary_to_type(Other), - rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]), - {ok, Mod} = rabbit_registry:lookup_module(queue, T), - Mod. - --spec short_alias_of(queue_type()) -> undefined | binary(). 
-%% The opposite of discover/1: returns a short alias given a module name -short_alias_of(<<"rabbit_quorum_queue">>) -> - <<"quorum">>; -short_alias_of(rabbit_quorum_queue) -> - <<"quorum">>; -%% AMQP 1.0 management client -short_alias_of({utf8, <<"quorum">>}) -> - <<"quorum">>; -short_alias_of(<<"rabbit_classic_queue">>) -> - <<"classic">>; -short_alias_of(rabbit_classic_queue) -> - <<"classic">>; -%% AMQP 1.0 management client -short_alias_of({utf8, <<"classic">>}) -> - <<"classic">>; -short_alias_of(<<"rabbit_stream_queue">>) -> - <<"stream">>; -short_alias_of(rabbit_stream_queue) -> - <<"stream">>; +discover(TypeDescriptor) -> + {ok, TypeModule} = rabbit_registry:lookup_type_module(queue, TypeDescriptor), + TypeModule. + +-spec short_alias_of(TypeDescriptor) -> Ret when + TypeDescriptor :: {utf8, binary()} | atom() | binary(), + Ret :: binary() | undefined. %% AMQP 1.0 management client -short_alias_of({utf8, <<"stream">>}) -> - <<"stream">>; -%% for cases where this function is used for -%% formatting of values that already might use these -%% short aliases -short_alias_of(<<"quorum">>) -> - <<"quorum">>; -short_alias_of(<<"classic">>) -> - <<"classic">>; -short_alias_of(<<"stream">>) -> - <<"stream">>; -short_alias_of(_Other) -> - undefined. +short_alias_of({utf8, TypeName}) -> + short_alias_of(TypeName); +short_alias_of(TypeDescriptor) -> + case rabbit_registry:lookup_type_name(queue, TypeDescriptor) of + {ok, TypeName} -> TypeName; + _ -> undefined + end. %% If the client does not specify the type, the virtual host does not have any %% metadata default, and rabbit.default_queue_type is not set in the application env, @@ -826,14 +819,13 @@ qref(Q) when ?is_amqqueue(Q) -> known_queue_type_modules() -> Registered = rabbit_registry:lookup_all(queue), {_, Modules} = lists:unzip(Registered), - ?QUEUE_MODULES ++ Modules. + Modules. -spec known_queue_type_names() -> [binary()]. 
known_queue_type_names() -> Registered = rabbit_registry:lookup_all(queue), {QueueTypes, _} = lists:unzip(Registered), - QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes), - ?KNOWN_QUEUE_TYPES ++ QTypeBins. + lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes). inject_dqt(VHost) when ?is_vhost(VHost) -> inject_dqt(vhost:to_map(VHost)); @@ -897,3 +889,59 @@ check_cluster_queue_limit(Q) -> queue_limit_error(Reason, ReasonArgs) -> {error, queue_limit_exceeded, Reason, ReasonArgs}. + +-spec can_redeliver(queue_name(), state()) -> + boolean(). +can_redeliver(Q, State) -> + case module(Q, State) of + {ok, TypeModule} -> + Capabilities = TypeModule:capabilities(), + maps:get(can_redeliver, Capabilities, false); + _ -> false + end. + +-spec rebalance_module(amqqueue:amqqueue()) -> undefined | module(). +rebalance_module(Q) -> + TypeModule = amqqueue:get_type(Q), + Capabilities = TypeModule:capabilities(), + maps:get(rebalance_module, Capabilities, undefined). + +-spec is_replicable(amqqueue:amqqueue()) -> boolean(). +is_replicable(Q) -> + TypeModule = amqqueue:get_type(Q), + Capabilities = TypeModule:capabilities(), + maps:get(is_replicable, Capabilities, false). + +-spec stop(rabbit_types:vhost()) -> ok. +stop(VHost) -> + %% original rabbit_amqqueue:stop doesn't do any catches or try after + _ = [TypeModule:stop(VHost) || {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)], + ok. + +list_with_minimum_quorum() -> + lists:append([TypeModule:list_with_minimum_quorum() + || {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)]). + +drain(TransferCandidates) -> + _ = [TypeModule:drain(TransferCandidates) || + {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)], + ok. + +revive() -> + _ = [TypeModule:revive() || + {_Type, TypeModule} <- rabbit_registry:lookup_all(queue)], + ok.
+ +queue_vm_stats_sups() -> + lists:foldl(fun({_TypeName, TypeModule}, {KeysAcc, SupsAcc}) -> + {Keys, Sups} = TypeModule:queue_vm_stats_sups(), + {KeysAcc ++ Keys, SupsAcc ++ Sups} + end, + {[], []}, rabbit_registry:lookup_all(queue)). + +queue_vm_ets() -> + lists:foldl(fun({_TypeName, TypeModule}, {KeysAcc, SupsAcc}) -> + {Keys, Tables} = TypeModule:queue_vm_ets(), + {KeysAcc ++ Keys, SupsAcc ++ Tables} + end, + {[], []}, rabbit_registry:lookup_all(queue)). diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 4a013bbe70d3..08cb89ccee90 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -77,6 +77,12 @@ force_vhost_queues_shrink_member_to_current_member/1, force_all_queues_shrink_member_to_current_member/0]). +-export([policy_apply_to_name/0, + drain/1, + revive/0, + queue_vm_stats_sups/0, + queue_vm_ets/0]). + %% for backwards compatibility -export([file_handle_leader_reservation/1, file_handle_other_reservation/0, @@ -98,6 +104,15 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). +-rabbit_boot_step( + {rabbit_quorum_queue_type, + [{description, "Quorum queue: queue type"}, + {mfa, {rabbit_registry, register, + [queue, <<"quorum">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [queue, <<"quorum">>]}}, + {requires, rabbit_registry}]}). + -type msg_id() :: non_neg_integer(). -type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), mc:state()}. @@ -166,7 +181,7 @@ [operator_policy_validator, <<"target-group-size">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [policy_merge_strategy, <<"target-group-size">>, ?MODULE]}}, - {requires, rabbit_registry}, + {requires, [rabbit_registry]}, {enables, recovery}]}). 
validate_policy(Args) -> @@ -542,7 +557,11 @@ capabilities() -> <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>, <<"x-message-ttl">>, <<"x-queue-leader-locator">>], consumer_arguments => [<<"x-priority">>], - server_named => false}. + server_named => false, + rebalance_module => ?MODULE, + can_redeliver => true, + is_replicable => true + }. rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), @@ -2232,3 +2251,91 @@ maybe_log_leader_health_check_result([]) -> ok; maybe_log_leader_health_check_result(Result) -> Qs = lists:map(fun(R) -> catch maps:get(<<"readable_name">>, R) end, Result), rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]). + +policy_apply_to_name() -> + <<"quorum_queues">>. + +-spec drain([node()]) -> ok. +drain(TransferCandidates) -> + _ = transfer_leadership(TransferCandidates), + _ = stop_local_quorum_queue_followers(), + ok. + +transfer_leadership([]) -> + rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate " + "(online, not under maintenance) nodes to transfer to!"); +transfer_leadership(_TransferCandidates) -> + %% we only transfer leadership for QQs that have local leaders + Queues = rabbit_amqqueue:list_local_leaders(), + rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node", + [length(Queues)]), + [begin + Name = amqqueue:get_name(Q), + rabbit_log:debug("Will trigger a leader election for local quorum queue ~ts", + [rabbit_misc:rs(Name)]), + %% we trigger an election and exclude this node from the list of candidates + %% by simply shutting its local QQ replica (Ra server) + RaLeader = amqqueue:get_pid(Q), + rabbit_log:debug("Will stop Ra server ~tp", [RaLeader]), + case rabbit_quorum_queue:stop_server(RaLeader) of + ok -> + rabbit_log:debug("Successfully stopped Ra server ~tp", [RaLeader]); + {error, nodedown} -> + rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down", [RaLeader]) + end + end
+ || Q <- Queues], + rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated"). + +%% TODO: I just copied it over, it looks like was always called inside maintenance so... +-spec stop_local_quorum_queue_followers() -> ok. +stop_local_quorum_queue_followers() -> + Queues = rabbit_amqqueue:list_local_followers(), + rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node", + [length(Queues)]), + [begin + Name = amqqueue:get_name(Q), + rabbit_log:debug("Will stop a local follower replica of quorum queue ~ts", + [rabbit_misc:rs(Name)]), + %% shut down Ra nodes so that they are not considered for leader election + {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q), + RaNode = {RegisteredName, node()}, + rabbit_log:debug("Will stop Ra server ~tp", [RaNode]), + case rabbit_quorum_queue:stop_server(RaNode) of + ok -> + rabbit_log:debug("Successfully stopped Ra server ~tp", [RaNode]); + {error, nodedown} -> + rabbit_log:error("Failed to stop Ra server ~tp: target node was reported as down", [RaNode]) + end + end || Q <- Queues], + rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). + +revive() -> + revive_local_queue_members(). + +revive_local_queue_members() -> + Queues = rabbit_amqqueue:list_local_followers(), + %% NB: this function ignores the first argument so we can just pass the + %% empty binary as the vhost name. + {Recovered, Failed} = rabbit_quorum_queue:recover(<<>>, Queues), + rabbit_log:debug("Successfully revived ~b quorum queue replicas", + [length(Recovered)]), + case length(Failed) of + 0 -> + ok; + NumFailed -> + rabbit_log:error("Failed to revive ~b quorum queue replicas", + [NumFailed]) + end, + + rabbit_log:info("Restart of local quorum queue replicas is complete"), + ok. + +queue_vm_stats_sups() -> + {[quorum_queue_procs, + quorum_queue_dlx_procs], + [[ra_server_sup_sup], + [rabbit_fifo_dlx_sup]]}. + +queue_vm_ets() -> + {[quorum_ets], + [[ra_log_ets]]}.
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 0b7c1c0bbba9..047b385765bb 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -59,6 +59,13 @@ -export([check_max_segment_size_bytes/1]). +-export([policy_apply_to_name/0, + stop/1, + drain/1, + revive/0, + queue_vm_stats_sups/0, + queue_vm_ets/0]). + -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). @@ -103,6 +110,16 @@ -import(rabbit_queue_type_util, [args_policy_lookup/3]). -import(rabbit_misc, [queue_resource/2]). +-rabbit_boot_step( + {?MODULE, + [{description, "Stream queue: queue type"}, + {mfa, {rabbit_registry, register, + [queue, <<"stream">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [queue, <<"stream">>]}}, + {requires, rabbit_registry} + ]}). + -type client() :: #stream_client{}. -spec is_enabled() -> boolean(). @@ -853,6 +870,8 @@ status(Vhost, QueueName) -> get_key(readers, C), get_key(segments, C)] end || C <- get_counters(Q)]; + {ok, _Q} -> + {error, not_supported}; {error, not_found} = E -> E end. @@ -927,6 +946,8 @@ tracking_status(Vhost, QueueName) -> {value, TrkData}] | Acc0] end, [], Trackings) ++ Acc end, [], Map); + {ok, Q} -> + {error, {queue_not_supported, amqqueue:get_type(Q)}}; {error, not_found} = E-> E end. @@ -1038,6 +1059,8 @@ add_replica(VHost, Name, Node) -> true -> rabbit_stream_coordinator:add_replica(Q, Node) end; + {ok, Q} -> + {error, {queue_not_supported, amqqueue:get_type(Q)}}; E -> E end. @@ -1053,6 +1076,8 @@ delete_replica(VHost, Name, Node) -> #{name := StreamId} = amqqueue:get_type_state(Q), {ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node), Reply; + {ok, Q} -> + {error, {queue_not_supported, amqqueue:get_type(Q)}}; E -> E end. 
@@ -1325,7 +1350,11 @@ capabilities() -> %% AMQP property filter expressions %% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227 amqp_capabilities => [<<"AMQP_FILTEX_PROP_V1_0">>], - server_named => false}. + server_named => false, + rebalance_module => ?MODULE, + can_redeliver => true, + is_replicable => true + }. notify_decorators(Q) when ?is_amqqueue(Q) -> %% Not supported @@ -1399,3 +1428,49 @@ delivery_count_add(none, _) -> none; delivery_count_add(Count, N) -> serial_number:add(Count, N). + +policy_apply_to_name() -> + <<"streams">>. + +stop(_VHost) -> + ok. + +drain(TransferCandidates) -> + case whereis(rabbit_stream_coordinator) of + undefined -> ok; + _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates) + end. + +revive() -> + ok. + +-spec transfer_leadership_of_stream_coordinator([node()]) -> ok. +transfer_leadership_of_stream_coordinator([]) -> + rabbit_log:warning("Skipping leadership transfer of stream coordinator: no candidate " + "(online, not under maintenance) nodes to transfer to!"); +transfer_leadership_of_stream_coordinator(TransferCandidates) -> + % try to transfer to the node with the lowest uptime; the assumption is that + % nodes are usually restarted in a rolling fashion, in a consistent order; + % therefore, the youngest node has already been restarted or (if we are draining the first node) + % that it will be restarted last. either way, this way we limit the number of transfers + Uptimes = rabbit_misc:append_rpc_all_nodes(TransferCandidates, erlang, statistics, [wall_clock]), + Candidates = lists:zipwith(fun(N, {U, _}) -> {N, U} end, TransferCandidates, Uptimes), + BestCandidate = element(1, hd(lists:keysort(2, Candidates))), + case rabbit_stream_coordinator:transfer_leadership([BestCandidate]) of + {ok, Node} -> + rabbit_log:info("Leadership transfer for stream coordinator completed. 
The new leader is ~p", [Node]); + Error -> + rabbit_log:warning("Skipping leadership transfer of stream coordinator: ~p", [Error]) + end. + +queue_vm_stats_sups() -> + {[stream_queue_procs, + stream_queue_replica_reader_procs, + stream_queue_coordinator_procs], + [[osiris_server_sup], + [osiris_replica_reader_sup], + [rabbit_stream_coordinator]]}. + +queue_vm_ets() -> + {[], + []}. diff --git a/deps/rabbit/src/rabbit_upgrade_preparation.erl b/deps/rabbit/src/rabbit_upgrade_preparation.erl index ad398eba0094..a6df3572d8de 100644 --- a/deps/rabbit/src/rabbit_upgrade_preparation.erl +++ b/deps/rabbit/src/rabbit_upgrade_preparation.erl @@ -56,9 +56,7 @@ endangered_critical_components() -> do_await_safe_online_quorum(0) -> false; do_await_safe_online_quorum(IterationsLeft) -> - EndangeredQueues = lists:append( - rabbit_quorum_queue:list_with_minimum_quorum(), - rabbit_stream_queue:list_with_minimum_quorum()), + EndangeredQueues = rabbit_queue_type:list_with_minimum_quorum(), case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of true -> true; false -> @@ -83,9 +81,7 @@ do_await_safe_online_quorum(IterationsLeft) -> -spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}]. list_with_minimum_quorum_for_cli() -> - EndangeredQueues = lists:append( - rabbit_quorum_queue:list_with_minimum_quorum(), - rabbit_stream_queue:list_with_minimum_quorum()), + EndangeredQueues = rabbit_queue_type:list_with_minimum_quorum(), [amqqueue:to_printable(Q) || Q <- EndangeredQueues] ++ [#{ <<"readable_name">> => C, diff --git a/deps/rabbit/src/rabbit_vm.erl b/deps/rabbit/src/rabbit_vm.erl index 451f11688505..3a13a1479a8d 100644 --- a/deps/rabbit/src/rabbit_vm.erl +++ b/deps/rabbit/src/rabbit_vm.erl @@ -7,7 +7,7 @@ -module(rabbit_vm). --export([memory/0, binary/0, ets_tables_memory/1]). +-export([memory/0, binary/0, ets_tables_memory/1, all_vhosts_children/1]). -define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]). 
@@ -16,19 +16,43 @@ -spec memory() -> rabbit_types:infos(). memory() -> - All = interesting_sups(), + %% this whole aggregation pipeline preserves sups order + %% [{info_key, [SupName...]}...] i.e. flattened list of + %% info key, sups list pairs for each queue type + %% example for existing info keys: + %% [{queue_procs, queue_sups()}, + %% {quorum_queue_procs, [ra_server_sup_sup]}, + %% {quorum_queue_dlx_procs, [rabbit_fifo_dlx_sup]}, + %% {stream_queue_procs, [osiris_server_sup]}, + %% {stream_queue_replica_reader_procs, [osiris_replica_reader_sup]}, + %% {stream_queue_coordinator_procs, [rabbit_stream_coordinator]}] + {QueueSupsStatsKeys, QueueStatsSups} = case rabbit:is_running() of + true -> rabbit_queue_type:queue_vm_stats_sups(); + false -> {[], []} + end, + + %% we keep order and that means this variable queues part + %% has to be matched somehow - | Rest is the best. + All = interesting_sups() ++ QueueStatsSups, {Sums, _Other} = sum_processes( lists:append(All), distinguishers(), [memory]), - [Qs, Qqs, DlxWorkers, Ssqs, Srqs, SCoor, ConnsReader, ConnsWriter, ConnsChannel, - ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = + [ConnsReader, ConnsWriter, ConnsChannel, + ConnsOther, MsgIndexProc, MgmtDbProc, Plugins | QueueSupsStats] = [aggregate(Names, Sums, memory, fun (X) -> X end) - || Names <- distinguished_interesting_sups()], + || Names <- distinguished_interesting_sups() ++ QueueStatsSups], + + + {QueuesEtsStatsKeys, QueueStatsEtsNames} = case rabbit:is_running() of + true -> rabbit_queue_type:queue_vm_ets(); + false -> {[], []} + end, + + QueuesEtsStats = lists:map(fun ets_memory/1, QueueStatsEtsNames), MnesiaETS = mnesia_memory(), MsgIndexETS = ets_memory(msg_stores()), MetricsETS = ets_memory([rabbit_metrics]), - QuorumETS = ets_memory([ra_log_ets]), MetricsProc = try [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), M @@ -63,23 +87,20 @@ memory() -> OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - - Qs - Qqs 
- DlxWorkers - Ssqs - Srqs - SCoor - MsgIndexProc - Plugins + - lists:sum(QueueSupsStats) - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc - MetadataStoreProc, + [ %% Connections {connection_readers, ConnsReader}, {connection_writers, ConnsWriter}, {connection_channels, ConnsChannel}, - {connection_other, ConnsOther}, + {connection_other, ConnsOther}] ++ %% Queues - {queue_procs, Qs}, - {quorum_queue_procs, Qqs}, - {quorum_queue_dlx_procs, DlxWorkers}, - {stream_queue_procs, Ssqs}, - {stream_queue_replica_reader_procs, Srqs}, - {stream_queue_coordinator_procs, SCoor}, + lists:zip(QueueSupsStatsKeys, QueueSupsStats) ++ + [ %% Processes {plugins, Plugins}, {metadata_store, MetadataStoreProc}, @@ -87,13 +108,16 @@ memory() -> %% Metrics {metrics, MetricsETS + MetricsProc}, - {mgmt_db, MgmtDbETS + MgmtDbProc}, + {mgmt_db, MgmtDbETS + MgmtDbProc}] ++ %% ETS + %% queues + lists:zip(QueuesEtsStatsKeys, QueuesEtsStats) ++ + + [ {mnesia, MnesiaETS}, - {quorum_ets, QuorumETS}, {metadata_store_ets, MetadataStoreETS}, - {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - QuorumETS - MetadataStoreETS}, + {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - MetadataStoreETS - lists:sum(QueuesEtsStats)}, %% Messages (mostly, some binaries are not messages) {binary, Bin}, @@ -110,6 +134,7 @@ memory() -> {rss, Rss}, {allocated, Allocated}]} ]. + %% [1] - erlang:memory(processes) can be less than the sum of its %% parts. Rather than display something nonsensical, just silence any %% claims about negative memory. See @@ -118,7 +143,9 @@ memory() -> -spec binary() -> rabbit_types:infos(). 
binary() -> - All = interesting_sups(), + {QueueSupsStatsKeys, QueueStatsSups} = rabbit_queue_type:queue_vm_stats_sups(), + + All = interesting_sups() ++ QueueStatsSups, {Sums, Rest} = sum_processes( lists:append(All), @@ -127,10 +154,10 @@ binary() -> sets:add_element({Ptr, Sz}, Acc0) end, Acc, Info) end, distinguishers(), [{binary, sets:new()}]), - [Other, Qs, Qqs, DlxWorkers, Ssqs, Srqs, Scoor, ConnsReader, ConnsWriter, - ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = + [Other, ConnsReader, ConnsWriter, + ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins | QueueSupsStats] = [aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1) - || Names <- [[other] | distinguished_interesting_sups()]], + || Names <- [[other] | distinguished_interesting_sups()] ++ QueueStatsSups], MetadataStoreProc = try [{_, B}] = process_info(whereis(rabbit_khepri:get_ra_cluster_name()), [binary]), lists:foldl(fun({_, Sz, _}, Acc) -> @@ -143,13 +170,10 @@ binary() -> [{connection_readers, ConnsReader}, {connection_writers, ConnsWriter}, {connection_channels, ConnsChannel}, - {connection_other, ConnsOther}, - {queue_procs, Qs}, - {quorum_queue_procs, Qqs}, - {quorum_queue_dlx_procs, DlxWorkers}, - {stream_queue_procs, Ssqs}, - {stream_queue_replica_reader_procs, Srqs}, - {stream_queue_coordinator_procs, Scoor}, + {connection_other, ConnsOther}] ++ + %% Queues + lists:zip(QueueSupsStatsKeys, QueueSupsStats) ++ + [ {metadata_store, MetadataStoreProc}, {plugins, Plugins}, {mgmt_db, MgmtDbProc}, @@ -194,19 +218,7 @@ bytes(Words) -> try end. interesting_sups() -> - [queue_sups(), quorum_sups(), dlx_sups(), - stream_server_sups(), stream_reader_sups(), stream_coordinator(), - conn_sups() | interesting_sups0()]. - -queue_sups() -> - all_vhosts_children(rabbit_amqqueue_sup_sup). - -quorum_sups() -> [ra_server_sup_sup]. - -dlx_sups() -> [rabbit_fifo_dlx_sup]. -stream_server_sups() -> [osiris_server_sup]. -stream_reader_sups() -> [osiris_replica_reader_sup]. 
-stream_coordinator() -> [rabbit_stream_coordinator]. + [conn_sups() | interesting_sups0()]. msg_stores() -> all_vhosts_children(msg_store_transient) @@ -256,12 +268,6 @@ distinguishers() -> with(conn_sups(), fun conn_type/1). distinguished_interesting_sups() -> [ - queue_sups(), - quorum_sups(), - dlx_sups(), - stream_server_sups(), - stream_reader_sups(), - stream_coordinator(), with(conn_sups(), reader), with(conn_sups(), writer), with(conn_sups(), channel), diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 463445b9f474..af0ef43e84d3 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -4270,7 +4270,7 @@ leader_health_check(Config) -> [Q1Data, Q2Data, Q3Data, Q4Data, Q5Data, Q6Data] = QQ_Data = [begin rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q_ClusterName]), - _QData = amqqueue:to_printable(Q_Res, rabbit_quorum_queue) + rabbit_ct_broker_helpers:rpc(Config, 0, amqqueue, to_printable, [Q_Res, rabbit_quorum_queue]) end || {Q_ClusterName, Q_Res} <- QQ_Clusters], diff --git a/deps/rabbit_common/src/rabbit_registry.erl b/deps/rabbit_common/src/rabbit_registry.erl index 7cdc23dd87a8..59d36a2921b5 100644 --- a/deps/rabbit_common/src/rabbit_registry.erl +++ b/deps/rabbit_common/src/rabbit_registry.erl @@ -15,7 +15,7 @@ code_change/3]). -export([register/3, unregister/2, - binary_to_type/1, lookup_module/2, lookup_all/1]). + binary_to_type/1, lookup_module/2, lookup_type_module/2, lookup_type_name/2, lookup_all/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -61,6 +61,61 @@ lookup_module(Class, T) when is_atom(T) -> {error, not_found} end. + +-spec lookup_type_module(Class, TypeDescriptor) -> + Ret when + Class :: atom(), + TypeDescriptor :: atom() | %% can be TypeModule or Type + binary(), %% or what is currently called "alias" - a TypeName + Ret :: {ok, TypeModule} | {error, not_found}, + TypeModule :: atom(). 
+lookup_type_module(Class, TypeDescriptor) -> + case lookup_type(Class, TypeDescriptor) of + {error, _} = Error -> + Error; + {ok, {_TypeName, TypeModule}} -> + {ok, TypeModule} + end. + +-spec lookup_type_name(Class, TypeDescriptor) -> + Ret when + Class :: atom(), + TypeDescriptor :: atom() | %% either full typemodule or atomized typename + binary(), %% typename or typemodule in binary + Ret :: {ok, binary()} | {error, not_found}. +lookup_type_name(Class, TypeDescriptor) -> + case lookup_type(Class, TypeDescriptor) of + {error, _} = Error -> + Error; + {ok, {TypeName, _TypeModule}} -> + {ok, atom_to_binary(TypeName)} + end. + +lookup_type(Class, TypeDescriptor) + when is_atom(TypeDescriptor) -> + case ets:lookup(?ETS_NAME, {Class, TypeDescriptor}) of + [{_, Module}] -> + {ok, {TypeDescriptor, Module}}; + [] -> + %% In principle it is enough to do the same sanity check + %% we do when registering a type. + %% This however will return false positives for loaded + %% but unregistered modules. + TMMatch = ets:match(?ETS_NAME, {{Class, '$1'}, TypeDescriptor}), + case TMMatch of + [[TypeName]] -> {ok, {TypeName, TypeDescriptor}}; + [] -> + {error, not_found} + end + end; +lookup_type(Class, TypeDescriptor) + when is_binary(TypeDescriptor) -> + %% type names are converted to atoms on registration, so a binary with + %% no existing atom cannot be registered: report not_found, do not crash + try binary_to_existing_atom(TypeDescriptor) of Atom -> lookup_type(Class, Atom) + catch error:badarg -> {error, not_found} end. + + lookup_all(Class) -> [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})]. diff --git a/deps/rabbit_common/test/rabbit_registry_SUITE.erl b/deps/rabbit_common/test/rabbit_registry_SUITE.erl new file mode 100644 index 000000000000..fd4b0527297a --- /dev/null +++ b/deps/rabbit_common/test/rabbit_registry_SUITE.erl @@ -0,0 +1,85 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_registry_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-compile(export_all). + +all() -> + [ + {group, lookup} + ]. + +groups() -> + [ + {lookup, [], [lookup_type_module, + lookup_type_name + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_, Config) -> + Config. + +end_per_group(_, _Config) -> + ok. + +init_per_testcase(_Testcase, Config) -> + {ok, RPid} = rabbit_registry:start_link(), + [{registry_pid, RPid} | Config]. + +end_per_testcase(_Testcase, Config) -> + RPid = ?config(registry_pid, Config), + gen_server:stop(RPid), + ok. + +%% ------------------------------------------------------------------- +%% Testcases. 
+%% ------------------------------------------------------------------- + +lookup_type_module(_Config) -> + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_module(runtime_parameter, <<"param">>)), + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_module(runtime_parameter, param)), + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_module(runtime_parameter, runtime_parameter_test)), + + ok = rabbit_registry:register(runtime_parameter, <<"param">>, rabbit_runtime_parameter_registry_test), + + ?assertMatch({ok, rabbit_runtime_parameter_registry_test}, rabbit_registry:lookup_type_module(runtime_parameter, <<"param">>)), + ?assertMatch({ok, rabbit_runtime_parameter_registry_test}, rabbit_registry:lookup_type_module(runtime_parameter, param)), + ?assertMatch({ok, rabbit_runtime_parameter_registry_test}, rabbit_registry:lookup_type_module(runtime_parameter, rabbit_runtime_parameter_registry_test)), + + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_module(runtime_parameter, another_param)). + +lookup_type_name(_Config) -> + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_name(runtime_parameter, <<"param">>)), + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_name(runtime_parameter, param)), + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_name(runtime_parameter, runtime_parameter_test)), + + ok = rabbit_registry:register(runtime_parameter, <<"param">>, rabbit_runtime_parameter_registry_test), + + ?assertMatch({ok, <<"param">>}, rabbit_registry:lookup_type_name(runtime_parameter, <<"param">>)), + ?assertMatch({ok, <<"param">>}, rabbit_registry:lookup_type_name(runtime_parameter, param)), + ?assertMatch({ok, <<"param">>}, rabbit_registry:lookup_type_name(runtime_parameter, rabbit_runtime_parameter_registry_test)), + + ?assertMatch({error, not_found}, rabbit_registry:lookup_type_name(runtime_parameter, another_param)).
+ + +%% ------------------------------------------------------------------- +%% Utility. +%% ------------------------------------------------------------------- diff --git a/deps/rabbit_common/test/rabbit_runtime_parameter_registry_test.erl b/deps/rabbit_common/test/rabbit_runtime_parameter_registry_test.erl new file mode 100644 index 000000000000..5b94929b137d --- /dev/null +++ b/deps/rabbit_common/test/rabbit_runtime_parameter_registry_test.erl @@ -0,0 +1,18 @@ +-module(rabbit_runtime_parameter_registry_test). + +-behaviour(rabbit_runtime_parameter). + +-export([ + validate/5, + notify/5, + notify_clear/4 + ]). + +validate(_, _, _, _, _) -> + ok. + +notify(_, _, _, _, _) -> + ok. + +notify_clear(_, _, _, _) -> + ok. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index d0201e7a7d9f..785a88a9aea3 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -41,6 +41,16 @@ notify_decorators/1 ]). +-export([queue_topology/1, + feature_flag_name/0, + policy_apply_to_name/0, + stop/1, + list_with_minimum_quorum/0, + drain/1, + revive/0, + queue_vm_stats_sups/0, + queue_vm_ets/0]). + %% Stateful rabbit_queue_type callbacks are unsupported by this queue type. -define(STATEFUL_CALLBACKS, [ @@ -222,10 +232,14 @@ format(Q, _Ctx) -> [{type, ?MODULE}, {state, amqqueue:get_state(Q)}]. --spec capabilities() -> - #{atom() := term()}. capabilities() -> - #{}. + #{can_redeliver => false, + consumer_arguments => [], + is_replicable => false, + queue_arguments => [], + rebalance_module => undefined, + server_named => true, + unsupported_policies => []}. -spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) -> rabbit_types:infos(). @@ -301,3 +315,34 @@ dequeue(A1,A2,A3,A4,A5) -> state_info(A1) -> ?UNSUPPORTED([A1]). + +-spec queue_topology(amqqueue:amqqueue()) -> + {Leader :: undefined | node(), Replicas :: undefined | [node(),...]}. 
+queue_topology(Q) -> + Pid = amqqueue:get_pid(Q), + Node = node(Pid), + {Node, [Node]}. + +feature_flag_name() -> + undefined. + +policy_apply_to_name() -> + <<"qos0_queues">>. + +stop(_VHost) -> + ok. + +list_with_minimum_quorum() -> + []. + +drain(_TransferCandidates) -> + ok. + +revive() -> + ok. + +queue_vm_stats_sups() -> + {[], []}. + +queue_vm_ets() -> + {[], []}. diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 09bae18c37fe..be11044f7f4b 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -205,6 +205,12 @@ init_per_testcase(T, Config) T =:= management_plugin_enable -> inets:start(), init_per_testcase0(T, Config); +init_per_testcase(T, Config) + when T =:= clean_session_disconnect_client; + T =:= clean_session_node_restart; + T =:= clean_session_node_kill -> + ok = rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]), + init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -216,6 +222,12 @@ end_per_testcase(T, Config) T =:= management_plugin_enable -> ok = inets:stop(), end_per_testcase0(T, Config); +end_per_testcase(T, Config) + when T =:= clean_session_disconnect_client; + T =:= clean_session_node_restart; + T =:= clean_session_node_kill -> + ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]), + end_per_testcase0(T, Config); end_per_testcase(Testcase, Config) -> end_per_testcase0(Testcase, Config).