diff --git a/benches/benchmarks.rs b/benches/benchmarks.rs index c97d1db..a08c85c 100644 --- a/benches/benchmarks.rs +++ b/benches/benchmarks.rs @@ -23,6 +23,11 @@ pub fn benchmark(c: &mut Criterion) { input.publish(); }) }); + uncontended.bench_function("guarded write + send", |b| { + b.iter(|| { + *input.input_buffer_publisher() = black_box(0); + }) + }); uncontended.bench_function("send", |b| b.iter(|| input.write(black_box(0)))); uncontended.bench_function("publish + dirty update", |b| { b.iter(|| { @@ -36,6 +41,12 @@ pub fn benchmark(c: &mut Criterion) { *output.read() }) }); + uncontended.bench_function("guarded transmit", |b| { + b.iter(|| { + *input.input_buffer_publisher() = black_box(0); + *output.read() + }) + }); } { @@ -53,6 +64,10 @@ pub fn benchmark(c: &mut Criterion) { input.publish(); }) }); + read_contended.bench_function("guarded write+send", |b| { + b.iter(|| *input.input_buffer_publisher() = black_box(0)) + }); + read_contended.bench_function("send", |b| b.iter(|| input.write(black_box(0)))); }, ); diff --git a/src/lib.rs b/src/lib.rs index fc9e272..670b090 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,8 @@ use crossbeam_utils::CachePadded; use alloc::sync::Arc; use core::{ cell::UnsafeCell, + fmt, + ops::{Deref, DerefMut}, sync::atomic::{AtomicU8, Ordering}, }; @@ -231,6 +233,17 @@ impl Input { back_info & BACK_DIRTY_BIT == 0 } + /// Access the input buffer directly, in non-mutable way + /// + /// This is simply a non-mutable version of `input_buffer()`. + /// For details, see the `input_buffer()` method. + /// + fn peek_input_buffer(&self) -> &T { + // Access the input buffer directly + let input_ptr = self.shared.buffers[self.input_idx as usize].get(); + unsafe { &*input_ptr } + } + /// Access the input buffer directly /// /// This advanced interface allows you to update the input buffer in place, @@ -300,6 +313,70 @@ impl Input { // Tell whether we have overwritten unread data former_back_info & BACK_DIRTY_BIT != 0 } + + /// Access the input buffer wrapped in the `InputPublishGuard` + /// + /// This interface allows you to update the input buffer in place, + /// so that you can avoid creating values of type T repeatedy just to push + /// them into the triple buffer when doing so is expensive. + /// + /// To avoid that you have to take care of pushing the update manually + /// we return an `InputPublishGuard` that calls `publish()` when dropped. + /// + /// Be aware that the buffer does not contain the last value that you published + /// (which is now available to the consumer thread). In fact, what you get + /// may not match _any_ value that you sent in the past, but rather be a new + /// value that was written in there by the consumer thread. All you can + /// safely assume is that the buffer contains a valid value of type T, which + /// you need to fill with valid values. + /// + pub fn input_buffer_publisher(&mut self) -> InputPublishGuard { + InputPublishGuard { reference: self } + } +} + +/// RAII Guard to the buffer provided by an `Input`. +/// +/// The current buffer of the `Input` can be accessed through this guard via its `Deref` and `DerefMut` implementations. +/// When the `InputPublishGuard` is dropped it calls publish. +/// +/// This structure is created by the `guarded_input_buffer` method. +/// +pub struct InputPublishGuard<'a, T: 'a + Send> { + reference: &'a mut Input, +} + +impl Deref for InputPublishGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + self.reference.peek_input_buffer() + } +} + +impl DerefMut for InputPublishGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + self.reference.input_buffer() + } +} + +impl Drop for InputPublishGuard<'_, T> { + #[inline] + fn drop(&mut self) { + self.reference.publish(); + } +} + +impl fmt::Debug for InputPublishGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for InputPublishGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } } /// Consumer interface to the triple buffer @@ -390,12 +467,12 @@ impl Output { /// performed to the output buffer via the `output_buffer()` interface. /// pub fn update(&mut self) -> bool { - // Access the shared state - let shared_state = &(*self.shared); - // Check if an update is present in the back-buffer let updated = self.updated(); if updated { + // Access the shared state + let shared_state = &(*self.shared); + // If so, exchange our output buffer with the back-buffer, thusly // acquiring exclusive access to the old back buffer while giving // the producer a new back-buffer to write to. @@ -701,6 +778,57 @@ mod tests { check_buf_state(&mut buf, false); } + /// Check that writing to a triple buffer works + #[test] + fn vec_guarded_write() { + let mut buf = TripleBuffer::new(&vec![]); + + // write new value, publish, read + { + let mut buffer = buf.input.input_buffer_publisher(); + buffer.push(0); + buffer.push(1); + buffer.push(2); + + // not yet published + let back_info = buffer.reference.shared.back_info.load(Ordering::Relaxed); + let back_buffer_dirty = back_info & BACK_DIRTY_BIT != 0; + assert!(!back_buffer_dirty); + } + check_buf_state(&mut buf, true); // after publish, before read + assert_eq!(*buf.output.read(), vec![0, 1, 2]); + check_buf_state(&mut buf, false); // after publish and read + + // write new value, publish, don't read + { + buf.input.input_buffer_publisher().push(3); + } + check_buf_state(&mut buf, true); + + // write new value, publish, read + { + buf.input.input_buffer_publisher().push(4); + } + assert_eq!(*buf.output.read(), vec![4]); + check_buf_state(&mut buf, false); + + // overwrite existing value, publish, surprising read + { + buf.input.input_buffer_publisher().push(5); + } + assert_eq!(*buf.output.read(), vec![3, 5]); + check_buf_state(&mut buf, false); + + // to avoid surprise, always clear before write + { + let mut buffer = buf.input.input_buffer_publisher(); + buffer.clear(); + buffer.push(6); + } + assert_eq!(*buf.output.read(), vec![6]); + check_buf_state(&mut buf, false); + } + /// Check that (sequentially) writing to a triple buffer works #[test] fn sequential_write() { @@ -728,6 +856,33 @@ mod tests { } } + /// Check that (sequentially) writing to a triple buffer works + #[test] + fn sequential_guarded_write() { + // Let's create a triple buffer + let mut buf = TripleBuffer::new(&false); + + // Back up the initial buffer state + let old_buf = buf.clone(); + + // Perform a write + *buf.input.input_buffer_publisher() = true; + + // Check new implementation state + { + // Starting from the old buffer state... + let mut expected_buf = old_buf.clone(); + + // ...write the new value in and swap... + *expected_buf.input.input_buffer() = true; + expected_buf.input.publish(); + + // Nothing else should have changed + assert_eq!(buf, expected_buf); + check_buf_state(&mut buf, true); + } + } + /// Check that (sequentially) reading from a triple buffer works #[test] fn sequential_read() { @@ -770,6 +925,48 @@ mod tests { } } + /// Check that (sequentially) reading from a triple buffer works + #[test] + fn sequential_guarded_read() { + // Let's create a triple buffer and write into it + let mut buf = TripleBuffer::new(&1.0); + *buf.input.input_buffer_publisher() = 4.2; + + // Test readout from dirty (freshly written) triple buffer + { + // Back up the initial buffer state + let old_buf: TripleBuffer = buf.clone(); + + // Read from the buffer + let result = *buf.output.read(); + + // Output value should be correct + assert_eq!(result, 4.2); + + // Result should be equivalent to carrying out an update + let mut expected_buf = old_buf.clone(); + assert!(expected_buf.output.update()); + assert_eq!(buf, expected_buf); + check_buf_state(&mut buf, false); + } + + // Test readout from clean (unchanged) triple buffer + { + // Back up the initial buffer state + let old_buf = buf.clone(); + + // Read from the buffer + let result = *buf.output.read(); + + // Output value should be correct + assert_eq!(result, 4.2); + + // Buffer state should be unchanged + assert_eq!(buf, old_buf); + check_buf_state(&mut buf, false); + } + } + /// Check that contended concurrent reads and writes work #[test] #[ignore]