@@ -288,10 +288,14 @@ internal_update(#state{limit = DataDirLimit,
288288 limit = Limit } <- NewMountPoints ]),
289289
290290 AlarmedMPs = alarmed_mount_points (MountPoints ),
291+ AlarmedQTs = alarmed_queue_types (MountPoints ),
291292 NewAlarmedMPs = alarmed_mount_points (NewMountPoints ),
293+ NewAlarmedQTs = alarmed_queue_types (NewMountPoints ),
292294
293295 NewlyClearedMPs = sets :subtract (AlarmedMPs , NewAlarmedMPs ),
296+ NewlyClearedQTs = sets :subtract (AlarmedQTs , NewAlarmedQTs ),
294297 NewlyAlarmedMPs = sets :subtract (NewAlarmedMPs , AlarmedMPs ),
298+ NewlyAlarmedQTs = sets :subtract (NewAlarmedQTs , AlarmedQTs ),
295299
296300 lists :foreach (
297301 fun (Path ) ->
@@ -301,7 +305,11 @@ internal_update(#state{limit = DataDirLimit,
301305 NewMountPoints ),
302306 emit_update_info (Id , " insufficient" , Available , Limit )
303307 end , lists :sort (sets :to_list (NewlyAlarmedMPs ))),
304- % % TODO: rabbit_alarm:set_alarm/1 for affected queue types
308+ lists :foreach (
309+ fun (QT ) ->
310+ Alarm = {resource_limit , {disk , QT }, node ()},
311+ rabbit_alarm :set_alarm ({Alarm , []})
312+ end , lists :sort (sets :to_list (NewlyAlarmedQTs ))),
305313 lists :foreach (
306314 fun (Path ) ->
307315 # mount_point {id = Id ,
@@ -310,7 +318,11 @@ internal_update(#state{limit = DataDirLimit,
310318 NewMountPoints ),
311319 emit_update_info (Id , " sufficient" , Available , Limit )
312320 end , lists :sort (sets :to_list (NewlyClearedMPs ))),
313- % % TODO: rabbit_alarm:clear_alarm/1 for affected queue types
321+ lists :foreach (
322+ fun (QT ) ->
323+ Alarm = {resource_limit , {disk , QT }, node ()},
324+ rabbit_alarm :clear_alarm (Alarm )
325+ end , lists :sort (sets :to_list (NewlyClearedQTs ))),
314326
315327 State # state {alarmed = NewAlarmed ,
316328 actual = DataDirFree ,
@@ -336,6 +348,19 @@ alarmed_mount_points(MountPoints) ->
336348 Acc
337349 end , sets :new ([{version , 2 }]), MountPoints ).
338350
351+ -spec alarmed_queue_types (mount_points ()) ->
352+ sets :set (module ()).
353+ alarmed_queue_types (MountPoints ) ->
354+ maps :fold (
355+ fun (_Path , # mount_point {available = Available ,
356+ limit = Limit ,
357+ queue_types = QTs }, Acc )
358+ when Available < Limit ->
359+ sets :union (QTs , Acc );
360+ (_Path , _MP , Acc ) ->
361+ Acc
362+ end , sets :new ([{version , 2 }]), MountPoints ).
363+
339364-spec get_disk_free (# state {}) ->
340365 #{file :filename_all () => AvailableB :: non_neg_integer ()}.
341366get_disk_free (# state {dir = DataDir , mount_points = MountPoints }) ->
0 commit comments