Skip to content

Commit 8d36b89

Browse files
committed
Make queue commands generic - i.e. make them work with any queue type
1 parent d685875 commit 8d36b89

24 files changed

+344
-85
lines changed

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
-module(rabbit_classic_queue).
22
-behaviour(rabbit_queue_type).
3+
-behaviour(rabbit_queue_commands).
34
-behaviour(rabbit_policy_validator).
45

56
-include("amqqueue.hrl").
@@ -74,6 +75,15 @@
7475

7576
-export([validate_policy/1]).
7677

78+
%% commands
79+
-export([add_member/5,
80+
list_with_local_promotable/0,
81+
delete_member/3,
82+
peek/2,
83+
status/1,
84+
reclaim_memory/1,
85+
shrink_all/1]).
86+
7787
-rabbit_boot_step(
7888
{rabbit_classic_queue_type,
7989
[{description, "Classic queue: queue type"},
@@ -730,3 +740,24 @@ queue_vm_stats_sups() ->
730740
%% Other as usual by substraction.
731741
queue_vm_ets() ->
732742
{[], []}.
743+
744+
add_member(_VHost, _Name, _Node, _Membership, _Timeout) ->
745+
{error, classic_queue_not_supported}.
746+
747+
list_with_local_promotable() ->
748+
{error, classic_queue_not_supported}.
749+
750+
delete_member(_VHost, _Name, _Node) ->
751+
{error, classic_queue_not_supported}.
752+
753+
peek(_Pos, _QName) ->
754+
{error, classic_queue_not_supported}.
755+
756+
status(_QName) ->
757+
{error, classic_queue_not_supported}.
758+
759+
reclaim_memory(_QName) ->
760+
{error, classic_queue_not_supported}.
761+
762+
shrink_all(_Node) ->
763+
{error, classic_queue_not_supported}.

deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ sheet_body(PrevState) ->
138138
empty_row(Name);
139139
_ ->
140140
QQCounters = maps:get({QName, node()}, ra_counters:overview()),
141-
{ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(#resource{virtual_host = Vhost, name= Name}),
141+
{ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(rabbit_misc:queue_resource(Vhost, Name)),
142142
[{_, CT, SnapIdx, LA, CI, LW, CL}] = ets:lookup(ra_metrics, R),
143143
[
144144
Pid,
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
-module(rabbit_queue_commands).
2+
3+
-export([add_member/5,
4+
list_with_local_promotable/1,
5+
delete_member/3,
6+
grow/5,
7+
peek/2,
8+
status/1,
9+
reclaim_memory/2,
10+
shrink_all/2]).
11+
12+
-include_lib("rabbit_common/include/resource.hrl").
13+
14+
%% TODO: membership is a subset of ra_membership() with 'unknown' removed.
15+
-type membership() :: voter | non_voter | promotable.
16+
17+
-callback add_member(VHost :: rabbit_types:vhost(), Name :: resource_name(), Node :: node(), Membership :: membership(), Timeout :: timeout()) -> ok | {error, term()}.
18+
19+
-callback list_with_local_promotable() -> [amqqueue:amqqueue()] | {error, term()}.
20+
21+
-callback delete_member(VHost :: rabbit_types:vhost(), Name :: resource_name(), Node :: node()) -> ok | {error, term()}.
22+
23+
-callback peek(Pos :: non_neg_integer(), QName :: rabbit_amqqueue:name()) -> {ok, [{binary(), term()}]} | {error, term()}.
24+
25+
-callback status(QName :: rabbit_amqqueue:name()) -> [[{binary(), term()}]] | {error, term()}.
26+
27+
-callback reclaim_memory(QName :: rabbit_amqqueue:name()) -> ok | {error, term()}.
28+
29+
-callback shrink_all(Node :: node()) -> [{rabbit_amqqueue:name(),
30+
{ok, pos_integer()} | {error, pos_integer(), term()}}] | {error, term()}.
31+
32+
-spec add_member(rabbit_types:vhost(), resource_name(), node(), membership(), timeout()) -> ok | {error, term()}.
33+
add_member(VHost, Name, Node, Membership, Timeout) ->
34+
case rabbit_amqqueue:lookup(rabbit_misc:queue_resource(VHost, Name)) of
35+
{ok, Q} ->
36+
Type = amqqueue:get_type(Q),
37+
Type:add_member(VHost, Name, Node, Membership, Timeout);
38+
{error, not_found} = E ->
39+
E
40+
end.
41+
42+
-spec list_with_local_promotable(atom() | binary()) -> [amqqueue:amqqueue()] | {error, term()}.
43+
list_with_local_promotable(TypeDescriptor) ->
44+
case rabbit_queue_type:lookup(TypeDescriptor) of
45+
{ok, Type} ->
46+
{ok, Type:list_with_local_promotable()};
47+
{error, not_found} ->
48+
{error, {unknown_queue_type, TypeDescriptor}}
49+
end.
50+
51+
-spec delete_member(rabbit_types:vhost(), resource_name(), node()) -> ok | {error, term()}.
52+
delete_member(VHost, Name, Node) ->
53+
case rabbit_amqqueue:lookup(rabbit_misc:queue_resource(VHost, Name)) of
54+
{ok, Q} ->
55+
Type = amqqueue:get_type(Q),
56+
Type:delete_member(VHost, Name, Node);
57+
{error, not_found} = E ->
58+
E
59+
end.
60+
61+
-spec grow(node(), binary(), binary(), all | even, membership()) ->
62+
[{rabbit_amqqueue:name(),
63+
{ok, pos_integer()} | {error, pos_integer(), term()}}].
64+
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
65+
Running = rabbit_nodes:list_running(),
66+
[begin
67+
Size = length(amqqueue:get_nodes(Q)),
68+
QName = amqqueue:get_name(Q),
69+
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
70+
[rabbit_misc:rs(QName), Node]),
71+
Type = amqqueue:get_type(Q),
72+
case Type:add_member(Q, Node, Membership) of
73+
ok ->
74+
{QName, {ok, Size + 1}};
75+
{error, Err} ->
76+
rabbit_log:warning(
77+
"~ts: failed to add member (replica) on node ~w, error: ~w",
78+
[rabbit_misc:rs(QName), Node, Err]),
79+
{QName, {error, Size, Err}}
80+
end
81+
end
82+
|| Q <- rabbit_amqqueue:list(),
83+
rabbit_queue_type:is_replicable(Q),
84+
%% don't add a member if there is already one on the node
85+
not lists:member(Node, amqqueue:get_nodes(Q)),
86+
%% node needs to be running
87+
lists:member(Node, Running),
88+
matches_strategy(Strategy, amqqueue:get_nodes(Q)),
89+
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
90+
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
91+
92+
-spec peek(Pos :: non_neg_integer(), QName :: rabbit_amqqueue:name()) -> {ok, [{binary(), term()}]} | {error, term()}.
93+
peek(Pos, QName) ->
94+
case rabbit_amqqueue:lookup(QName) of
95+
{ok, Q} ->
96+
Type = amqqueue:get_type(Q),
97+
Type:peek(Pos, QName);
98+
{error, not_found} = E ->
99+
E
100+
end.
101+
102+
-spec status(QName :: rabbit_amqqueue:name()) -> [[{binary(), term()}]] | {error, term()}.
103+
status(QName) ->
104+
case rabbit_amqqueue:lookup(QName) of
105+
{ok, Q} ->
106+
Type = amqqueue:get_type(Q),
107+
Type:status(QName);
108+
{error, not_found} = E ->
109+
E
110+
end.
111+
112+
-spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}.
113+
reclaim_memory(VHost, QueueName) ->
114+
QName = #resource{virtual_host = VHost, name = QueueName, kind = queue},
115+
case rabbit_amqqueue:lookup(QName) of
116+
{ok, Q} ->
117+
Type = amqqueue:get_type(Q),
118+
Type:reclaim_memory(QName);
119+
{error, not_found} = E ->
120+
E
121+
end.
122+
123+
-spec shrink_all(Node :: node(), TypeDescriptor :: rabbit_registry:type_descriptor()) ->
124+
{ok, pos_integer()} |
125+
{error, pos_integer(), term()} |
126+
{error, {unknown_queue_type, rabbit_registry:type_descriptor()}} |
127+
{error, atom()}. %% to cover not_supported
128+
shrink_all(Node, <<"all">>) ->
129+
lists:flatten([Type:shrink_all(Node) || Type <- rabbit_queue_type:list_replicable()]);
130+
shrink_all(Node, TypeDescriptor) ->
131+
case rabbit_queue_type:lookup(TypeDescriptor) of
132+
{ok, Type} ->
133+
{ok, Type:shrink_all(Node)};
134+
{error, not_found} ->
135+
{error, {unknown_queue_type, TypeDescriptor}}
136+
end.
137+
138+
matches_strategy(all, _) -> true;
139+
matches_strategy(even, Members) ->
140+
length(Members) rem 2 == 0.
141+
142+
is_match(Subj, E) ->
143+
nomatch /= re:run(Subj, E).
144+
145+
get_resource_name(#resource{name = Name}) ->
146+
Name.

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
init/0,
2020
close/1,
2121
discover/1,
22+
lookup/1,
2223
short_alias_of/1,
2324
default/0,
2425
default_alias/0,
@@ -64,6 +65,7 @@
6465
can_redeliver/2,
6566
rebalance_module/1,
6667
is_replicable/1,
68+
list_replicable/0,
6769
stop/1,
6870
list_with_minimum_quorum/0,
6971
drain/1,
@@ -313,6 +315,14 @@ discover(TypeDescriptor) ->
313315
{ok, TypeModule} = rabbit_registry:lookup_type_module(queue, TypeDescriptor),
314316
TypeModule.
315317

318+
-spec lookup(binary() | atom()) -> {ok, queue_type()} | {error, not_found}.
319+
lookup(<<"undefined">>) ->
320+
fallback();
321+
lookup(undefined) ->
322+
fallback();
323+
lookup(TypeDescriptor) ->
324+
rabbit_registry:lookup_type_module(queue, TypeDescriptor).
325+
316326
-spec short_alias_of(TypeDescriptor) -> Ret when
317327
TypeDescriptor :: {utf8, binary()} | atom() | binary(),
318328
Ret :: binary() | undefined.
@@ -906,12 +916,18 @@ rebalance_module(Q) ->
906916
Capabilities = TypeModule:capabilities(),
907917
maps:get(rebalance_module, Capabilities, undefined).
908918

909-
-spec is_replicable(amqqueue:amqqueue()) -> undefine | module().
910-
is_replicable(Q) ->
919+
-spec is_replicable(amqqueue:amqqueue() | queue_type()) -> false | module().
920+
is_replicable(Q) when ?is_amqqueue(Q) ->
911921
TypeModule = amqqueue:get_type(Q),
922+
is_replicable(TypeModule);
923+
is_replicable(TypeModule) when is_atom(TypeModule) ->
912924
Capabilities = TypeModule:capabilities(),
913925
maps:get(is_replicable, Capabilities, false).
914926

927+
list_replicable() ->
928+
_ = [TypeModule ||
929+
{_Type, TypeModule} <- rabbit_registry:lookup_all(queue), is_replicable(TypeModule)].
930+
915931
-spec stop(rabbit_types:vhost()) -> ok.
916932
stop(VHost) ->
917933
%% original rabbit_amqqueue:stop doesn't do any catches or try after

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
-feature(maybe_expr, enable).
1010

1111
-behaviour(rabbit_queue_type).
12+
-behaviour(rabbit_queue_commands).
1213
-behaviour(rabbit_policy_validator).
1314
-behaviour(rabbit_policy_merge_strategy).
1415

@@ -31,7 +32,7 @@
3132
-export([purge/1]).
3233
-export([deliver/3]).
3334
-export([dead_letter_publish/5]).
34-
-export([cluster_state/1, status/2]).
35+
-export([cluster_state/1, status/1]).
3536
-export([update_consumer_handler/8, update_consumer/9]).
3637
-export([cancel_consumer_handler/2, cancel_consumer/3]).
3738
-export([become_leader/2, handle_tick/3, spawn_deleter/1]).
@@ -55,14 +56,13 @@
5556
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
5657
-export([list_with_minimum_quorum/0,
5758
list_with_local_promotable/0,
58-
list_with_local_promotable_for_cli/0,
5959
filter_quorum_critical/3,
6060
all_replica_states/0]).
6161
-export([capabilities/0]).
6262
-export([repair_amqqueue_nodes/1,
6363
repair_amqqueue_nodes/2
6464
]).
65-
-export([reclaim_memory/2,
65+
-export([reclaim_memory/1,
6666
wal_force_roll_over/1]).
6767
-export([notify_decorators/1,
6868
notify_decorators/3,
@@ -501,11 +501,6 @@ list_with_local_promotable() ->
501501
#{node() := ReplicaStates} = get_replica_states([node()]),
502502
filter_promotable(Queues, ReplicaStates).
503503

504-
-spec list_with_local_promotable_for_cli() -> [#{binary() => any()}].
505-
list_with_local_promotable_for_cli() ->
506-
Qs = list_with_local_promotable(),
507-
lists:map(fun amqqueue:to_printable/1, Qs).
508-
509504
-spec get_replica_states([node()]) -> #{node() => replica_states()}.
510505
get_replica_states(Nodes) ->
511506
maps:from_list(
@@ -1246,11 +1241,10 @@ key_metrics_rpc(ServerId) ->
12461241
Metrics = ra:key_metrics(ServerId),
12471242
Metrics#{machine_version => rabbit_fifo:version()}.
12481243

1249-
-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
1244+
-spec status(rabbit_types:r(queue)) ->
12501245
[[{binary(), term()}]] | {error, term()}.
1251-
status(Vhost, QueueName) ->
1246+
status(QName) ->
12521247
%% Handle not found queues
1253-
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
12541248
case rabbit_amqqueue:lookup(QName) of
12551249
{ok, Q} when ?amqqueue_is_classic(Q) ->
12561250
{error, classic_queue_not_supported};
@@ -1618,9 +1612,8 @@ matches_strategy(even, Members) ->
16181612
is_match(Subj, E) ->
16191613
nomatch /= re:run(Subj, E).
16201614

1621-
-spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}.
1622-
reclaim_memory(Vhost, QueueName) ->
1623-
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
1615+
-spec reclaim_memory(rabbit_amqqueue:name()) -> ok | {error, term()}.
1616+
reclaim_memory(QName) ->
16241617
case rabbit_amqqueue:lookup(QName) of
16251618
{ok, Q} when ?amqqueue_is_classic(Q) ->
16261619
{error, classic_queue_not_supported};

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@
6868
queue_vm_stats_sups/0,
6969
queue_vm_ets/0]).
7070

71+
%% queues commands
72+
-export([shrink_all/1]).
73+
7174
-include_lib("rabbit_common/include/rabbit.hrl").
7275
-include("amqqueue.hrl").
7376

@@ -1471,3 +1474,6 @@ queue_vm_stats_sups() ->
14711474
queue_vm_ets() ->
14721475
{[],
14731476
[]}.
1477+
1478+
shrink_all(Node) ->
1479+
delete_all_replicas(Node).

deps/rabbit/src/rabbit_upgrade_preparation.erl

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
-module(rabbit_upgrade_preparation).
99

1010
-export([await_online_quorum_plus_one/1,
11-
list_with_minimum_quorum_for_cli/0]).
11+
with_minimum_quorum/0]).
1212

1313
%%
1414
%% API
@@ -56,8 +56,8 @@ endangered_critical_components() ->
5656
do_await_safe_online_quorum(0) ->
5757
false;
5858
do_await_safe_online_quorum(IterationsLeft) ->
59-
EndangeredQueues = rabbit_queue_type:list_with_minimum_quorum(),
60-
case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of
59+
{EndangeredQueues, EndangeredCriticalComponents} = with_minimum_quorum(),
60+
case EndangeredQueues =:= [] andalso EndangeredCriticalComponents =:= [] of
6161
true -> true;
6262
false ->
6363
case IterationsLeft rem ?LOGGING_FREQUENCY of
@@ -67,7 +67,7 @@ do_await_safe_online_quorum(IterationsLeft) ->
6767
N -> rabbit_log:info("Waiting for ~p queues and streams to have quorum+1 replicas online. "
6868
"You can list them with `rabbitmq-diagnostics check_if_node_is_quorum_critical`", [N])
6969
end,
70-
case endangered_critical_components() of
70+
case EndangeredCriticalComponents of
7171
[] -> ok;
7272
_ -> rabbit_log:info("Waiting for the following critical components to have quorum+1 replicas online: ~p.",
7373
[endangered_critical_components()])
@@ -79,13 +79,8 @@ do_await_safe_online_quorum(IterationsLeft) ->
7979
do_await_safe_online_quorum(IterationsLeft - 1)
8080
end.
8181

82-
-spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}].
83-
list_with_minimum_quorum_for_cli() ->
82+
-spec with_minimum_quorum() -> {[amqqueue:amqqueue()], [atom()]}.
83+
with_minimum_quorum() ->
8484
EndangeredQueues = rabbit_queue_type:list_with_minimum_quorum(),
85-
[amqqueue:to_printable(Q) || Q <- EndangeredQueues] ++
86-
[#{
87-
<<"readable_name">> => C,
88-
<<"name">> => C,
89-
<<"virtual_host">> => <<"(not applicable)">>,
90-
<<"type">> => process
91-
} || C <- endangered_critical_components()].
85+
EndangeredCriticalComponents = endangered_critical_components(),
86+
{EndangeredQueues, EndangeredCriticalComponents}.

0 commit comments

Comments
 (0)