Skip to content

Commit 78a8a8b

Browse files
authored
fix(internal_logs source): remove rate limit (#24218)
* fix(internal_logs source): remove rate limit * add changelog * Debug commit * Fix validated - Revert "Debug commit" This reverts commit c7b9ec9. * add unit test * fix check-events
1 parent 44f34e8 commit 78a8a8b

File tree

3 files changed

+48
-3
lines changed

3 files changed

+48
-3
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
The `internal_logs` source now captures all internal Vector logs without rate limiting. Previously, repeated log messages were silently
2+
dropped.
3+
4+
authors: pront

src/sources/internal_logs.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ mod tests {
213213
use vector_lib::{event::Value, lookup::OwnedTargetPath};
214214
use vrl::value::kind::Collection;
215215

216+
use serial_test::serial;
217+
216218
use super::*;
217219
use crate::{
218220
event::Event,
@@ -234,6 +236,7 @@ mod tests {
234236
// cases because `consume_early_buffer` (called within the
235237
// `start_source` helper) panics when called more than once.
236238
#[tokio::test]
239+
#[serial]
237240
async fn receives_logs() {
238241
trace::init(false, false, "debug", 10);
239242
trace::reset_early_buffer();
@@ -341,6 +344,43 @@ mod tests {
341344
rx
342345
}
343346

347+
// NOTE: This test requires #[serial] because it directly interacts with global tracing state.
348+
// This is a pre-existing limitation around tracing initialization in tests.
349+
#[tokio::test]
350+
#[serial]
351+
async fn repeated_logs_are_not_rate_limited() {
352+
trace::init(false, false, "info", 10);
353+
trace::reset_early_buffer();
354+
355+
let rx = start_source().await;
356+
357+
// Generate 20 identical log messages with the same component_id
358+
for _ in 0..20 {
359+
info!(component_id = "test", "Repeated test message.");
360+
}
361+
362+
sleep(Duration::from_millis(50)).await;
363+
let events = collect_ready(rx).await;
364+
365+
// Filter to only our test messages
366+
let test_events: Vec<_> = events
367+
.iter()
368+
.filter(|e| {
369+
e.as_log()
370+
.get("message")
371+
.map(|m| m.to_string_lossy() == "Repeated test message.")
372+
.unwrap_or(false)
373+
})
374+
.collect();
375+
376+
// We should receive all 20 messages, no rate limiting.
377+
assert_eq!(
378+
test_events.len(),
379+
20,
380+
"internal_logs source should capture all repeated messages without rate limiting"
381+
);
382+
}
383+
344384
#[test]
345385
fn output_schema_definition_vector_namespace() {
346386
let config = InternalLogsConfig::default();

src/trace.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64)
6464
let metrics_layer =
6565
metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));
6666

67-
let broadcast_layer = RateLimitedLayer::new(BroadcastLayer::new())
68-
.with_default_limit(internal_log_rate_limit)
69-
.with_filter(fmt_filter.clone());
67+
// BroadcastLayer should NOT be rate limited because it feeds the internal_logs source,
68+
// which users rely on to capture ALL internal Vector logs for debugging/monitoring.
69+
// Console output (stdout/stderr) has its own separate rate limiting below.
70+
let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone());
7071

7172
let subscriber = tracing_subscriber::registry()
7273
.with(metrics_layer)

0 commit comments

Comments
 (0)