Skip to content

Commit 8c4e644

Browse files
rate limit some logs (#4483)
* introduce rate limited variant of tracing::* macros * use rate limited warn in doc processor
1 parent a2974e5 commit 8c4e644

File tree

3 files changed

+103
-2
lines changed

3 files changed

+103
-2
lines changed

quickwit/quickwit-common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ mod path_hasher;
3131
mod progress;
3232
pub mod pubsub;
3333
pub mod rand;
34+
pub mod rate_limited_tracing;
3435
pub mod rate_limiter;
3536
pub mod rendezvous_hasher;
3637
pub mod retry;
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright (C) 2024 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at [email protected].
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
#[macro_export]
21+
macro_rules! rate_limited_tracing {
22+
($log_fn:ident, limit_per_min=$limit:literal, $($args:tt)*) => {{
23+
use ::std::sync::atomic::{AtomicU32, Ordering};
24+
use ::std::sync::Mutex;
25+
use ::std::time::{Instant, Duration};
26+
27+
static COUNT: AtomicU32 = AtomicU32::new(0);
28+
// we can't build an Instant from const context, so we pinitialize with a None
29+
static LAST_RESET: Mutex<Option<Instant>> = Mutex::new(None);
30+
31+
let count = COUNT.fetch_add(1, Ordering::Relaxed);
32+
if count == 0 {
33+
// this can only be reached the very 1st time we log
34+
*LAST_RESET.lock().unwrap() = Some(Instant::now());
35+
}
36+
37+
let do_log = if count >= $limit {
38+
let mut last_reset = LAST_RESET.lock().unwrap();
39+
let current_time = Instant::now();
40+
let should_reset = last_reset
41+
.map(|last_reset| current_time.duration_since(last_reset) >= Duration::from_secs(60))
42+
.unwrap_or(true);
43+
44+
if should_reset {
45+
*last_reset = Some(current_time);
46+
// we store 1 because we are already about to log something
47+
COUNT.store(1, Ordering::Relaxed);
48+
true
49+
} else {
50+
// we are over-limit and not far enough in time to reset: don't log
51+
false
52+
}
53+
} else {
54+
true
55+
};
56+
57+
if do_log {
58+
::tracing::$log_fn!($($args)*);
59+
}
60+
}};
61+
}
62+
63+
#[macro_export]
64+
macro_rules! rate_limited_trace {
65+
($unit:ident=$limit:literal, $($args:tt)*) => {
66+
$crate::rate_limited_tracing::rate_limited_tracing!(trace, $unit=$limit, $($args)*)
67+
};
68+
}
69+
#[macro_export]
70+
macro_rules! rate_limited_debug {
71+
($unit:ident=$limit:literal, $($args:tt)*) => {
72+
$crate::rate_limited_tracing::rate_limited_tracing!(debug, $unit=$limit, $($args)*)
73+
};
74+
}
75+
#[macro_export]
76+
macro_rules! rate_limited_info {
77+
($unit:ident=$limit:literal, $($args:tt)*) => {
78+
$crate::rate_limited_tracing::rate_limited_tracing!(info, $unit=$limit, $($args)*)
79+
};
80+
}
81+
#[macro_export]
82+
macro_rules! rate_limited_warn {
83+
($unit:ident=$limit:literal, $($args:tt)*) => {
84+
$crate::rate_limited_tracing::rate_limited_tracing!(warn, $unit=$limit, $($args)*)
85+
};
86+
}
87+
#[macro_export]
88+
macro_rules! rate_limited_error {
89+
($unit:literal=$limit:literal, $($args:tt)*) => {
90+
$crate::rate_limited_tracing::rate_limited_tracing!(error, $unit=$limit, $($args)*)
91+
};
92+
}
93+
94+
#[doc(hidden)]
95+
pub use rate_limited_tracing;
96+
pub use {
97+
rate_limited_debug, rate_limited_error, rate_limited_info, rate_limited_trace,
98+
rate_limited_warn,
99+
};

quickwit/quickwit-indexing/src/actors/doc_processor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use anyhow::{bail, Context};
2525
use async_trait::async_trait;
2626
use bytes::Bytes;
2727
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
28+
use quickwit_common::rate_limited_tracing::rate_limited_warn;
2829
use quickwit_common::runtimes::RuntimeType;
2930
use quickwit_config::{SourceInputFormat, TransformConfig};
3031
use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject};
@@ -37,7 +38,6 @@ use tantivy::schema::{Field, Value};
3738
use tantivy::{DateTime, TantivyDocument};
3839
use thiserror::Error;
3940
use tokio::runtime::Handle;
40-
use tracing::warn;
4141

4242
#[cfg(feature = "vrl")]
4343
use super::vrl_processing::*;
@@ -413,7 +413,8 @@ impl DocProcessor {
413413
processed_docs.push(processed_doc);
414414
}
415415
Err(error) => {
416-
warn!(
416+
rate_limited_warn!(
417+
limit_per_min = 5,
417418
index_id = self.counters.index_id,
418419
source_id = self.counters.source_id,
419420
"{}",

0 commit comments

Comments
 (0)