Skip to content

Commit 103fd11

Browse files
committed
WIP: Add per-queue-type disk limits
This is an extension of the idea of the free disk space alarm. That option enables blocking publishers when the free disk space on the data dir's disk falls below some threshold. This feature blocks publishers of individual queue types when the disk space taken by the queue type exceeds the configured threshold. This change is incomplete: it only affects QQs and streams and AMQP 0-9-1 so far.
1 parent 20f89b2 commit 103fd11

File tree

10 files changed

+396
-24
lines changed

10 files changed

+396
-24
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,26 @@ fun(Conf) ->
12411241
end
12421242
end}.
12431243

1244+
%%
1245+
%% Per-queue-type disk limits
1246+
%% =====================
1247+
%%
1248+
1249+
%% TODO: do relative limits make sense - what would they be relative to? The
1250+
%% full disk size?
1251+
%% {mapping, "quorum_queue_disk_limit.relative", "rabbit.quorum_queue_disk_limit", [
1252+
%% {datatype, float}]}.
1253+
1254+
{mapping, "quorum_queue_disk_limit.absolute", "rabbit.quorum_queue_disk_limit", [
1255+
{datatype, [integer, string]},
1256+
{validators, ["is_supported_information_unit"]}
1257+
]}.
1258+
1259+
{mapping, "stream_queue_disk_limit.absolute", "rabbit.stream_queue_disk_limit", [
1260+
{datatype, [integer, string]},
1261+
{validators, ["is_supported_information_unit"]}
1262+
]}.
1263+
12441264
%%
12451265
%% Clustering
12461266
%% =====================

deps/rabbit/src/rabbit_alarm.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646

4747
-export_type([alarm/0]).
4848
-type local_alarm() :: 'file_descriptor_limit'.
49-
-type resource_alarm_source() :: 'disk' | 'memory'.
49+
-type resource_alarm_source() :: 'disk' | 'memory' | {queue_type_disk, atom()}.
5050
-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
5151
-type alarm() :: local_alarm() | resource_alarm().
5252
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
@@ -138,6 +138,11 @@ format_as_map({resource_limit, memory, Node}) ->
138138
<<"resource">> => ?MEMORY_RESOURCE,
139139
<<"node">> => Node
140140
};
141+
format_as_map({resource_limit, {queue_type_disk, QType}, Node}) ->
142+
#{
143+
<<"resource">> => todo,
144+
<<"node">> => Node
145+
};
141146
format_as_map({resource_limit, Limit, Node}) ->
142147
#{
143148
<<"resource">> => rabbit_data_coercion:to_binary(Limit),
@@ -291,7 +296,7 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded,
291296
StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)),
292297
case StillHasAlerts of
293298
true -> ok;
294-
false -> rabbit_log:warning("~ts resource limit alarm cleared across the cluster", [Source])
299+
false -> rabbit_log:warning("~tp resource limit alarm cleared across the cluster", [Source])
295300
end,
296301
Alert = {WasAlertAdded, StillHasAlerts, Node},
297302
case node() of
@@ -326,9 +331,11 @@ internal_register(Pid, {M, F, A} = AlertMFA,
326331
NewAlertees = dict:store(Pid, AlertMFA, Alertees),
327332
State#alarms{alertees = NewAlertees}.
328333

334+
%% TODO: handle formatting of resources in these:
335+
329336
handle_set_resource_alarm(Source, Node, State) ->
330337
rabbit_log:warning(
331-
"~ts resource limit alarm set on node ~tp.~n~n"
338+
"~tp resource limit alarm set on node ~tp.~n~n"
332339
"**********************************************************~n"
333340
"*** Publishers will be blocked until this alarm clears ***~n"
334341
"**********************************************************~n",
@@ -347,7 +354,7 @@ handle_set_alarm(Alarm, State) ->
347354
{ok, State}.
348355

349356
handle_clear_resource_alarm(Source, Node, State) ->
350-
rabbit_log:warning("~ts resource limit alarm cleared on node ~tp",
357+
rabbit_log:warning("~tp resource limit alarm cleared on node ~tp",
351358
[Source, Node]),
352359
{ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}.
353360

deps/rabbit/src/rabbit_channel.erl

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@
163163
delivery_flow :: flow | noflow,
164164
interceptor_state,
165165
queue_states,
166+
queue_types_published :: sets:set(QType :: atom()),
166167
tick_timer,
167168
publishing_mode = false :: boolean()
168169
}).
@@ -527,7 +528,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
527528
reply_consumer = none,
528529
delivery_flow = Flow,
529530
interceptor_state = undefined,
530-
queue_states = rabbit_queue_type:init()
531+
queue_states = rabbit_queue_type:init(),
532+
queue_types_published = sets:new([{version, 2}])
531533
},
532534
State1 = State#ch{
533535
interceptor_state = rabbit_channel_interceptor:init(State)},
@@ -2057,9 +2059,10 @@ deliver_to_queues(XName,
20572059
ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0),
20582060
MsgSeqNo = maps:get(correlation, Options, undefined),
20592061
State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0),
2062+
State2 = notify_published_queue_types(Qs, State1),
20602063
%% Actions must be processed after registering confirms as actions may
20612064
%% contain rejections of publishes
2062-
State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
2065+
State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}),
20632066
case rabbit_event:stats_level(State, #ch.stats_timer) of
20642067
fine ->
20652068
?INCR_STATS(exchange_stats, XName, 1, publish),
@@ -2082,6 +2085,27 @@ deliver_to_queues(XName,
20822085
[rabbit_misc:rs(Resource)])
20832086
end.
20842087

2088+
notify_published_queue_types(Qs,
2089+
#ch{cfg = #conf{reader_pid = ReaderPid},
2090+
queue_types_published = QTypes0} = State0) ->
2091+
QTypes = lists:foldl(
2092+
fun(Q0, Acc) ->
2093+
Q = case Q0 of
2094+
{Q1, _RouteInfo} -> Q1;
2095+
_ -> Q0
2096+
end,
2097+
QType = amqqueue:get_type(Q),
2098+
case sets:is_element(QType, Acc) of
2099+
true ->
2100+
Acc;
2101+
false ->
2102+
ReaderPid ! {channel_published_to_queue_type,
2103+
self(), QType},
2104+
sets:add_element(QType, Acc)
2105+
end
2106+
end, QTypes0, Qs),
2107+
State0#ch{queue_types_published = QTypes}.
2108+
20852109
process_routing_mandatory(_Mandatory = true,
20862110
_RoutedToQs = [],
20872111
Msg,
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_disk_usage).
9+
10+
%% Functions for calculating disk usage of a given directory.
11+
12+
-include_lib("kernel/include/file.hrl").
13+
14+
-export([scan/1]).
15+
16+
%% @doc Calculates the disk usage in bytes of the given directory.
17+
%%
18+
%% On especially large directories this can be an expensive operation since
19+
%% each sub-directory is scanned recursively and each file's metadata must be
20+
%% read.
21+
-spec scan(Dir) -> {ok, Size} | {error, Error} when
22+
Dir :: filename:filename_all(),
23+
Size :: non_neg_integer(),
24+
Error :: not_directory | file:posix() | badarg.
25+
scan(Dir) ->
26+
case file:read_file_info(Dir) of
27+
{ok, #file_info{type = directory, size = S}} ->
28+
{ok, Gatherer} = gatherer:start_link(),
29+
scan_directory(Dir, Gatherer),
30+
Size = sum(Gatherer, S),
31+
gatherer:stop(Gatherer),
32+
{ok, Size};
33+
{ok, #file_info{}} ->
34+
{error, not_directory};
35+
{error, _} = Err ->
36+
Err
37+
end.
38+
39+
scan_directory(Dir, Gatherer) ->
40+
gatherer:fork(Gatherer),
41+
worker_pool:submit_async(fun() -> scan_directory0(Dir, Gatherer) end).
42+
43+
scan_directory0(Dir, Gatherer) ->
44+
link(Gatherer),
45+
Size = case file:list_dir_all(Dir) of
46+
{ok, Entries} ->
47+
lists:foldl(
48+
fun(Entry, Acc) ->
49+
Path = filename:join(Dir, Entry),
50+
case file:read_file_info(Path) of
51+
{ok, #file_info{type = directory,
52+
size = S}} ->
53+
scan_directory(Path, Gatherer),
54+
Acc + S;
55+
{ok, #file_info{size = S}} ->
56+
Acc + S;
57+
_ ->
58+
Acc
59+
end
60+
end, 0, Entries);
61+
_ ->
62+
0
63+
end,
64+
gatherer:in(Gatherer, Size),
65+
gatherer:finish(Gatherer),
66+
unlink(Gatherer),
67+
ok.
68+
69+
-spec sum(pid(), non_neg_integer()) -> non_neg_integer().
70+
sum(Gatherer, Size) ->
71+
case gatherer:out(Gatherer) of
72+
empty ->
73+
Size;
74+
{value, S} ->
75+
sum(Gatherer, Size + S)
76+
end.

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,16 @@
304304

305305
-callback queue_vm_ets() -> {StatsKeys :: [atom()], ETSNames:: [[atom()]]}.
306306

307+
%% The disk usage limit for the queue type, if any.
308+
-callback disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined.
309+
%% Calculate the disk space in bytes of the queue type.
310+
%% This callback is optional but must be implemented if `disk_limit/0' is
311+
%% defined.
312+
-callback disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}.
313+
314+
-optional_callbacks([disk_footprint/0,
315+
disk_limit/0]).
316+
307317
-spec discover(binary() | atom()) -> queue_type().
308318
discover(<<"undefined">>) ->
309319
fallback();
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_queue_type_disk_monitor).
9+
10+
%% A server for alarming on high disk usage per queue type.
11+
%%
12+
%% The server scans periodically and checks each queue type against its limit
13+
%% using the `disk_footprint/0' and `disk_limit/0' callbacks in
14+
%% `rabbit_queue_type'. Typically this callback uses `rabbit_disk_usage:scan/1'.
15+
%%
16+
%% Also see `rabbit_disk_monitoring' which periodically checks the total space
17+
%% taken on the mounted disk containing `rabbit:data_dir/0'.
18+
19+
-include_lib("kernel/include/logger.hrl").
20+
21+
-behaviour(gen_server).
22+
23+
-export([start_link/0]).
24+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
25+
terminate/2, code_change/3]).
26+
27+
-record(limit, {queue_type :: queue_type(),
28+
type_module :: module(),
29+
limit :: Bytes :: non_neg_integer()}).
30+
31+
-record(state, {limits :: [#limit{}],
32+
alarmed = alarmed() :: alarmed(),
33+
timer :: timer:tref() | undefined}).
34+
35+
-type queue_type() :: atom().
36+
-type alarmed() :: sets:set(queue_type()).
37+
38+
-type disk_usage_limit_spec() :: %% A total number of bytes
39+
{absolute, non_neg_integer()} |
40+
%% %% A fraction of the disk's capacity.
41+
%% {relative, float()} |
42+
%% A string which will be parsed and
43+
%% interpreted as an absolute limit.
44+
string().
45+
46+
%%----------------------------------------------------------------------------
47+
48+
start_link() ->
49+
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
50+
51+
init([]) ->
52+
Limits = lists:foldl(
53+
fun({Type, TypeModule}, Acc) ->
54+
case get_limit(Type, TypeModule) of
55+
{ok, Limit} ->
56+
[#limit{queue_type = Type,
57+
type_module = TypeModule,
58+
limit = Limit} | Acc];
59+
error ->
60+
Acc
61+
end
62+
end, [], rabbit_registry:lookup_all(queue)),
63+
Timer = erlang:send_after(5_000, self(), scan),
64+
{ok, #state{limits = Limits, timer = Timer}}.
65+
66+
handle_call(_Request, _From, State) ->
67+
{noreply, State}.
68+
69+
handle_cast(_Request, State) ->
70+
{noreply, State}.
71+
72+
handle_info(scan, #state{alarmed = Alarmed0} = State) ->
73+
Alarmed = lists:foldl(fun scan/2, alarmed(), State#state.limits),
74+
ok = handle_alarmed(Alarmed0, Alarmed),
75+
Timer = erlang:send_after(5_000, self(), scan),
76+
{noreply, State#state{alarmed = Alarmed, timer = Timer}};
77+
handle_info(Info, State) ->
78+
?LOG_DEBUG("~tp unhandled msg: ~tp", [?MODULE, Info]),
79+
{noreply, State}.
80+
81+
terminate(_Reason, _State) ->
82+
ok.
83+
84+
code_change(_OldVsn, State, _Extra) ->
85+
{ok, State}.
86+
87+
%%----------------------------------------------------------------------------
88+
89+
alarmed() -> sets:new([{version, 2}]).
90+
91+
-spec get_limit(atom(), module()) -> {ok, disk_usage_limit_spec()} | error.
92+
get_limit(QType, QTypeModule) ->
93+
try QTypeModule:disk_limit() of
94+
undefined ->
95+
error;
96+
{absolute, Abs} when is_integer(Abs) andalso Abs >= 0 ->
97+
{ok, Abs};
98+
%% {relative, Rel} when is_float(Rel) andalso Rel >= 0.0 ->
99+
%% TODO: to convert to abs we need to cache the disk capacity for
100+
%% the first `relative' spec we see.
101+
%% Do we even need relative? Should it be proportional to the disk
102+
%% capacity or to the other components?
103+
%% {ok, {relative, Rel}};
104+
String when is_list(String) ->
105+
case rabbit_resource_monitor_misc:parse_information_unit(String) of
106+
{ok, Bytes} ->
107+
{ok, Bytes};
108+
{error, parse_error} ->
109+
?LOG_WARNING("Unable to parse disk limit ~tp for queue "
110+
"type '~ts'", [String, QType]),
111+
error
112+
end
113+
catch
114+
error:undef ->
115+
error
116+
end.
117+
118+
-spec scan(Limit :: #limit{}, alarmed()) -> alarmed().
119+
scan(#limit{queue_type = QType,
120+
type_module = QTypeModule,
121+
limit = Limit}, Alarmed) ->
122+
%% NOTE: `disk_footprint/0' is an optional callback but it should always
123+
%% be implemented if the queue type implements `disk_limit/0'.
124+
case QTypeModule:disk_footprint() of
125+
{ok, Bytes} ->
126+
%% TODO: remove this printf debugging...
127+
?LOG_INFO("Measured queue type '~ts' at ~p bytes (limit ~p)", [QType, Bytes, Limit]),
128+
case Bytes >= Limit of
129+
true -> sets:add_element(QTypeModule, Alarmed);
130+
false -> Alarmed
131+
end;
132+
{error, enoent} ->
133+
Alarmed;
134+
{error, Error} ->
135+
?LOG_WARNING("Failed to calculate disk usage of queue type '~ts': "
136+
"~tp", [QType, Error]),
137+
Alarmed
138+
end.
139+
140+
-spec handle_alarmed(Before :: alarmed(), After :: alarmed()) -> ok.
141+
handle_alarmed(NoChange, NoChange) ->
142+
ok;
143+
handle_alarmed(Before, After) ->
144+
Added = sets:subtract(After, Before),
145+
?LOG_WARNING("Newly alarmed: ~p", [Added]),
146+
ok = sets:fold(
147+
fun(QType, ok) ->
148+
rabbit_alarm:set_alarm({alarm(QType), []})
149+
end, ok, Added),
150+
Removed = sets:subtract(Before, After),
151+
?LOG_WARNING("Stopped alarming: ~p", [Removed]),
152+
ok = sets:fold(
153+
fun(QType, ok) ->
154+
rabbit_alarm:clear_alarm(alarm(QType))
155+
end, ok, Removed),
156+
ok.
157+
158+
alarm(QType) ->
159+
{resource_limit, {queue_type_disk, QType}, node()}.

0 commit comments

Comments
 (0)