Skip to content

Commit 19be906

Browse files
committed
rabbit_disk_monitor: Monitor per-queue-type mount points
1 parent d57a1c2 commit 19be906

File tree

1 file changed

+225
-57
lines changed

1 file changed

+225
-57
lines changed

deps/rabbit/src/rabbit_disk_monitor.erl

Lines changed: 225 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,32 @@
3232

3333
-define(SERVER, ?MODULE).
3434
-define(ETS_NAME, ?MODULE).
35+
-define(MOUNT_POINT_ETS_NAME, rabbit_disk_monitor_per_mount_point).
3536
-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100).
3637
-define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000).
3738
-define(DEFAULT_DISK_FREE_LIMIT, 50000000).
3839
%% 250MB/s i.e. 250kB/ms
3940
-define(FAST_RATE, (250 * 1000)).
4041

42+
-record(mount_point,
43+
{%% name set in configuration
44+
id :: binary(),
45+
%% minimum bytes available
46+
limit :: non_neg_integer(),
47+
%% detected available disk space in bytes
48+
available = 'NaN' :: non_neg_integer() | 'NaN',
49+
%% set of queue types which should be blocked if the limit is exceeded
50+
queue_types :: sets:set(rabbit_queue_type:queue_type())}).
51+
4152
-record(state, {
42-
%% monitor partition on which this directory resides
53+
%% monitor partition on which the data directory resides
4354
dir,
4455
%% configured limit in bytes
4556
limit,
4657
%% last known free disk space amount in bytes
4758
actual,
59+
%% extra file systems to monitor, keyed by mount point path; each entry lists the queue types to block when its limit is exceeded
60+
mount_points = #{} :: mount_points(),
4861
%% minimum check interval
4962
min_interval,
5063
%% maximum check interval
@@ -67,6 +80,19 @@
6780

6881
-type disk_free_limit() :: integer() | {'absolute', integer()} | string() | {'mem_relative', float() | integer()}.
6982

83+
-type mount_points() :: #{file:filename_all() => #mount_point{}}.
84+
85+
%%----------------------------------------------------------------------------
86+
87+
%% This needs to wait until the recovery phase so that queue types have a
88+
%% chance to register themselves.
89+
-rabbit_boot_step({monitor_mount_points,
90+
[{description, "monitor per-queue-type mount points"},
91+
{mfa, {gen_server, call,
92+
[?MODULE, monitor_mount_points]}},
93+
{requires, recovery},
94+
{enables, routing_ready}]}).
95+
7096
%%----------------------------------------------------------------------------
7197
%% Public API
7298
%%----------------------------------------------------------------------------
@@ -112,12 +138,11 @@ start_link(Args) ->
112138
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
113139

114140
init([Limit]) ->
115-
Dir = dir(),
116141
{ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
117142
{ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
118143
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
119-
State0 = #state{dir = Dir,
120-
alarmed = false,
144+
?MOUNT_POINT_ETS_NAME = ets:new(?MOUNT_POINT_ETS_NAME, [protected, set, named_table]),
145+
State0 = #state{alarmed = false,
121146
enabled = true,
122147
limit = Limit,
123148
retries = Retries,
@@ -166,6 +191,15 @@ handle_call({set_enabled, _Enabled = false}, _From, State = #state{enabled = fal
166191
?LOG_INFO("Free disk space monitor was already disabled"),
167192
{reply, ok, State#state{enabled = false}};
168193

194+
handle_call(monitor_mount_points, _From, State) ->
195+
case State of
196+
#state{enabled = true} ->
197+
State1 = State#state{mount_points = mount_points()},
198+
{reply, ok, internal_update(State1)};
199+
#state{enabled = false} ->
200+
{reply, ok, State}
201+
end;
202+
169203
handle_call(_Request, _From, State) ->
170204
{noreply, State}.
171205

@@ -205,9 +239,6 @@ safe_ets_lookup(Key, Default) ->
205239
Default
206240
end.
207241

208-
% the partition / drive containing this directory will be monitored
209-
dir() -> rabbit:data_dir().
210-
211242
set_min_check_interval(MinInterval, State) ->
212243
ets:insert(?ETS_NAME, {min_check_interval, MinInterval}),
213244
State#state{min_interval = MinInterval}.
@@ -224,36 +255,94 @@ set_disk_limits(State, Limit0) ->
224255
ets:insert(?ETS_NAME, {disk_free_limit, Limit}),
225256
internal_update(State1).
226257

227-
internal_update(State = #state{limit = Limit,
228-
dir = Dir,
229-
alarmed = Alarmed}) ->
230-
CurrentFree = get_disk_free(Dir),
258+
internal_update(#state{limit = DataDirLimit,
259+
dir = Dir,
260+
mount_points = MountPoints,
261+
alarmed = Alarmed} = State) ->
262+
DiskFree = get_disk_free(State),
263+
DataDirFree = maps:get(Dir, DiskFree, 'NaN'),
231264
%% note: Erlang orders every number before every atom, so 'NaN' < Limit is false and an unknown reading is never treated as below the limit
232-
NewAlarmed = CurrentFree < Limit,
265+
NewAlarmed = DataDirFree < DataDirLimit,
233266
case {Alarmed, NewAlarmed} of
234267
{false, true} ->
235-
emit_update_info("insufficient", CurrentFree, Limit),
268+
emit_update_info("insufficient", DataDirFree, DataDirLimit),
236269
rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
237270
{true, false} ->
238-
emit_update_info("sufficient", CurrentFree, Limit),
271+
emit_update_info("sufficient", DataDirFree, DataDirLimit),
239272
rabbit_alarm:clear_alarm({resource_limit, disk, node()});
240273
_ ->
241274
ok
242275
end,
243-
ets:insert(?ETS_NAME, {disk_free, CurrentFree}),
244-
State#state{alarmed = NewAlarmed, actual = CurrentFree}.
245-
246-
-spec get_disk_free(file:filename_all()) ->
247-
AvailableBytes :: non_neg_integer() | 'NaN'.
248-
get_disk_free(Dir) ->
249-
case disksup:get_disk_info(Dir) of
250-
[{D, 0, 0, 0, 0}] when D =:= Dir orelse D =:= "none" ->
251-
'NaN';
252-
[{_MountPoint, _TotalKiB, AvailableKiB, _Capacity}] ->
253-
AvailableKiB * 1024;
254-
_DiskInfo ->
255-
'NaN'
256-
end.
276+
ets:insert(?ETS_NAME, {disk_free, DataDirFree}),
277+
278+
NewMountPoints = maps:map(
279+
fun(Path, MP) ->
280+
Available = maps:get(Path, DiskFree, 'NaN'),
281+
MP#mount_point{available = Available}
282+
end, MountPoints),
283+
ets:insert(
284+
?MOUNT_POINT_ETS_NAME,
285+
[{Id, Available, Limit}
286+
|| _Path := #mount_point{id = Id,
287+
available = Available,
288+
limit = Limit} <- NewMountPoints]),
289+
290+
AlarmedMPs = alarmed_mount_points(MountPoints),
291+
NewAlarmedMPs = alarmed_mount_points(NewMountPoints),
292+
293+
NewlyClearedMPs = sets:subtract(AlarmedMPs, NewAlarmedMPs),
294+
NewlyAlarmedMPs = sets:subtract(NewAlarmedMPs, AlarmedMPs),
295+
296+
lists:foreach(
297+
fun(Path) ->
298+
#mount_point{id = Id,
299+
limit = Limit,
300+
available = Available} = maps:get(Path,
301+
NewMountPoints),
302+
emit_update_info(Id, "insufficient", Available, Limit)
303+
end, lists:sort(sets:to_list(NewlyAlarmedMPs))),
304+
%% TODO: rabbit_alarm:set_alarm/1 for affected queue types
305+
lists:foreach(
306+
fun(Path) ->
307+
#mount_point{id = Id,
308+
limit = Limit,
309+
available = Available} = maps:get(Path,
310+
NewMountPoints),
311+
emit_update_info(Id, "sufficient", Available, Limit)
312+
end, lists:sort(sets:to_list(NewlyClearedMPs))),
313+
%% TODO: rabbit_alarm:clear_alarm/1 for affected queue types
314+
315+
State#state{alarmed = NewAlarmed,
316+
actual = DataDirFree,
317+
mount_points = NewMountPoints}.
318+
319+
%% Logs a change of the data directory's free-space state.
%%
%% StateStr is the textual state ("sufficient" / "insufficient" at the call
%% sites in this module). CurrentFree is a byte count or the atom 'NaN' when
%% no reading is available, so it is printed with ~tp; ~b accepts integers
%% only and would turn the whole log line into a format error.
emit_update_info(StateStr, CurrentFree, Limit) ->
    ?LOG_INFO(
       "Free disk space is ~ts. Free bytes: ~tp. Limit: ~b",
       [StateStr, CurrentFree, Limit]).

%% Same as emit_update_info/3, for a configured mount point. MountPoint is
%% the identifier printed in the message (callers pass the mount point's
%% configured id).
emit_update_info(MountPoint, StateStr, CurrentFree, Limit) ->
    ?LOG_INFO(
       "Free space of disk '~ts' is ~ts. Free bytes: ~tp. Limit: ~b",
       [MountPoint, StateStr, CurrentFree, Limit]).
327+
328+
%% Returns the set of mount point paths whose recorded `available` value
%% compares below their `limit`. Entries that are not #mount_point{} records
%% are ignored.
-spec alarmed_mount_points(mount_points()) ->
    sets:set(file:filename_all()).
alarmed_mount_points(MountPoints) ->
    BelowLimit =
        maps:filter(
          fun(_Path, #mount_point{available = Free, limit = Min}) ->
                  Free < Min;
             (_Path, _Other) ->
                  false
          end, MountPoints),
    sets:from_list(maps:keys(BelowLimit), [{version, 2}]).
338+
339+
%% Reads the disk table from disksup and returns the available bytes for
%% every monitored mount point: the one recorded in `dir` plus every key of
%% `mount_points`. disksup reports KiB, hence the multiplication by 1024.
%% Entries whose total, available and capacity figures are all zero are
%% dropped, so a monitored mount point may be absent from the result.
-spec get_disk_free(#state{}) ->
    #{file:filename_all() => AvailableB :: non_neg_integer()}.
get_disk_free(#state{dir = DataDir, mount_points = MountPoints}) ->
    maps:from_list(
      [{Id, FreeKiB * 1024}
       || {Id, TotalKiB, FreeKiB, UsedPct} <- disksup:get_disk_info(),
          {TotalKiB, FreeKiB, UsedPct} =/= {0, 0, 0},
          Id =:= DataDir orelse is_map_key(Id, MountPoints)]).
257346

258347
interpret_limit({mem_relative, Relative})
259348
when is_number(Relative) ->
@@ -269,11 +358,6 @@ interpret_limit(Absolute) ->
269358
?DEFAULT_DISK_FREE_LIMIT
270359
end.
271360

272-
emit_update_info(StateStr, CurrentFree, Limit) ->
273-
?LOG_INFO(
274-
"Free disk space is ~ts. Free bytes: ~b. Limit: ~b",
275-
[StateStr, CurrentFree, Limit]).
276-
277361
start_timer(State) ->
278362
State#state{timer = erlang:send_after(interval(State), self(), update)}.
279363

@@ -290,30 +374,114 @@ interval(#state{limit = Limit,
290374
IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE,
291375
trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).
292376

377+
%% Builds the map of extra mount points to monitor from the
%% `disk_free_limits` key of the `rabbit` application environment.
%%
%% Each configured entry maps an id to {Path, Limit, QueueTypes}. An entry is
%% skipped (and an error is logged) when its limit cannot be parsed or when
%% none of its queue types can be resolved. Returns an empty map when the key
%% is not set.
-spec mount_points() ->
    #{MountPoint :: file:filename_all() => #mount_point{}}.
mount_points() ->
    case application:get_env(rabbit, disk_free_limits) of
        {ok, Limits} ->
            maps:fold(fun add_configured_mount_point/3, #{}, Limits);
        undefined ->
            #{}
    end.

%% Fold step: parses one configured entry and adds it to Acc when valid.
add_configured_mount_point(Id, {Path, Limit0, QTypes}, Acc) ->
    case rabbit_resource_monitor_misc:parse_information_unit(Limit0) of
        {ok, Limit} ->
            {Known, Unknown} = resolve_queue_types(QTypes),
            warn_unknown_queue_types(Id, Unknown),
            store_mount_point(Id, Path, Limit, Known, Acc);
        {error, parse_error} ->
            ?LOG_ERROR("Unable to parse free disk limit "
                       "'~ts' for disk '~ts'. The disk will "
                       "not be monitored for free space.",
                       [Limit0, Id]),
            Acc
    end.

%% Logs a warning listing the queue types that could not be resolved, if any.
warn_unknown_queue_types(Id, [_ | _] = Unknown) ->
    ?LOG_WARNING(
       "Unknown queue types configured for "
       "disk '~ts': ~ts",
       [Id, lists:join(", ", Unknown)]),
    ok;
warn_unknown_queue_types(_Id, _Unknown) ->
    ok.

%% Adds the mount point under Path, unless no queue type could be resolved
%% for it, in which case the accumulator is returned unchanged.
store_mount_point(Id, _Path, _Limit, [], Acc) ->
    ?LOG_ERROR("No known queue types "
               "configured for disk '~ts'. "
               "The disk will not be "
               "monitored for free "
               "disk space.", [Id]),
    Acc;
store_mount_point(Id, Path, Limit, Known, Acc) ->
    QTs = sets:from_list(Known, [{version, 2}]),
    Acc#{Path => #mount_point{id = Id,
                              limit = Limit,
                              queue_types = QTs}}.
426+
427+
%% Splits the configured queue type names into {Known, Unknown}: Known holds
%% the type modules found through rabbit_registry, Unknown the names for
%% which the lookup returned not_found. Both lists are built by prepending,
%% so they come out in reverse input order.
resolve_queue_types(QTs) ->
    resolve_queue_types(QTs, {[], []}).

resolve_queue_types(QTs, Acc0) ->
    lists:foldl(
      fun(QT, {Known, Unknown}) ->
              case rabbit_registry:lookup_type_module(queue, QT) of
                  {ok, TypeModule} ->
                      {[TypeModule | Known], Unknown};
                  {error, not_found} ->
                      {Known, [QT | Unknown]}
              end
      end, Acc0, QTs).
439+
293440
enable(#state{retries = 0} = State) ->
294441
?LOG_ERROR("Free disk space monitor failed to start!"),
295442
State;
296-
enable(#state{dir = Dir} = State) ->
297-
enable_handle_disk_free(get_disk_free(Dir), State).
298-
299-
enable_handle_disk_free(DiskFree, State) when is_integer(DiskFree) ->
300-
enable_handle_total_memory(catch vm_memory_monitor:get_total_memory(), DiskFree, State);
301-
enable_handle_disk_free(Error, #state{interval = Interval, retries = Retries} = State) ->
302-
?LOG_WARNING("Free disk space monitor encountered an error "
303-
"(e.g. failed to parse output from OS tools). "
304-
"Retries left: ~b Error:~n~tp",
305-
[Retries, Error]),
306-
erlang:send_after(Interval, self(), try_enable),
307-
State#state{enabled = false}.
308-
309-
enable_handle_total_memory(TotalMemory, DiskFree, #state{limit = Limit} = State) when is_integer(TotalMemory) ->
310-
?LOG_INFO("Enabling free disk space monitoring "
311-
"(disk free space: ~b, total memory: ~b)", [DiskFree, TotalMemory]),
312-
start_timer(set_disk_limits(State, Limit));
313-
enable_handle_total_memory(Error, _DiskFree, #state{interval = Interval, retries = Retries} = State) ->
314-
?LOG_WARNING("Free disk space monitor encountered an error "
315-
"retrieving total memory. "
316-
"Retries left: ~b Error:~n~tp",
317-
[Retries, Error]),
318-
erlang:send_after(Interval, self(), try_enable),
319-
State#state{enabled = false}.
443+
%% The data directory's mount point is not known yet: resolve it first, then
%% continue with the clause below. On failure a `try_enable` message is
%% scheduled after `interval` and the monitor is left disabled.
enable(#state{dir = undefined,
              interval = Interval,
              retries = Retries} = State) ->
    case resolve_data_dir() of
        {ok, MountPoint} ->
            enable(State#state{dir = MountPoint});
        {error, Reason} ->
            ?LOG_WARNING("Free disk space monitor encounter an error "
                         "resolving the data directory '~ts'. Retries left: "
                         "~b Error:~n~tp",
                         [rabbit:data_dir(), Retries, Reason]),
            erlang:send_after(Interval, self(), try_enable),
            State#state{enabled = false}
    end;
%% The mount point is known: monitoring starts once total memory can be
%% determined; otherwise another `try_enable` is scheduled.
enable(#state{dir = Dir,
              retries = Retries,
              interval = Interval,
              limit = Limit} = State) ->
    DiskFree = get_disk_free(State),
    case vm_memory_monitor:get_total_memory() of
        TotalMemory when is_integer(TotalMemory) ->
            %% The free-space value falls back to the atom `unknown` when
            %% disksup did not report Dir, so it is printed with ~tp; ~b
            %% accepts integers only.
            ?LOG_INFO("Enabling free disk space monitoring (data dir free "
                      "space: ~tp, total memory: ~b)",
                      [maps:get(Dir, DiskFree, unknown), TotalMemory]),
            start_timer(set_disk_limits(State, Limit));
        unknown ->
            ?LOG_WARNING("Free disk space monitor could not determine total "
                         "memory. Retries left: ~b", [Retries]),
            erlang:send_after(Interval, self(), try_enable),
            State#state{enabled = false}
    end.
474+
475+
%% Asks disksup which mount point holds the RabbitMQ data directory.
%%
%% Returns {ok, MountPoint} when disksup reports exactly one entry with
%% non-zero figures, otherwise {error, Reason} describing why the mount point
%% could not be determined.
resolve_data_dir() ->
    DataDir = rabbit:data_dir(),
    case disksup:get_disk_info(DataDir) of
        [] ->
            {error, no_disk_info};
        [{"none", 0, 0, 0}] ->
            {error, disksup_not_available};
        [{Unreadable, 0, 0, 0}] ->
            {error, {cannot_determine_space, Unreadable}};
        [{Resolved, _Total, _Available, _Used}] ->
            {ok, Resolved};
        [_ | _] = Several ->
            {error, {multiple_disks, length(Several)}}
    end.

0 commit comments

Comments
 (0)