11mod norec;
22
3- use std:: { sync:: Arc , time:: Duration } ;
3+ use std:: {
4+ cmp:: Ordering ,
5+ sync:: { Arc , Mutex } ,
6+ time:: { Duration , UNIX_EPOCH } ,
7+ } ;
48
59use anyhow:: anyhow;
610use norec:: NoRec ;
@@ -172,6 +176,7 @@ struct NotifySink {
172176 rt : tokio:: runtime:: Handle ,
173177 notifiers : Vec < Box < dyn notify:: NotifierTrait > > ,
174178 no_rec : NoRec ,
179+ hourly_errs : Mutex < ( u64 , usize ) > ,
175180}
176181
177182impl NotifySink {
@@ -193,6 +198,55 @@ impl NotifySink {
193198 rt : tokio:: runtime:: Handle :: current ( ) ,
194199 notifiers : notify. into_iter ( ) . map ( notify:: notifier) . collect ( ) ,
195200 no_rec : NoRec :: new ( ) ,
201+ hourly_errs : Mutex :: new ( ( 0 , 0 ) ) ,
202+ }
203+ }
204+
205+ fn notify_log ( & self , message : impl Into < String > ) {
206+ let notification = Notification {
207+ kind : NotificationKind :: Log ( message. into ( ) ) ,
208+ source : & Self :: STATUS_SOURCE ,
209+ } ;
210+
211+ tokio:: task:: block_in_place ( || {
212+ for notifier in & self . notifiers {
213+ self . rt
214+ . block_on ( async { notify:: notify ( & * * notifier, & notification) . await } ) ;
215+ }
216+ } ) ;
217+ }
218+
219+ fn should_skip ( & self , record : & Record ) -> bool {
220+ const ERR_LIMIT : usize = 20 ;
221+
222+ let this_hour = record. time ( ) . duration_since ( UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) / 3600 ;
223+
224+ let mut hourly_errs = self . hourly_errs . lock ( ) . unwrap ( ) ;
225+ if hourly_errs. 0 != this_hour {
226+ if hourly_errs. 1 > ERR_LIMIT {
227+ self . notify_log ( format ! (
228+ "⚠️ {} errors were skipped prior to this, resumed error notifications" ,
229+ hourly_errs. 1 - ERR_LIMIT
230+ ) ) ;
231+ }
232+ * hourly_errs = ( this_hour, 0 ) ;
233+ }
234+
235+ if LevelFilter :: MoreSevereEqual ( Level :: Error ) . test ( record. level ( ) ) {
236+ hourly_errs. 1 += 1 ;
237+
238+ match hourly_errs. 1 . cmp ( & ( ERR_LIMIT + 1 ) ) {
239+ Ordering :: Equal => {
240+ self . notify_log ( format ! (
241+ "⚠️ too many errors (limit {ERR_LIMIT}), skip notifying in this hour. please check the logs through other means."
242+ ) ) ;
243+ true
244+ }
245+ Ordering :: Greater => true ,
246+ Ordering :: Less => false ,
247+ }
248+ } else {
249+ false
196250 }
197251 }
198252}
@@ -209,23 +263,15 @@ impl Sink for NotifySink {
209263 if guard. is_none ( ) {
210264 return Ok ( ( ) ) ;
211265 }
266+ if self . should_skip ( record) {
267+ return Ok ( ( ) ) ;
268+ }
212269
213270 let mut buf = StringBuf :: new ( ) ;
214271 let mut ctx = FormatterContext :: new ( ) ;
215272 self . prop . formatter ( ) . format ( record, & mut buf, & mut ctx) ?;
216273
217- let notification = Notification {
218- kind : NotificationKind :: Log ( buf) ,
219- source : & Self :: STATUS_SOURCE ,
220- } ;
221-
222- tokio:: task:: block_in_place ( || {
223- for notifier in & self . notifiers {
224- self . rt
225- . block_on ( async { notify:: notify ( & * * notifier, & notification) . await } ) ;
226- }
227- } ) ;
228-
274+ self . notify_log ( buf) ;
229275 Ok ( ( ) )
230276 }
231277
0 commit comments