Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

use bottlecap::{
base_url,
config::{self, AwsConfig, Config},
config::{self, flush_strategy::FlushStrategy, AwsConfig, Config},
event_bus::bus::EventBus,
events::Event,
lifecycle::{
Expand Down Expand Up @@ -356,7 +356,7 @@ async fn extension_loop_active(
let telemetry_listener_cancel_token =
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;

let flush_control = FlushControl::new(config.serverless_flush_strategy);
let mut flush_control = FlushControl::new(config.serverless_flush_strategy);
let mut shutdown = false;

let mut flush_interval = flush_control.get_flush_interval();
Expand Down Expand Up @@ -501,7 +501,7 @@ async fn extension_loop_active(
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
if !flush_control.should_flush_end() {
if matches!(flush_control.flush_strategy, FlushStrategy::Periodically(_)) {
break;
}
}
Expand Down
83 changes: 73 additions & 10 deletions bottlecap/src/lifecycle/flush_control.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use crate::config::flush_strategy::FlushStrategy;
use std::time;
use tokio::time::Interval;
use tracing::debug;

use crate::lifecycle::invocation_times::InvocationTimes;

const DEFAULT_FLUSH_INTERVAL: u64 = 1000; // 1s
const TWENTY_SECONDS: u64 = 20 * 1000;

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct FlushControl {
flush_strategy: FlushStrategy,
pub last_flush: u64,
pub flush_strategy: FlushStrategy,
invocation_times: InvocationTimes,
}

// 1. Default Strategy
Expand All @@ -17,12 +24,42 @@ pub struct FlushControl {
impl FlushControl {
#[must_use]
pub fn new(flush_strategy: FlushStrategy) -> FlushControl {
FlushControl { flush_strategy }
FlushControl {
flush_strategy,
last_flush: 0,
invocation_times: InvocationTimes::new(),
}
}

#[must_use]
pub fn should_flush_end(&self) -> bool {
!matches!(&self.flush_strategy, FlushStrategy::Periodically(_))
pub fn should_flush_end(&mut self) -> bool {
// previously: would return true if flush_strategy is not Periodically
// !matches!(self.flush_strategy, FlushStrategy::Periodically(_))
let now = match time::SystemTime::now().duration_since(time::UNIX_EPOCH) {
Ok(now) => now.as_secs(),
Err(e) => {
debug!("Failed to get current time: {:?}", e);
return false;
}
};
self.invocation_times.add(now);
match &self.flush_strategy {
FlushStrategy::End | FlushStrategy::EndPeriodically(_) => true,
FlushStrategy::Periodically(_) => false,
FlushStrategy::Default => {
if self.invocation_times.should_adapt_to_periodic(now) {
let should_periodic_flush = self.should_periodic_flush(now, TWENTY_SECONDS);
debug!(
"Adapting over to periodic flush strategy. should_periodic_flush: {}",
should_periodic_flush
);
return should_periodic_flush;
}
debug!("Not enough invocations to adapt to periodic flush, flushing at the end of the invocation");
self.last_flush = now;
true
}
}
}

#[must_use]
Expand All @@ -37,6 +74,16 @@ impl FlushControl {
FlushStrategy::End => tokio::time::interval(tokio::time::Duration::MAX),
}
}

// Only used for default strategy
fn should_periodic_flush(&mut self, now: u64, interval: u64) -> bool {
if now - self.last_flush > (interval / 1000) {
self.last_flush = now;
true
} else {
false
}
}
}

#[cfg(test)]
Expand All @@ -46,23 +93,39 @@ mod tests {

#[test]
fn should_flush_end() {
let flush_control = FlushControl::new(FlushStrategy::Default);
let mut flush_control = FlushControl::new(FlushStrategy::Default);
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
interval: 1,
}));
let mut flush_control =
FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
interval: 1,
}));
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::End);
let mut flush_control = FlushControl::new(FlushStrategy::End);
assert!(flush_control.should_flush_end());

let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
let mut flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
interval: 1,
}));
assert!(!flush_control.should_flush_end());
}

#[test]
fn should_flush_default_end() {
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
assert!(flush_control.should_flush_end());
}
#[test]
fn should_flush_default_periodic() {
const LOOKBACK_COUNT: usize = 20;
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
for _ in 0..LOOKBACK_COUNT - 1 {
assert!(flush_control.should_flush_end());
}
assert!(!flush_control.should_flush_end());
}

#[tokio::test]
async fn get_flush_interval() {
let flush_control = FlushControl::new(FlushStrategy::Default);
Expand Down
89 changes: 89 additions & 0 deletions bottlecap/src/lifecycle/invocation_times.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const LOOKBACK_COUNT: usize = 20;
const ONE_TWENTY_SECONDS: f64 = 120.0;

#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) struct InvocationTimes {
times: [u64; LOOKBACK_COUNT],
head: usize,
}

impl InvocationTimes {
pub(crate) fn new() -> InvocationTimes {
InvocationTimes {
times: [0; LOOKBACK_COUNT],
head: 0,
}
}

pub(crate) fn add(&mut self, timestamp: u64) {
self.times[self.head] = timestamp;
self.head = (self.head + 1) % LOOKBACK_COUNT;
}

pub(crate) fn should_adapt_to_periodic(&self, now: u64) -> bool {
let mut count = 0;
let mut last = 0;
for time in &self.times {
if *time != 0 {
count += 1;
last = *time;
}
}
// If we haven't seen enough invocations, we should flush
if count < LOOKBACK_COUNT {
return false;
}
let elapsed = now - last;
(elapsed as f64 / (count - 1) as f64) < ONE_TWENTY_SECONDS
}
}

#[cfg(test)]
mod tests {
use crate::lifecycle::invocation_times;

#[test]
fn new() {
let invocation_times = invocation_times::InvocationTimes::new();
assert_eq!(
invocation_times.times,
[0; invocation_times::LOOKBACK_COUNT]
);
assert_eq!(invocation_times.head, 0);
}

#[test]
fn insertion() {
let mut invocation_times = invocation_times::InvocationTimes::new();
let timestamp = 1;
invocation_times.add(timestamp);
assert_eq!(invocation_times.times[0], timestamp);
assert_eq!(invocation_times.head, 1);
assert!(!invocation_times.should_adapt_to_periodic(1));
}

#[test]
fn insertion_with_full_buffer_fast_invokes() {
let mut invocation_times = invocation_times::InvocationTimes::new();
for i in 0..=invocation_times::LOOKBACK_COUNT {
invocation_times.add(i as u64);
}
// should wrap around
assert_eq!(invocation_times.times[0], 20);
assert_eq!(invocation_times.head, 1);
assert!(invocation_times.should_adapt_to_periodic(21));
}

#[test]
fn insertion_with_full_buffer_slow_invokes() {
let mut invocation_times = invocation_times::InvocationTimes::new();
invocation_times.add(1_u64);
for i in 0..invocation_times::LOOKBACK_COUNT {
invocation_times.add((i + 5000) as u64);
}
// should wrap around
assert_eq!(invocation_times.times[0], 5019);
assert_eq!(invocation_times.head, 1);
assert!(!invocation_times.should_adapt_to_periodic(10000));
}
}
1 change: 1 addition & 0 deletions bottlecap/src/lifecycle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod flush_control;
pub mod invocation;
pub mod invocation_times;
pub mod listener;
Loading