diff --git a/bd-buffer/src/buffer/aggregate_ring_buffer.rs b/bd-buffer/src/buffer/aggregate_ring_buffer.rs index b096f37eb..614aecbb2 100644 --- a/bd-buffer/src/buffer/aggregate_ring_buffer.rs +++ b/bd-buffer/src/buffer/aggregate_ring_buffer.rs @@ -134,6 +134,7 @@ impl RingBufferImpl { allow_overwrite: AllowOverwrite, volatile_stats: Arc, non_volatile_stats: Arc, + on_evicted_cb: Option>, ) -> Result> { // For aggregate buffers, the size of the file (after subtracting header space) must be >= the // size of RAM. This is to avoid situations in which we accept a record into RAM but cannot ever @@ -162,10 +163,11 @@ impl RingBufferImpl { BlockWhenReservingIntoConcurrentRead::Yes, per_record_crc32_check, non_volatile_stats, + on_evicted_cb.clone(), )?; let volatile_buffer = - VolatileRingBuffer::new(format!("{name}-volatile"), volatile_size, volatile_stats); + VolatileRingBuffer::new(format!("{name}-volatile"), volatile_size, volatile_stats, on_evicted_cb); let shared_data = Arc::new(SharedData { volatile_buffer, diff --git a/bd-buffer/src/buffer/aggregate_ring_buffer_test.rs b/bd-buffer/src/buffer/aggregate_ring_buffer_test.rs index 006db3270..a1b74da61 100644 --- a/bd-buffer/src/buffer/aggregate_ring_buffer_test.rs +++ b/bd-buffer/src/buffer/aggregate_ring_buffer_test.rs @@ -54,6 +54,7 @@ impl Helper { allow_overwrite, Arc::new(RingBufferStats::default()), stats.stats.clone(), + None, ) .unwrap(); Self { @@ -86,6 +87,7 @@ impl Helper { self.allow_overwrite, Arc::new(RingBufferStats::default()), self.stats.stats.clone(), + None, )?, self.cursor, )); diff --git a/bd-buffer/src/buffer/common_ring_buffer.rs b/bd-buffer/src/buffer/common_ring_buffer.rs index fa7d1327c..a0ba8ef02 100644 --- a/bd-buffer/src/buffer/common_ring_buffer.rs +++ b/bd-buffer/src/buffer/common_ring_buffer.rs @@ -159,6 +159,10 @@ pub struct LockedData { has_read_reservation_cb: Box bool + Send>, has_write_reservation_cb: Box bool + Send>, + // Callback invoked when a record is evicted (overwritten). + // The argument is the record data. + on_evicted_cb: Box, + // Indicates whether the buffer is in a readable state or not, i.e. if we expect a read() call to // immediately return an entry. pub readable: tokio::sync::watch::Sender, @@ -456,6 +460,20 @@ impl LockedData { // When overwriting we zero out any extra data, to make sure that CRCs, etc. become so the // overwritten record is skipped correctly if corruption lands us in it somehow. let next_read_start = guard.next_read_start().ok_or(InvariantError::Invariant)?; + + // Invoke the eviction callback before zeroing/advancing. + // We need to calculate the data slice. + let data_start = (next_read_start + guard.extra_bytes_per_record) as usize; + let data_len = next_read_size as usize; + // Safety: We are holding the lock so reading this memory is safe. + // We just need to make sure we don't access out of bounds. + // The load_next_read_size checks ensures size is valid and within bounds. + let memory_slice = unsafe { + let ptr = guard.memory().as_ptr().add(data_start); + std::slice::from_raw_parts(ptr, data_len) + }; + (guard.on_evicted_cb)(memory_slice); + guard.zero_extra_data(next_read_start); guard.advance_next_read(next_read_actual_size, Cursor::No)?; } else { @@ -981,6 +999,7 @@ impl CommonRingBuffer { on_total_data_loss_cb: impl Fn(&mut ExtraLockedData) + Send + 'static, has_read_reservation_cb: impl Fn(&ExtraLockedData) -> bool + Send + 'static, has_write_reservation_cb: impl Fn(&ExtraLockedData) -> bool + Send + 'static, + on_evicted_cb: Option>, ) -> Self { // Initialize the channel to true on startup. If we end up reading from the buffer when it's // not ready the async read calls will still wait for the read to be available, and starting @@ -1005,6 +1024,11 @@ impl CommonRingBuffer { on_total_data_loss_cb: Box::new(on_total_data_loss_cb), has_read_reservation_cb: Box::new(has_read_reservation_cb), has_write_reservation_cb: Box::new(has_write_reservation_cb), + on_evicted_cb: Box::new(move |data| { + if let Some(cb) = &on_evicted_cb { + cb(data); + } + }), wait_for_drain_data: None, pending_total_data_loss_reset: false, readable, diff --git a/bd-buffer/src/buffer/common_ring_buffer_test.rs b/bd-buffer/src/buffer/common_ring_buffer_test.rs index 04db30bf1..58e1dd648 100644 --- a/bd-buffer/src/buffer/common_ring_buffer_test.rs +++ b/bd-buffer/src/buffer/common_ring_buffer_test.rs @@ -29,7 +29,7 @@ use bd_client_stats_store::Collector; use bd_log_primitives::LossyIntToU32; use parameterized::parameterized; use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tempfile::TempDir; #[derive(Clone, Copy)] @@ -66,15 +66,27 @@ struct Helper { helper: CommonHelper, _temp_dir: TempDir, stats: StatsTestHelper, + evicted_records: Arc>>>, } impl Helper { fn new(size: u32, test_type: TestType) -> Self { let temp_dir = TempDir::with_prefix("buffer_test").unwrap(); let stats = StatsTestHelper::new(&Collector::default().scope("")); + let evicted_records = Arc::new(Mutex::new(Vec::new())); + let evicted_records_clone = evicted_records.clone(); + + let on_evicted_cb = Some(Arc::new(move |bytes: &[u8]| { + evicted_records_clone.lock().unwrap().push(bytes.to_vec()); + }) as Arc); + let buffer = match test_type { - TestType::Volatile => VolatileRingBuffer::new("test".to_string(), size, stats.stats.clone()) - as Arc, + TestType::Volatile => VolatileRingBuffer::new( + "test".to_string(), + size, + stats.stats.clone(), + on_evicted_cb, + ) as Arc, TestType::NonVolatile => NonVolatileRingBuffer::new( "test".to_string(), temp_dir.path().join("buffer"), @@ -83,6 +95,7 @@ impl Helper { BlockWhenReservingIntoConcurrentRead::No, PerRecordCrc32Check::No, stats.stats.clone(), + on_evicted_cb, ) .unwrap() as Arc, TestType::Aggregate => AggregateRingBuffer::new( @@ -94,6 +107,7 @@ impl Helper { AllowOverwrite::Yes, Arc::new(RingBufferStats::default()), stats.stats.clone(), + on_evicted_cb, ) .unwrap() as Arc, }; @@ -101,6 +115,7 @@ impl Helper { helper: CommonHelper::new(buffer, Cursor::No), _temp_dir: temp_dir, stats, + evicted_records, } } } @@ -112,7 +127,67 @@ impl Drop for Helper { } } -// Test basic error cases. + +// Test that the eviction callback is fired when records are overwritten. +#[parameterized(test_type = {TestType::Volatile, TestType::NonVolatile, TestType::Aggregate})] +fn callback_fired_on_eviction(test_type: TestType) { + let mut root = Helper::new(30, test_type); + let helper = &mut root.helper; + + + // Reserve and write 0-9. + helper.reserve_and_commit("aaaaaa"); + + // Reserve and write 10-19. + helper.reserve_and_commit("bbbbbb"); + + // Reserve and write 20-29. + helper.reserve_and_commit("cccccc"); + + // Ensure these are propagated to NonVolatile in Aggregate case. + // This prevents the "slow flush" race where Volatile overwrites before flushing to NonVolatile. + if matches!(test_type, TestType::Aggregate) { + helper.buffer.flush(); + } + + root.stats.wait_for_total_records_written(3); + + // Reserve and write 0-9. This should overwrite "aaaaaa". + helper.reserve_and_commit("dddddd"); + + if matches!(test_type, TestType::Aggregate) { + helper.buffer.flush(); + } + + root.stats.wait_for_total_records_written(4); + + + // Give a little time for callbacks to fire if there's any async behavior (though CommonRingBuffer calls it synchronously). + // However, for AggregateRingBuffer, the flush happens in a separate thread, so the eviction (and callback) + // happens on that thread. + // wait_for_total_records_written waits for the *write* to complete. + // If it's Aggregate, the write goes to Volatile. Then flush thread moves it to NonVolatile. + // The eviction happens when NonVolatile is written to. + + // Let's loop briefly to wait for the callback. + let start = std::time::Instant::now(); + loop { + let evicted = root.evicted_records.lock().unwrap(); + if !evicted.is_empty() || start.elapsed() > std::time::Duration::from_secs(5) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + let evicted = root.evicted_records.lock().unwrap(); + assert_eq!(evicted.len(), 1, "Expected 1 evicted record"); + let record = &evicted[0]; + let as_string = String::from_utf8_lossy(record); + // We verify that the evicted record contains "aaaaaa". + // The exact format includes some header bytes, but the payload should be there. + assert!(as_string.contains("aaaaaa"), "Evicted record should contain 'aaaaaa', got: {:?}", as_string); +} + #[parameterized(test_type = {TestType::Volatile, TestType::NonVolatile, TestType::Aggregate})] fn errors(test_type: TestType) { let mut root = Helper::new(100, test_type); diff --git a/bd-buffer/src/buffer/non_volatile_ring_buffer.rs b/bd-buffer/src/buffer/non_volatile_ring_buffer.rs index ab3461eea..629b277ed 100644 --- a/bd-buffer/src/buffer/non_volatile_ring_buffer.rs +++ b/bd-buffer/src/buffer/non_volatile_ring_buffer.rs @@ -723,6 +723,7 @@ impl RingBufferImpl { block_when_reserving_into_concurrent_read: BlockWhenReservingIntoConcurrentRead, per_record_crc32_check: PerRecordCrc32Check, stats: Arc, + on_evicted_cb: Option>, ) -> Result> { // The following static asserts verify that FileHeader is a known size with all field offsets // known. This is done to avoid the use of #pragma pack(1) which may lead to poor performance on @@ -896,6 +897,7 @@ impl RingBufferImpl { .and_then(|p| p.reservation.as_ref()) .is_some() }, + on_evicted_cb, ), })) } diff --git a/bd-buffer/src/buffer/non_volatile_ring_buffer_test.rs b/bd-buffer/src/buffer/non_volatile_ring_buffer_test.rs index 4775e4d8a..ba8553686 100644 --- a/bd-buffer/src/buffer/non_volatile_ring_buffer_test.rs +++ b/bd-buffer/src/buffer/non_volatile_ring_buffer_test.rs @@ -40,6 +40,7 @@ impl Helper { super::BlockWhenReservingIntoConcurrentRead::No, super::PerRecordCrc32Check::Yes, stats.stats.clone(), + None, ) .unwrap(); Self { @@ -82,6 +83,7 @@ impl Helper { super::BlockWhenReservingIntoConcurrentRead::No, super::PerRecordCrc32Check::Yes, self.stats.stats.clone(), + None, )?, self.cursor, )); diff --git a/bd-buffer/src/buffer/volatile_ring_buffer.rs b/bd-buffer/src/buffer/volatile_ring_buffer.rs index 753ea41b9..968a2fd6b 100644 --- a/bd-buffer/src/buffer/volatile_ring_buffer.rs +++ b/bd-buffer/src/buffer/volatile_ring_buffer.rs @@ -444,7 +444,12 @@ pub struct RingBufferImpl { impl RingBufferImpl { #[must_use] - pub fn new(name: String, size: u32, stats: Arc) -> Arc { + pub fn new( + name: String, + size: u32, + stats: Arc, + on_evicted_cb: Option>, + ) -> Arc { let mut memory_do_not_use = Vec::with_capacity(size as usize); memory_do_not_use.spare_capacity_mut(); // Appease clippy. unsafe { @@ -481,6 +486,7 @@ impl RingBufferImpl { .is_some() }, |extra_locked_data| !extra_locked_data.reservations.is_empty(), + on_evicted_cb, ); Arc::new(Self { diff --git a/bd-buffer/src/buffer/volatile_ring_buffer_test.rs b/bd-buffer/src/buffer/volatile_ring_buffer_test.rs index f73ad5237..22e8d532d 100644 --- a/bd-buffer/src/buffer/volatile_ring_buffer_test.rs +++ b/bd-buffer/src/buffer/volatile_ring_buffer_test.rs @@ -16,7 +16,7 @@ use itertools::Itertools; fn make_helper(size: u32) -> Helper { Helper::new( - RingBufferImpl::new("test".to_string(), size, RingBufferStats::default().into()), + RingBufferImpl::new("test".to_string(), size, RingBufferStats::default().into(), None), Cursor::No, ) } diff --git a/bd-buffer/src/ring_buffer.rs b/bd-buffer/src/ring_buffer.rs index bfd79726b..833ed228a 100644 --- a/bd-buffer/src/ring_buffer.rs +++ b/bd-buffer/src/ring_buffer.rs @@ -163,6 +163,9 @@ pub struct Manager { stream_buffer_size_flag: bd_runtime::runtime::IntWatch, + + // Callback invoked when a record is evicted (overwritten). + on_evicted_cb: Option>, } impl Manager { @@ -170,6 +173,7 @@ impl Manager { buffer_directory: PathBuf, stats: &Scope, runtime: &bd_runtime::runtime::ConfigLoader, + on_evicted_cb: Option>, ) -> ( Arc, tokio::sync::mpsc::Receiver, @@ -184,6 +188,7 @@ impl Manager { buffer_event_tx, scope, stream_buffer_size_flag: runtime.register_int_watch(), + on_evicted_cb, }), buffer_event_rx, ) @@ -294,6 +299,7 @@ impl Manager { .scope .counter_with_labels("total_data_loss", labels! {"buffer_id" => &buffer.id}), None, + self.on_evicted_cb.clone(), )?; updated_buffers.insert(buffer.id.clone(), (buffer_type, ring_buffer.clone())); @@ -419,6 +425,7 @@ impl Manager { "bd tail".to_string(), *self.stream_buffer_size_flag.read(), Arc::new(RingBufferStats::default()), + self.on_evicted_cb.clone(), )); Some(BufferEventWithResponse::new( @@ -565,6 +572,7 @@ impl RingBuffer { volatile_records_written: Counter, volatile_records_refused: Counter, non_volatile_records_written: Option, + on_evicted_cb: Option>, ) -> Result> { // TODO(mattklein123): Right now we expose a very limited set of stats. Given it's much easier // now to inject stats we can consider exposing the rest. For now just duplicate what we @@ -595,6 +603,7 @@ impl RingBuffer { }, Arc::new(volatile_stats), Arc::new(non_volatile_stats), + on_evicted_cb, ) } @@ -612,6 +621,7 @@ impl RingBuffer { corrupted_record_counter: Counter, total_data_loss_counter: Counter, non_volatile_records_written: Option, + on_evicted_cb: Option>, ) -> Result<(Arc, bool)> { let filename = non_volatile_filename .to_str() @@ -630,6 +640,7 @@ impl RingBuffer { write_counter.clone(), write_failure_counter.clone(), non_volatile_records_written.clone(), + on_evicted_cb.clone(), ); let mut deleted = false; @@ -660,6 +671,7 @@ impl RingBuffer { write_counter, write_failure_counter, non_volatile_records_written, + on_evicted_cb, ); } diff --git a/bd-buffer/src/ring_buffer_test.rs b/bd-buffer/src/ring_buffer_test.rs index 35a832281..c6003b4f4 100644 --- a/bd-buffer/src/ring_buffer_test.rs +++ b/bd-buffer/src/ring_buffer_test.rs @@ -38,6 +38,7 @@ async fn test_create_ring_buffer() { fake_counter(), fake_counter(), None, + None, ) .unwrap(); @@ -64,6 +65,7 @@ fn test_create_ring_buffer_illegal_path() { fake_counter(), fake_counter(), None, + None, ); assert_matches!( @@ -96,6 +98,7 @@ fn corrupted_buffer() { fake_counter(), fake_counter(), None, + None, ) .unwrap(); assert!(!deleted); @@ -118,6 +121,7 @@ fn corrupted_buffer() { fake_counter(), fake_counter(), None, + None, ) .unwrap(); assert!(deleted); @@ -130,6 +134,7 @@ async fn test_ring_buffer_manager() { dir.path().to_path_buf(), &Collector::default().scope(""), &bd_runtime::runtime::ConfigLoader::new(&PathBuf::from(".")), + None, ); // Make sure we're not letting any buffer events sit in the channel, as this extends the @@ -177,6 +182,7 @@ async fn ring_buffer_stats() { diretory.path().to_owned(), &collector.scope(""), &bd_runtime::runtime::ConfigLoader::new(&PathBuf::from(".")), + None, ); // Make sure we're not letting any buffer events sit in the channel, as this extends the @@ -276,6 +282,7 @@ async fn write_failure_stats() { diretory.path().to_owned(), &collector.scope(""), &bd_runtime::runtime::ConfigLoader::new(&PathBuf::from(".")), + None, ); // Make sure we're not letting any buffer events sit in the channel, as this extends the @@ -320,6 +327,7 @@ async fn buffer_never_resizes() { buffer_directory.path().to_path_buf(), &Collector::default().scope(""), &bd_runtime::runtime::ConfigLoader::new(&PathBuf::from("")), + None, ); // Make sure we're not letting any buffer events sit in the channel, as this extends the diff --git a/bd-logger/src/async_log_buffer_test.rs b/bd-logger/src/async_log_buffer_test.rs index 9cd6ed0c6..fc759b0d6 100644 --- a/bd-logger/src/async_log_buffer_test.rs +++ b/bd-logger/src/async_log_buffer_test.rs @@ -81,6 +81,7 @@ impl Setup { tmp_dir.path().join("buffer"), &collector.scope(""), runtime, + None, ) .0, runtime: Self::make_runtime(&tmp_dir), diff --git a/bd-logger/src/builder.rs b/bd-logger/src/builder.rs index 5357356e0..7d7aace4f 100644 --- a/bd-logger/src/builder.rs +++ b/bd-logger/src/builder.rs @@ -32,6 +32,7 @@ use bd_client_stats_store::Collector; use bd_crash_handler::Monitor; use bd_error_reporter::reporter::{UnexpectedErrorHandler, handle_unexpected}; use bd_internal_logging::NoopLogger; +use bd_log_primitives::LogEncodingHelper; use bd_proto::protos::logging::payload::LogType; use bd_runtime::runtime::network_quality::NetworkCallOnlineIndicatorTimeout; use bd_runtime::runtime::stats::{DirectStatFlushIntervalFlag, UploadStatFlushIntervalFlag}; @@ -241,6 +242,16 @@ impl LoggerBuilder { let data_upload_tx_clone = data_upload_tx.clone(); let collector_clone = collector; + // TODO(mattklein123): This is an extremely rough estimate of the oldest log timestamp. + // In the future we should likely do something better here like a histogram. + // For now we just use a gauge which tracks the oldest timestamp evicted. + let on_evicted_cb: Option> = Some(Arc::new(move |bytes| { + if let Some(ts) = LogEncodingHelper::extract_timestamp(bytes) { + // TODO: implement gauge metric update here + let _ = ts.unix_timestamp(); + } + })); + let logger = Logger::new( maybe_shutdown_trigger, runtime_loader.clone(), @@ -346,7 +357,7 @@ impl LoggerBuilder { let buffer_directory = Logger::initialize_buffer_directory(&self.params.sdk_directory)?; let (buffer_manager, buffer_event_rx) = - bd_buffer::Manager::new(buffer_directory, &scope, &runtime_loader); + bd_buffer::Manager::new(buffer_directory, &scope, &runtime_loader, on_evicted_cb); let buffer_uploader = BufferUploadManager::new( data_upload_tx_clone.clone(), &runtime_loader, diff --git a/bd-logger/src/consumer_test.rs b/bd-logger/src/consumer_test.rs index 4da88d5fd..c16db44fa 100644 --- a/bd-logger/src/consumer_test.rs +++ b/bd-logger/src/consumer_test.rs @@ -887,6 +887,7 @@ async fn log_streaming() { "test".to_string(), 1024 * 1024, Arc::new(RingBufferStats::default()), + None, ); let mut producer = buffer.clone().register_producer().unwrap(); @@ -937,6 +938,7 @@ async fn streaming_batch_size_flag() { "test_stream_batch".to_string(), 1024 * 1024, Arc::new(bd_buffer::RingBufferStats::default()), + None, ); let mut producer = buffer.clone().register_producer().unwrap(); @@ -994,6 +996,7 @@ async fn log_streaming_shutdown() { "test".to_string(), 1024 * 1024, Arc::new(RingBufferStats::default()), + None, ); let mut producer = buffer.clone().register_producer().unwrap(); @@ -1070,6 +1073,7 @@ fn create_buffer( Counter::default(), Counter::default(), Some(records_written), + None, ) .unwrap(); diff --git a/fuzz/src/lib.rs b/fuzz/src/lib.rs index bd02dbfb5..8f1bc5074 100644 --- a/fuzz/src/lib.rs +++ b/fuzz/src/lib.rs @@ -117,7 +117,7 @@ impl BufferState { ) -> Option { let buffer = match test_case.buffer_type { BufferType::Volatile => { - VolatileRingBuffer::new("test".to_string(), test_case.buffer_size, stats) + VolatileRingBuffer::new("test".to_string(), test_case.buffer_size, stats, None) }, BufferType::NonVolatile => { // TODO(mattklein123): fuzz no overwrite in non-cursor mode. @@ -133,6 +133,7 @@ impl BufferState { BlockWhenReservingIntoConcurrentRead::No, PerRecordCrc32Check::Yes, stats, + None, ) { Ok(buffer) => buffer as Arc, Err(e) => { @@ -166,6 +167,7 @@ impl BufferState { }, stats.clone(), stats, + None, ) { Ok(buffer) => buffer as Arc, Err(e) => {