Skip to content

Commit e1ac9d5

Browse files
committed
Set per-queue-type disk alarms for configured mounts
This introduces a new variant of `rabbit_alarm:resource_alarm_source()`, `{disk, QueueType}`, which is set when a mount configured for that queue type falls below its limit of available space.
1 parent 23ee1d5 commit e1ac9d5

File tree

3 files changed

+46
-6
lines changed

3 files changed

+46
-6
lines changed

deps/rabbit/src/rabbit_alarm.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@
4949

5050
-export_type([alarm/0]).
5151
-type local_alarm() :: 'file_descriptor_limit'.
52-
-type resource_alarm_source() :: 'disk' | 'memory'.
52+
%% Source of a resource alarm. In addition to the plain `memory` and `disk`
%% sources, `{disk, QueueType}` identifies a disk alarm that is scoped to a
%% single queue type.
-type resource_alarm_source() ::
53+
memory
54+
| disk
55+
| {disk, rabbit_queue_type:queue_type()}.
5356
-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
5457
-type alarm() :: local_alarm() | resource_alarm().
5558
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
@@ -128,6 +131,8 @@ is_local({{resource_limit, _Resource, Node}, _}) when Node =/= node() -> false.
128131
-spec format_resource_alarm_source(resource_alarm_source()) -> iodata().
129132
format_resource_alarm_source(disk) ->
130133
?DISK_SPACE_RESOURCE;
134+
format_resource_alarm_source({disk, QueueType}) ->
135+
io_lib:format("disk for queue type '~ts'", [QueueType]);
131136
format_resource_alarm_source(memory) ->
132137
?MEMORY_RESOURCE;
133138
format_resource_alarm_source(Unknown) ->

deps/rabbit/src/rabbit_disk_monitor.erl

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,14 @@ internal_update(#state{limit = DataDirLimit,
308308
ets:insert(?MOUNT_ETS_NAME, [M || _Path := M <- NewMounts]),
309309

310310
AlarmedMs = alarmed_mounts(Mounts),
311+
AlarmedQTs = alarmed_queue_types(Mounts),
311312
NewAlarmedMs = alarmed_mounts(NewMounts),
313+
NewAlarmedQTs = alarmed_queue_types(NewMounts),
312314

313315
NewlyClearedMs = sets:subtract(AlarmedMs, NewAlarmedMs),
316+
NewlyClearedQTs = sets:subtract(AlarmedQTs, NewAlarmedQTs),
314317
NewlyAlarmedMs = sets:subtract(NewAlarmedMs, AlarmedMs),
318+
NewlyAlarmedQTs = sets:subtract(NewAlarmedQTs, AlarmedQTs),
315319

316320
lists:foreach(
317321
fun(Path) ->
@@ -320,15 +324,23 @@ internal_update(#state{limit = DataDirLimit,
320324
available = Available} = maps:get(Path, NewMounts),
321325
emit_update_info(Name, "insufficient", Available, Limit)
322326
end, lists:sort(sets:to_list(NewlyAlarmedMs))),
323-
%% TODO: rabbit_alarm:set_alarm/1 for affected queue types
327+
lists:foreach(
328+
fun(QT) ->
329+
Alarm = {resource_limit, {disk, QT}, node()},
330+
rabbit_alarm:set_alarm({Alarm, []})
331+
end, lists:sort(sets:to_list(NewlyAlarmedQTs))),
324332
lists:foreach(
325333
fun(Path) ->
326334
#mount{name = Name,
327335
limit = Limit,
328336
available = Available} = maps:get(Path, NewMounts),
329337
emit_update_info(Name, "sufficient", Available, Limit)
330338
end, lists:sort(sets:to_list(NewlyClearedMs))),
331-
%% TODO: rabbit_alarm:clear_alarm/1 for affected queue types
339+
lists:foreach(
340+
fun(QT) ->
341+
Alarm = {resource_limit, {disk, QT}, node()},
342+
rabbit_alarm:clear_alarm(Alarm)
343+
end, lists:sort(sets:to_list(NewlyClearedQTs))),
332344

333345
State#state{alarmed = NewAlarmed,
334346
actual = DataDirFree,
@@ -353,6 +365,18 @@ alarmed_mounts(Mounts) ->
353365
Acc
354366
end, sets:new([{version, 2}]), Mounts).
355367

368+
%% Collects the queue types attached to mounts that are below their limit.
%%
%% Folds over the given mounts map. For every mount whose `available` value
%% is strictly less than its `limit`, the mount's `queue_types` are merged
%% into the result with sets:union/2; all other mounts leave the accumulator
%% untouched. The fold starts from an empty version-2 set, so the result is
%% empty when no mount is below its limit.
%%
%% NOTE(review): `queue_types` is passed straight to sets:union/2, so it is
%% presumably already a set of queue type modules -- confirm against the
%% #mount{} record definition.
-spec alarmed_queue_types(mounts()) ->
369+
sets:set(module()).
370+
alarmed_queue_types(MountPoints) ->
371+
maps:fold(
372+
fun (_Path, #mount{available = Available,
373+
limit = Limit,
374+
queue_types = QTs}, Acc) when Available < Limit ->
375+
sets:union(QTs, Acc);
376+
(_Path, _Mount, Acc) ->
377+
Acc
378+
end, sets:new([{version, 2}]), MountPoints).
379+
356380
-spec get_disk_free(#state{}) ->
357381
#{file:filename() => AvailableB :: non_neg_integer()}.
358382
get_disk_free(#state{dir = DataDir, mounts = Mounts}) ->

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ collect_mf(_Registry, Callback) ->
3535
Alarms = rabbit_alarm:get_local_alarms(500), %% TODO: figure out timeout
3636
ActiveAlarms =
3737
lists:foldl(fun ({{resource_limit, disk, _}, _}, Acc) ->
38-
maps:put(disk_limit, 1, Acc);
38+
Acc#{disk_limit => 1};
39+
({{resource_limit, {disk, QT}, _}, _}, Acc) ->
40+
Acc#{{disk, QT} => 1};
3941
({{resource_limit, memory, _}, _}, Acc) ->
40-
maps:put(memory_limit, 1, Acc);
42+
Acc#{memory_limit => 1};
4143
({file_descriptor_limit, _}, Acc) ->
42-
maps:put(file_descriptor_limit, 1, Acc)
44+
Acc#{file_descriptor_limit => 1}
4345
end,
4446
#{},
4547
Alarms),
@@ -58,6 +60,15 @@ collect_mf(_Registry, Callback) ->
5860
<<"is 1 if VM memory watermark alarm is in effect">>,
5961
untyped,
6062
[untyped_metric(maps:get(memory_limit, ActiveAlarms, 0))])),
63+
64+
Callback(create_mf(?METRIC_NAME(<<"queue_type_free_disk_space_watermark">>),
65+
<<"is 1 if the queue type disk-space alarm is in effect">>,
66+
untyped,
67+
[prometheus_model_helpers:untyped_metric(
68+
#{queue_type => QT},
69+
maps:get({disk, QT}, ActiveAlarms, 0)) ||
70+
{_, QT} <- rabbit_registry:lookup_all(queue)])),
71+
6172
ok
6273
catch
6374
exit:{timeout, _} ->

0 commit comments

Comments
 (0)