Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deps/rabbit/include/amqqueue.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
(?is_amqqueue_v2(Q) andalso
?amqqueue_v2_field_type(Q) =:= Type)).

-define(amqqueue_type(Q),
(?is_amqqueue_v2(Q) andalso
?amqqueue_v2_field_type(Q))).

-define(amqqueue_has_valid_pid(Q),
(?is_amqqueue_v2(Q) andalso
is_pid(?amqqueue_v2_field_pid(Q)))).
Expand Down
52 changes: 42 additions & 10 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
is_exclusive/1,
is_classic/1,
is_quorum/1,
is_internal/1,
internal_owner/1,
make_internal/1,
make_internal/2,
pattern_match_all/0,
pattern_match_on_name/1,
pattern_match_on_type/1,
Expand All @@ -76,6 +80,8 @@
-define(is_backwards_compat_classic(T),
(T =:= classic orelse T =:= ?amqqueue_v1_type)).

-type amqqueue_options() :: map() | ets:match_pattern().

-record(amqqueue, {
%% immutable
name :: rabbit_amqqueue:name() | ets:match_pattern(),
Expand Down Expand Up @@ -106,7 +112,7 @@
slave_pids_pending_shutdown = [], %% reserved
%% secondary index
vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(),
options = #{} :: map() | ets:match_pattern(),
options = #{} :: amqqueue_options(),
type = ?amqqueue_v1_type :: module() | ets:match_pattern(),
type_state = #{} :: map() | ets:match_pattern()
}).
Expand Down Expand Up @@ -349,6 +355,19 @@ get_arguments(#amqqueue{arguments = Args}) ->
set_arguments(#amqqueue{} = Queue, Args) ->
Queue#amqqueue{arguments = Args}.

% options

-spec get_options(amqqueue()) -> amqqueue_options().

get_options(#amqqueue{options = Options}) ->
Options.

-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().

set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.


% decorators

-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
Expand All @@ -368,6 +387,7 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->

-spec get_leader(amqqueue_v2()) -> node().

%% TODO: not only qqs can have leaders, dispatch via queue type
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.

% operator_policy
Expand All @@ -393,15 +413,6 @@ get_name(#amqqueue{name = Name}) -> Name.
set_name(#amqqueue{} = Queue, Name) ->
Queue#amqqueue{name = Name}.

-spec get_options(amqqueue()) -> map().

get_options(#amqqueue{options = Options}) -> Options.

-spec set_options(amqqueue(), map()) -> amqqueue().

set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.

% pid

-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
Expand Down Expand Up @@ -495,6 +506,27 @@ is_classic(Queue) ->
is_quorum(Queue) ->
get_type(Queue) =:= rabbit_quorum_queue.

-spec is_internal(amqqueue()) -> boolean().

is_internal(#amqqueue{options = #{internal := true}}) -> true;
is_internal(#amqqueue{}) -> false.

-spec internal_owner(amqqueue()) -> #resource{}.

internal_owner(#amqqueue{options = #{internal := true,
internal_owner := IOwner}}) ->
IOwner;
internal_owner(#amqqueue{}) ->
undefined.

make_internal(Q = #amqqueue{options = Options}) when is_map(Options) ->
Q#amqqueue{options = maps:merge(Options, #{internal => true,
internal_owner => undefined})}.
make_internal(Q = #amqqueue{options = Options}, Owner)
when is_map(Options) andalso is_record(Owner, resource) ->
Q#amqqueue{options = maps:merge(Options, #{internal => true,
interna_owner => Owner})}.

fields() ->
fields(?record_version).

Expand Down
28 changes: 2 additions & 26 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -461,32 +461,8 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
-spec queue_topology(amqqueue:amqqueue()) ->
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
queue_topology(Q) ->
case amqqueue:get_type(Q) of
rabbit_quorum_queue ->
[{leader, Leader0},
{members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
Leader = case Leader0 of
'' -> undefined;
_ -> Leader0
end,
{Leader, Members};
rabbit_stream_queue ->
#{name := StreamId} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:members(StreamId) of
{ok, Members} ->
maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
{Node, [Node | Replicas]};
(Node, {_Pid, replica}, {Writer, Replicas}) ->
{Writer, [Node | Replicas]}
end, {undefined, []}, Members);
{error, _} ->
{undefined, undefined}
end;
_ ->
Pid = amqqueue:get_pid(Q),
Node = node(Pid),
{Node, [Node]}
end.
Type = amqqueue:get_type(Q),
Type:queue_topology(Q).

decode_exchange({map, KVList}) ->
M = lists:foldl(
Expand Down
80 changes: 58 additions & 22 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,7 @@ filter_pid_per_type(QPids) ->

-spec stop(rabbit_types:vhost()) -> 'ok'.
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop(VHost),
rabbit_quorum_queue:stop(VHost).
rabbit_queue_type:stop(VHost).

-spec start([amqqueue:amqqueue()]) -> 'ok'.

Expand Down Expand Up @@ -424,6 +420,8 @@ rebalance(Type, VhostSpec, QueueSpec) ->
%% We have not yet acquired the rebalance_queues global lock.
maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).

%% TODO: classic queues do not support rebalancing, it looks like they are simply
%% filtered out with is_replicated(Q). Maybe error instead?
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
[Type, VhostSpec, QueueSpec]),
Expand Down Expand Up @@ -459,10 +457,15 @@ filter_per_type(stream, Q) ->
filter_per_type(classic, Q) ->
?amqqueue_is_classic(Q).

rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue;
rebalance_module(Q) when ?amqqueue_is_stream(Q) ->
rabbit_stream_queue.
%% TODO: note that it can return {error, not_supported}.
%% this will result in a badmatch. However that's fine
%% for now because the original function will fail with
%% bad clause if called with classical queue.
%% The assumption is all non-replicated queues
%% are filtered before calling this with is_replicated/0
rebalance_module(Q) ->
TypeModule = ?amqqueue_type(Q),
TypeModule:rebalance_module().

get_resource_name(#resource{name = Name}) ->
Name.
Expand All @@ -487,13 +490,19 @@ iterative_rebalance(ByNode, MaxQueuesDesired) ->
maybe_migrate(ByNode, MaxQueuesDesired) ->
maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).

%% TODO: unfortunate part - UI bits mixed deep inside logic.
%% I will not be moving this inside queue type. Instead
%% an attempt to generate something more readable than
%% Other made.
column_name(rabbit_classic_queue) -> <<"Number of replicated classic queues">>;
column_name(rabbit_quorum_queue) -> <<"Number of quorum queues">>;
column_name(rabbit_stream_queue) -> <<"Number of streams">>;
column_name(Other) -> Other.
column_name(TypeModule) ->
Alias = rabbit_queue_type:short_alias_of(TypeModule),
<<"Number of \"", Alias/binary, "\" queues">>.

maybe_migrate(ByNode, _, []) ->
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_v2_field_type(Q)) end, Queues) end, ByNode),
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_type(Q)) end, Queues) end, ByNode),
CountByNodeAndType = maps:map(fun(_Node, Type) -> maps:map(fun (_, Qs)-> length(Qs) end, Type) end, ByNodeAndType),
{ok, maps:values(maps:map(fun(Node,Counts) -> [{<<"Node name">>, Node} | maps:to_list(Counts)] end, CountByNodeAndType))};
maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
Expand Down Expand Up @@ -811,6 +820,35 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
"match that of the original declaration.",
[rabbit_misc:rs(QueueName)]).

-spec check_internal(amqqueue:amqqueue(), rabbit_types:username()) ->
'ok' | rabbit_types:channel_exit().
check_internal(Q, Username) ->
case amqqueue:is_internal(Q) of
true ->
case Username of
%% note cli delete command uses "cli_user"
?INTERNAL_USER ->
ok;
_ ->
QueueName = amqqueue:get_name(Q),
case amqqueue:internal_owner(Q) of
undefined ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected ~ts.",
[rabbit_misc:rs(QueueName)]);
IOwner ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected ~ts. It was "
"declared as an protected and can be deleted only by deleting the owner entity: ~ts",
[rabbit_misc:rs(QueueName), rabbit_misc:rs(IOwner)])
end
end;
false ->
ok
end.

-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
A | rabbit_types:channel_exit().
with_exclusive_access_or_die(Name, ReaderPid, F) ->
Expand Down Expand Up @@ -1252,14 +1290,12 @@ list_durable() ->

-spec list_by_type(atom()) -> [amqqueue:amqqueue()].

list_by_type(classic) -> list_by_type(rabbit_classic_queue);
list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
list_by_type(stream) -> list_by_type(rabbit_stream_queue);
list_by_type(Type) ->
rabbit_db_queue:get_all_durable_by_type(Type).
list_by_type(TypeDescriptor) ->
TypeModule = rabbit_queue_type:discover(TypeDescriptor),
rabbit_db_queue:get_all_durable_by_type(TypeModule).

%% TODO: looks unused
-spec list_local_quorum_queue_names() -> [name()].

list_local_quorum_queue_names() ->
[ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
amqqueue:get_state(Q) =/= crashed,
Expand Down Expand Up @@ -1296,6 +1332,7 @@ list_local_followers() ->
rabbit_quorum_queue:is_recoverable(Q)
].

%% TODO: looks unused
-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
list_local_quorum_queues_with_name_matching(Pattern) ->
[ Q || Q <- list_by_type(quorum),
Expand Down Expand Up @@ -1681,6 +1718,7 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
case with(
QueueName,
fun (Q) ->
ok = check_internal(Q, Username),
if CheckExclusive ->
check_exclusive_access(Q, ConnPid);
true ->
Expand Down Expand Up @@ -1882,11 +1920,9 @@ run_backing_queue(QPid, Mod, Fun) ->

-spec is_replicated(amqqueue:amqqueue()) -> boolean().

is_replicated(Q) when ?amqqueue_is_classic(Q) ->
false;
is_replicated(_Q) ->
%% streams and quorum queues are all replicated
true.
is_replicated(Q) ->
TypeModule = ?amqqueue_type(Q),
TypeModule:is_replicated().

is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_boot_steps.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% README: https://github.com/rabbitmq/internals/blob/master/rabbit_boot_process.md
%%

-module(rabbit_boot_steps).

Expand Down
17 changes: 12 additions & 5 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1265,11 +1265,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
{error, {unsupported, single_active_consumer}} ->
rabbit_misc:protocol_error(
resource_locked,
"cannot obtain access to locked ~ts. basic.get operations "
"are not supported by quorum queues with single active consumer",
[rabbit_misc:rs(QueueName)]);
rabbit_amqqueue:with_or_die(QueueName, fun unsupported_single_active_consumer_error/1);
{error, Reason} ->
%% TODO add queue type to error message
rabbit_misc:protocol_error(internal_error,
Expand Down Expand Up @@ -2005,6 +2001,7 @@ foreach_per_queue(_F, [], Acc) ->
foreach_per_queue(F, [#pending_ack{tag = CTag,
queue = QName,
msg_id = MsgId}], Acc) ->
%% TODO: fix this abstraction leak
%% quorum queue, needs the consumer tag
F({QName, CTag}, [MsgId], Acc);
foreach_per_queue(F, UAL, Acc) ->
Expand Down Expand Up @@ -2032,6 +2029,7 @@ notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_active(Limiter) of
false -> ok;
true -> case lists:foldl(fun (#pending_ack{tag = CTag}, Acc) when is_integer(CTag) ->
%% TODO: fix absctraction leak
%% Quorum queues use integer CTags
%% classic queues use binaries
%% Quorum queues do not interact
Expand Down Expand Up @@ -2792,3 +2790,12 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->

is_global_qos_permitted() ->
rabbit_deprecated_features:is_permitted(global_qos).

-spec unsupported_single_active_consumer_error(amqqueue:amqqueue()) -> no_return().
unsupported_single_active_consumer_error(Q) ->
rabbit_misc:protocol_error(
resource_locked,
"cannot obtain access to locked ~ts. basic.get operations "
"are not supported by ~p queues with single active consumer",
[rabbit_misc:rs(amqqueue:get_name(Q)),
rabbit_queue_type:short_alias_of(amqqueue:get_type(Q))]).
Loading
Loading