Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion bottlecap/src/logs/agent.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{self, Sender};
use tokio_util::sync::CancellationToken;
use tracing::debug;
Expand All @@ -9,6 +10,8 @@ use crate::logs::{aggregator_service::AggregatorHandle, processor::LogsProcessor
use crate::tags;
use crate::{LAMBDA_RUNTIME_SLUG, config};

const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100);

#[allow(clippy::module_name_repetitions)]
pub struct LogsAgent {
rx: mpsc::Receiver<TelemetryEvent>,
Expand Down Expand Up @@ -57,6 +60,7 @@ impl LogsAgent {
debug!("LOGS_AGENT | Received shutdown signal, draining remaining events");

// Drain remaining events
let mut last_drain_log_time = Instant::now().checked_sub(DRAIN_LOG_INTERVAL).expect("Failed to subtract interval from now");
'drain_logs_loop: loop {
match self.rx.try_recv() {
Ok(event) => {
Expand All @@ -68,7 +72,12 @@ impl LogsAgent {
},
// Empty signals there are still outstanding senders
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
debug!("LOGS_AGENT | No more events to process but still have senders, continuing to drain...");
// Log at most once every 100ms to avoid spamming the logs
let now = Instant::now();
if now.duration_since(last_drain_log_time) >= DRAIN_LOG_INTERVAL {
debug!("LOGS_AGENT | No more events to process but still have senders, continuing to drain...");
last_drain_log_time = now;
}
},
}
}
Expand Down
Loading