Skip to content

Commit 3c1a890

Browse files
committed
Enable adding queues with plugins, core
1 parent dc0d473 commit 3c1a890

25 files changed

+702
-353
lines changed

deps/rabbit/include/amqqueue.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@
4848
(?is_amqqueue_v2(Q) andalso
4949
?amqqueue_v2_field_type(Q) =:= Type)).
5050

51+
-define(amqqueue_type(Q),
52+
(?is_amqqueue_v2(Q) andalso
53+
?amqqueue_v2_field_type(Q))).
54+
5155
-define(amqqueue_has_valid_pid(Q),
5256
(?is_amqqueue_v2(Q) andalso
5357
is_pid(?amqqueue_v2_field_pid(Q)))).

deps/rabbit/src/amqqueue.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
368368

369369
-spec get_leader(amqqueue_v2()) -> node().
370370

371+
%% TODO: not only qqs can have leaders, dispatch via queue type
371372
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
372373

373374
% operator_policy

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -461,32 +461,8 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
461461
-spec queue_topology(amqqueue:amqqueue()) ->
462462
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
463463
queue_topology(Q) ->
464-
case amqqueue:get_type(Q) of
465-
rabbit_quorum_queue ->
466-
[{leader, Leader0},
467-
{members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
468-
Leader = case Leader0 of
469-
'' -> undefined;
470-
_ -> Leader0
471-
end,
472-
{Leader, Members};
473-
rabbit_stream_queue ->
474-
#{name := StreamId} = amqqueue:get_type_state(Q),
475-
case rabbit_stream_coordinator:members(StreamId) of
476-
{ok, Members} ->
477-
maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
478-
{Node, [Node | Replicas]};
479-
(Node, {_Pid, replica}, {Writer, Replicas}) ->
480-
{Writer, [Node | Replicas]}
481-
end, {undefined, []}, Members);
482-
{error, _} ->
483-
{undefined, undefined}
484-
end;
485-
_ ->
486-
Pid = amqqueue:get_pid(Q),
487-
Node = node(Pid),
488-
{Node, [Node]}
489-
end.
464+
Type = amqqueue:get_type(Q),
465+
Type:queue_topology(Q).
490466

491467
decode_exchange({map, KVList}) ->
492468
M = lists:foldl(

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,7 @@ filter_pid_per_type(QPids) ->
150150

151151
-spec stop(rabbit_types:vhost()) -> 'ok'.
152152
stop(VHost) ->
153-
%% Classic queues
154-
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
155-
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
156-
ok = BQ:stop(VHost),
157-
rabbit_quorum_queue:stop(VHost).
153+
rabbit_queue_type:stop(VHost).
158154

159155
-spec start([amqqueue:amqqueue()]) -> 'ok'.
160156

@@ -424,6 +420,8 @@ rebalance(Type, VhostSpec, QueueSpec) ->
424420
%% We have not yet acquired the rebalance_queues global lock.
425421
maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
426422

423+
%% TODO: classic queues do not support rebalancing, it looks like they are simply
424+
%% filtered out with is_replicated(Q). Maybe error instead?
427425
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
428426
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
429427
[Type, VhostSpec, QueueSpec]),
@@ -459,10 +457,15 @@ filter_per_type(stream, Q) ->
459457
filter_per_type(classic, Q) ->
460458
?amqqueue_is_classic(Q).
461459

462-
rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
463-
rabbit_quorum_queue;
464-
rebalance_module(Q) when ?amqqueue_is_stream(Q) ->
465-
rabbit_stream_queue.
460+
%% TODO: note that it can return {error, not_supported}.
461+
%% this will result in a badmatch. However that's fine
462+
%% for now because the original function will fail with
463+
%% bad clause if called with classical queue.
464+
%% The assumption is all non-replicated queues
465+
%% are filtered before calling this with is_replicated/0
466+
rebalance_module(Q) ->
467+
TypeModule = ?amqqueue_type(Q),
468+
TypeModule:rebalance_module().
466469

467470
get_resource_name(#resource{name = Name}) ->
468471
Name.
@@ -487,13 +490,19 @@ iterative_rebalance(ByNode, MaxQueuesDesired) ->
487490
maybe_migrate(ByNode, MaxQueuesDesired) ->
488491
maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
489492

493+
%% TODO: unfortunate part - UI bits mixed deep inside logic.
494+
%% I will not be moving this inside queue type. Instead
495+
%% an attempt to generate something more readable than
496+
%% Other made.
490497
column_name(rabbit_classic_queue) -> <<"Number of replicated classic queues">>;
491498
column_name(rabbit_quorum_queue) -> <<"Number of quorum queues">>;
492499
column_name(rabbit_stream_queue) -> <<"Number of streams">>;
493-
column_name(Other) -> Other.
500+
column_name(TypeModule) ->
501+
Alias = rabbit_queue_type:short_alias_of(TypeModule),
502+
<<"Number of \"", Alias/binary, "\" queues">>.
494503

495504
maybe_migrate(ByNode, _, []) ->
496-
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_v2_field_type(Q)) end, Queues) end, ByNode),
505+
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_type(Q)) end, Queues) end, ByNode),
497506
CountByNodeAndType = maps:map(fun(_Node, Type) -> maps:map(fun (_, Qs)-> length(Qs) end, Type) end, ByNodeAndType),
498507
{ok, maps:values(maps:map(fun(Node,Counts) -> [{<<"Node name">>, Node} | maps:to_list(Counts)] end, CountByNodeAndType))};
499508
maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
@@ -1252,14 +1261,12 @@ list_durable() ->
12521261

12531262
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
12541263

1255-
list_by_type(classic) -> list_by_type(rabbit_classic_queue);
1256-
list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
1257-
list_by_type(stream) -> list_by_type(rabbit_stream_queue);
1258-
list_by_type(Type) ->
1259-
rabbit_db_queue:get_all_durable_by_type(Type).
1264+
list_by_type(TypeDescriptor) ->
1265+
TypeModule = rabbit_queue_type:discover(TypeDescriptor),
1266+
rabbit_db_queue:get_all_durable_by_type(TypeModule).
12601267

1268+
%% TODO: looks unused
12611269
-spec list_local_quorum_queue_names() -> [name()].
1262-
12631270
list_local_quorum_queue_names() ->
12641271
[ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
12651272
amqqueue:get_state(Q) =/= crashed,
@@ -1296,6 +1303,7 @@ list_local_followers() ->
12961303
rabbit_quorum_queue:is_recoverable(Q)
12971304
].
12981305

1306+
%% TODO: looks unused
12991307
-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
13001308
list_local_quorum_queues_with_name_matching(Pattern) ->
13011309
[ Q || Q <- list_by_type(quorum),
@@ -1882,11 +1890,9 @@ run_backing_queue(QPid, Mod, Fun) ->
18821890

18831891
-spec is_replicated(amqqueue:amqqueue()) -> boolean().
18841892

1885-
is_replicated(Q) when ?amqqueue_is_classic(Q) ->
1886-
false;
1887-
is_replicated(_Q) ->
1888-
%% streams and quorum queues are all replicated
1889-
true.
1893+
is_replicated(Q) ->
1894+
TypeModule = ?amqqueue_type(Q),
1895+
TypeModule:is_replicated().
18901896

18911897
is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
18921898
false;

deps/rabbit/src/rabbit_boot_steps.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
%%
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
7+
%% README: https://github.com/rabbitmq/internals/blob/master/rabbit_boot_process.md
8+
%%
79

810
-module(rabbit_boot_steps).
911

deps/rabbit/src/rabbit_channel.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1265,11 +1265,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
12651265
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
12661266
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
12671267
{error, {unsupported, single_active_consumer}} ->
1268-
rabbit_misc:protocol_error(
1269-
resource_locked,
1270-
"cannot obtain access to locked ~ts. basic.get operations "
1271-
"are not supported by quorum queues with single active consumer",
1272-
[rabbit_misc:rs(QueueName)]);
1268+
rabbit_amqqueue:with_or_die(QueueName, fun unsupported_single_active_consumer_error/1);
12731269
{error, Reason} ->
12741270
%% TODO add queue type to error message
12751271
rabbit_misc:protocol_error(internal_error,
@@ -2005,6 +2001,7 @@ foreach_per_queue(_F, [], Acc) ->
20052001
foreach_per_queue(F, [#pending_ack{tag = CTag,
20062002
queue = QName,
20072003
msg_id = MsgId}], Acc) ->
2004+
%% TODO: fix this abstraction leak
20082005
%% quorum queue, needs the consumer tag
20092006
F({QName, CTag}, [MsgId], Acc);
20102007
foreach_per_queue(F, UAL, Acc) ->
@@ -2032,6 +2029,7 @@ notify_limiter(Limiter, Acked) ->
20322029
case rabbit_limiter:is_active(Limiter) of
20332030
false -> ok;
20342031
true -> case lists:foldl(fun (#pending_ack{tag = CTag}, Acc) when is_integer(CTag) ->
2032+
%% TODO: fix absctraction leak
20352033
%% Quorum queues use integer CTags
20362034
%% classic queues use binaries
20372035
%% Quorum queues do not interact
@@ -2792,3 +2790,12 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
27922790

27932791
is_global_qos_permitted() ->
27942792
rabbit_deprecated_features:is_permitted(global_qos).
2793+
2794+
-spec unsupported_single_active_consumer_error(amqqueue:amqqueue()) -> no_return().
2795+
unsupported_single_active_consumer_error(Q) ->
2796+
rabbit_misc:protocol_error(
2797+
resource_locked,
2798+
"cannot obtain access to locked ~ts. basic.get operations "
2799+
"are not supported by ~p queues with single active consumer",
2800+
[rabbit_misc:rs(amqqueue:get_name(Q)),
2801+
rabbit_queue_type:short_alias_of(amqqueue:get_type(Q))]).

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,32 @@
6464
send_drained_credit_api_v1/4,
6565
send_credit_reply/7]).
6666

67+
-export([queue_topology/1,
68+
feature_flag_name/0,
69+
policy_apply_to_name/0,
70+
can_redeliver/0,
71+
stop/1,
72+
is_replicated/0,
73+
rebalance_module/0,
74+
list_with_minimum_quorum/0,
75+
drain/1,
76+
revive/0,
77+
queue_vm_stats_sups/0,
78+
queue_vm_ets/0,
79+
dir_base/0]).
80+
6781
-export([validate_policy/1]).
6882

83+
-rabbit_boot_step(
84+
{rabbit_classic_queue_type,
85+
[{description, "Classic queue: queue type"},
86+
{mfa, {rabbit_registry, register,
87+
[queue, <<"classic">>, ?MODULE]}},
88+
{cleanup, {rabbit_registry, unregister,
89+
[queue, <<"classic">>]}},
90+
{requires, rabbit_registry},
91+
{enables, ?MODULE}]}).
92+
6993
-rabbit_boot_step(
7094
{?MODULE,
7195
[{description, "Deprecated queue-master-locator support."
@@ -74,7 +98,7 @@
7498
[policy_validator, <<"queue-master-locator">>, ?MODULE]}},
7599
{mfa, {rabbit_registry, register,
76100
[operator_policy_validator, <<"queue-master-locator">>, ?MODULE]}},
77-
{requires, rabbit_registry},
101+
{requires, [rabbit_classic_queue_type]},
78102
{enables, recovery}]}).
79103

80104
validate_policy(Args) ->
@@ -674,3 +698,56 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
674698

675699
send_queue_event(Pid, QName, Event) ->
676700
gen_server:cast(Pid, {queue_event, QName, Event}).
701+
702+
-spec queue_topology(amqqueue:amqqueue()) ->
703+
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
704+
queue_topology(Q) ->
705+
Pid = amqqueue:get_pid(Q),
706+
Node = node(Pid),
707+
{Node, [Node]}.
708+
709+
feature_flag_name() ->
710+
undefined.
711+
712+
policy_apply_to_name() ->
713+
<<"classic_queues">>.
714+
715+
can_redeliver() ->
716+
true.
717+
718+
stop(VHost) ->
719+
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
720+
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
721+
ok = BQ:stop(VHost).
722+
723+
is_replicated() ->
724+
false.
725+
726+
rebalance_module() ->
727+
{error, not_supported}.
728+
729+
list_with_minimum_quorum() ->
730+
[].
731+
732+
drain(_TransferCandidates) ->
733+
ok.
734+
735+
revive() ->
736+
ok.
737+
738+
queue_vm_stats_sups() ->
739+
{[queue_procs], [rabbit_vm:all_vhosts_children(rabbit_amqqueue_sup_sup)]}.
740+
741+
%% return nothing because of this line in rabbit_vm:
742+
%% {msg_index, MsgIndexETS + MsgIndexProc},
743+
%% it mixes procs and ets,
744+
%% TODO: maybe instead of separating sups and ets
745+
%% I need vm_memory callback that just
746+
%% returns proplist? And rabbit_vm calculates
747+
%% Other as usual by substraction.
748+
queue_vm_ets() ->
749+
{[],
750+
[]}.
751+
752+
dir_base() ->
753+
[rabbit_vhost:msg_store_dir_base()].

deps/rabbit/src/rabbit_core_metrics_gc.erl

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,15 @@ gc_local_queues() ->
7474
GbSetDown = gb_sets:from_list(QueuesDown),
7575
gc_queue_metrics(GbSet, GbSetDown),
7676
gc_entity(queue_coarse_metrics, GbSet),
77-
Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]),
78-
gc_leader_data(Followers).
79-
80-
gc_leader_data(Followers) ->
81-
ets:foldl(fun({Id, _, _, _, _}, none) ->
82-
gc_leader_data(Id, queue_coarse_metrics, Followers)
83-
end, none, queue_coarse_metrics).
84-
85-
gc_leader_data(Id, Table, GbSet) ->
86-
case gb_sets:is_member(Id, GbSet) of
87-
true ->
88-
ets:delete(Table, Id),
89-
none;
90-
false ->
91-
none
92-
end.
77+
%% remove coarse metrics for quorum queues without local leader
78+
gc_leader_data().
79+
80+
gc_leader_data() ->
81+
_ = [begin
82+
QName = amqqueue:get_name(Q),
83+
rabbit_core_metrics:delete_queue_coarse_metrics(QName)
84+
end || Q <- rabbit_amqqueue:list_local_followers()],
85+
ok.
9386

9487
gc_global_queues() ->
9588
GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()),

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,16 +1045,11 @@ list_queues() ->
10451045

10461046
queue_definition(Q) ->
10471047
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
1048-
Type = case amqqueue:get_type(Q) of
1049-
rabbit_classic_queue -> classic;
1050-
rabbit_quorum_queue -> quorum;
1051-
rabbit_stream_queue -> stream;
1052-
T -> T
1053-
end,
1048+
TypeModule = amqqueue:get_type(Q),
10541049
#{
10551050
<<"vhost">> => VHost,
10561051
<<"name">> => Name,
1057-
<<"type">> => Type,
1052+
<<"type">> => rabbit_registry:lookup_type_name(queue, TypeModule),
10581053
<<"durable">> => amqqueue:is_durable(Q),
10591054
<<"auto_delete">> => amqqueue:is_auto_delete(Q),
10601055
<<"arguments">> => rabbit_misc:amqp_table(amqqueue:get_arguments(Q))

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -538,14 +538,8 @@ redeliver0(#pending{delivery = Msg0,
538538
clients_redeliver(Qs, QTypeState) ->
539539
lists:filter(fun(Q) ->
540540
case rabbit_queue_type:module(Q, QTypeState) of
541-
{ok, rabbit_quorum_queue} ->
542-
% If #enqueue{} Raft command does not get applied
543-
% rabbit_fifo_client will resend.
544-
true;
545-
{ok, rabbit_stream_queue} ->
546-
true;
547-
_ ->
548-
false
541+
{ok, TypeModule} -> TypeModule:can_redeliver();
542+
_ -> false
549543
end
550544
end, Qs).
551545

0 commit comments

Comments
 (0)