diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 561c13e7f..f35f51917 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -11,7 +11,7 @@ use bottlecap::{ base_url, - config::{self, AwsConfig, Config}, + config::{self, flush_strategy::FlushStrategy, AwsConfig, Config}, event_bus::bus::EventBus, events::Event, lifecycle::{ @@ -356,7 +356,7 @@ async fn extension_loop_active( let telemetry_listener_cancel_token = setup_telemetry_client(&r.extension_id, logs_agent_channel).await?; - let flush_control = FlushControl::new(config.serverless_flush_strategy); + let mut flush_control = FlushControl::new(config.serverless_flush_strategy); let mut shutdown = false; let mut flush_interval = flush_control.get_flush_interval(); @@ -501,7 +501,7 @@ async fn extension_loop_active( trace_flusher.manual_flush(), stats_flusher.manual_flush() ); - if !flush_control.should_flush_end() { + if matches!(flush_control.flush_strategy, FlushStrategy::Periodically(_)) { break; } } diff --git a/bottlecap/src/lifecycle/flush_control.rs b/bottlecap/src/lifecycle/flush_control.rs index 97ff32de9..4609d60ef 100644 --- a/bottlecap/src/lifecycle/flush_control.rs +++ b/bottlecap/src/lifecycle/flush_control.rs @@ -1,11 +1,18 @@ use crate::config::flush_strategy::FlushStrategy; +use std::time; use tokio::time::Interval; +use tracing::debug; + +use crate::lifecycle::invocation_times::InvocationTimes; const DEFAULT_FLUSH_INTERVAL: u64 = 1000; // 1s +const TWENTY_SECONDS: u64 = 20 * 1000; #[derive(Clone, Copy, Debug, PartialEq)] pub struct FlushControl { - flush_strategy: FlushStrategy, + pub last_flush: u64, + pub flush_strategy: FlushStrategy, + invocation_times: InvocationTimes, } // 1. Default Strategy @@ -17,12 +24,42 @@ pub struct FlushControl { impl FlushControl { #[must_use] pub fn new(flush_strategy: FlushStrategy) -> FlushControl { - FlushControl { flush_strategy } + FlushControl { + flush_strategy, + last_flush: 0, + invocation_times: InvocationTimes::new(), + } } #[must_use] - pub fn should_flush_end(&self) -> bool { - !matches!(&self.flush_strategy, FlushStrategy::Periodically(_)) + pub fn should_flush_end(&mut self) -> bool { + // previously: would return true if flush_strategy is not Periodically + // !matches!(self.flush_strategy, FlushStrategy::Periodically(_)) + let now = match time::SystemTime::now().duration_since(time::UNIX_EPOCH) { + Ok(now) => now.as_secs(), + Err(e) => { + debug!("Failed to get current time: {:?}", e); + return false; + } + }; + self.invocation_times.add(now); + match &self.flush_strategy { + FlushStrategy::End | FlushStrategy::EndPeriodically(_) => true, + FlushStrategy::Periodically(_) => false, + FlushStrategy::Default => { + if self.invocation_times.should_adapt_to_periodic(now) { + let should_periodic_flush = self.should_periodic_flush(now, TWENTY_SECONDS); + debug!( + "Adapting over to periodic flush strategy. should_periodic_flush: {}", + should_periodic_flush + ); + return should_periodic_flush; + } + debug!("Not enough invocations to adapt to periodic flush, flushing at the end of the invocation"); + self.last_flush = now; + true + } + } } #[must_use] @@ -37,6 +74,16 @@ impl FlushControl { FlushStrategy::End => tokio::time::interval(tokio::time::Duration::MAX), } } + + // Only used for default strategy + fn should_periodic_flush(&mut self, now: u64, interval: u64) -> bool { + if now - self.last_flush > (interval / 1000) { + self.last_flush = now; + true + } else { + false + } + } } #[cfg(test)] @@ -46,23 +93,39 @@ mod tests { #[test] fn should_flush_end() { - let flush_control = FlushControl::new(FlushStrategy::Default); + let mut flush_control = FlushControl::new(FlushStrategy::Default); assert!(flush_control.should_flush_end()); - let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy { - interval: 1, - })); + let mut flush_control = + FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy { + interval: 1, + })); assert!(flush_control.should_flush_end()); - let flush_control = FlushControl::new(FlushStrategy::End); + let mut flush_control = FlushControl::new(FlushStrategy::End); assert!(flush_control.should_flush_end()); - let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy { + let mut flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy { interval: 1, })); assert!(!flush_control.should_flush_end()); } + #[test] + fn should_flush_default_end() { + let mut flush_control = super::FlushControl::new(FlushStrategy::Default); + assert!(flush_control.should_flush_end()); + } + #[test] + fn should_flush_default_periodic() { + const LOOKBACK_COUNT: usize = 20; + let mut flush_control = super::FlushControl::new(FlushStrategy::Default); + for _ in 0..LOOKBACK_COUNT - 1 { + assert!(flush_control.should_flush_end()); + } + assert!(!flush_control.should_flush_end()); + } + #[tokio::test] async fn get_flush_interval() { let flush_control = FlushControl::new(FlushStrategy::Default); diff --git a/bottlecap/src/lifecycle/invocation_times.rs b/bottlecap/src/lifecycle/invocation_times.rs new file mode 100644 index 000000000..15bfaa6b5 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation_times.rs @@ -0,0 +1,89 @@ +const LOOKBACK_COUNT: usize = 20; +const ONE_TWENTY_SECONDS: f64 = 120.0; + +#[derive(Clone, Copy, Debug, PartialEq)] +pub(crate) struct InvocationTimes { + times: [u64; LOOKBACK_COUNT], + head: usize, +} + +impl InvocationTimes { + pub(crate) fn new() -> InvocationTimes { + InvocationTimes { + times: [0; LOOKBACK_COUNT], + head: 0, + } + } + + pub(crate) fn add(&mut self, timestamp: u64) { + self.times[self.head] = timestamp; + self.head = (self.head + 1) % LOOKBACK_COUNT; + } + + pub(crate) fn should_adapt_to_periodic(&self, now: u64) -> bool { + let mut count = 0; + let mut last = 0; + for time in &self.times { + if *time != 0 { + count += 1; + last = *time; + } + } + // If we haven't seen enough invocations, we should flush + if count < LOOKBACK_COUNT { + return false; + } + let elapsed = now - last; + (elapsed as f64 / (count - 1) as f64) < ONE_TWENTY_SECONDS + } +} + +#[cfg(test)] +mod tests { + use crate::lifecycle::invocation_times; + + #[test] + fn new() { + let invocation_times = invocation_times::InvocationTimes::new(); + assert_eq!( + invocation_times.times, + [0; invocation_times::LOOKBACK_COUNT] + ); + assert_eq!(invocation_times.head, 0); + } + + #[test] + fn insertion() { + let mut invocation_times = invocation_times::InvocationTimes::new(); + let timestamp = 1; + invocation_times.add(timestamp); + assert_eq!(invocation_times.times[0], timestamp); + assert_eq!(invocation_times.head, 1); + assert!(!invocation_times.should_adapt_to_periodic(1)); + } + + #[test] + fn insertion_with_full_buffer_fast_invokes() { + let mut invocation_times = invocation_times::InvocationTimes::new(); + for i in 0..=invocation_times::LOOKBACK_COUNT { + invocation_times.add(i as u64); + } + // should wrap around + assert_eq!(invocation_times.times[0], 20); + assert_eq!(invocation_times.head, 1); + assert!(invocation_times.should_adapt_to_periodic(21)); + } + + #[test] + fn insertion_with_full_buffer_slow_invokes() { + let mut invocation_times = invocation_times::InvocationTimes::new(); + invocation_times.add(1_u64); + for i in 0..invocation_times::LOOKBACK_COUNT { + invocation_times.add((i + 5000) as u64); + } + // should wrap around + assert_eq!(invocation_times.times[0], 5019); + assert_eq!(invocation_times.head, 1); + assert!(!invocation_times.should_adapt_to_periodic(10000)); + } +} diff --git a/bottlecap/src/lifecycle/mod.rs b/bottlecap/src/lifecycle/mod.rs index a0b3eda68..681a72e98 100644 --- a/bottlecap/src/lifecycle/mod.rs +++ b/bottlecap/src/lifecycle/mod.rs @@ -1,3 +1,4 @@ pub mod flush_control; pub mod invocation; +pub mod invocation_times; pub mod listener;