Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub fn benchmark(c: &mut Criterion) {
input.publish();
})
});
uncontended.bench_function("guarded write + send", |b| {
b.iter(|| {
*input.input_buffer_publisher() = black_box(0);
})
});
uncontended.bench_function("send", |b| b.iter(|| input.write(black_box(0))));
uncontended.bench_function("publish + dirty update", |b| {
b.iter(|| {
Expand All @@ -36,6 +41,12 @@ pub fn benchmark(c: &mut Criterion) {
*output.read()
})
});
uncontended.bench_function("guarded transmit", |b| {
b.iter(|| {
*input.input_buffer_publisher() = black_box(0);
*output.read()
})
});
}

{
Expand All @@ -53,6 +64,10 @@ pub fn benchmark(c: &mut Criterion) {
input.publish();
})
});
read_contended.bench_function("guarded write+send", |b| {
b.iter(|| *input.input_buffer_publisher() = black_box(0))
});

read_contended.bench_function("send", |b| b.iter(|| input.write(black_box(0))));
},
);
Expand Down
203 changes: 200 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ use crossbeam_utils::CachePadded;
use alloc::sync::Arc;
use core::{
cell::UnsafeCell,
fmt,
ops::{Deref, DerefMut},
sync::atomic::{AtomicU8, Ordering},
};

Expand Down Expand Up @@ -231,6 +233,17 @@ impl<T: Send> Input<T> {
back_info & BACK_DIRTY_BIT == 0
}

/// Access the input buffer directly, in non-mutable way
///
/// This is simply a non-mutable version of `input_buffer()`.
/// For details, see the `input_buffer()` method.
///
fn peek_input_buffer(&self) -> &T {
// Access the input buffer directly
let input_ptr = self.shared.buffers[self.input_idx as usize].get();
unsafe { &*input_ptr }
}

/// Access the input buffer directly
///
/// This advanced interface allows you to update the input buffer in place,
Expand Down Expand Up @@ -300,6 +313,70 @@ impl<T: Send> Input<T> {
// Tell whether we have overwritten unread data
former_back_info & BACK_DIRTY_BIT != 0
}

/// Access the input buffer wrapped in the `InputPublishGuard`
///
/// This interface allows you to update the input buffer in place,
/// so that you can avoid creating values of type T repeatedy just to push
/// them into the triple buffer when doing so is expensive.
///
/// To avoid that you have to take care of pushing the update manually
/// we return an `InputPublishGuard` that calls `publish()` when dropped.
///
/// Be aware that the buffer does not contain the last value that you published
/// (which is now available to the consumer thread). In fact, what you get
/// may not match _any_ value that you sent in the past, but rather be a new
/// value that was written in there by the consumer thread. All you can
/// safely assume is that the buffer contains a valid value of type T, which
/// you need to fill with valid values.
///
pub fn input_buffer_publisher(&mut self) -> InputPublishGuard<T> {
InputPublishGuard { reference: self }
}
}

/// RAII Guard to the buffer provided by an `Input`.
///
/// The current buffer of the `Input` can be accessed through this guard via its `Deref` and `DerefMut` implementations.
/// When the `InputPublishGuard` is dropped it calls publish.
///
/// This structure is created by the `guarded_input_buffer` method.
///
pub struct InputPublishGuard<'a, T: 'a + Send> {
reference: &'a mut Input<T>,
}

impl<T: Send> Deref for InputPublishGuard<'_, T> {
type Target = T;

fn deref(&self) -> &T {
self.reference.peek_input_buffer()
}
}

impl<T: Send> DerefMut for InputPublishGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
self.reference.input_buffer()
}
}

impl<T: Send> Drop for InputPublishGuard<'_, T> {
#[inline]
fn drop(&mut self) {
self.reference.publish();
}
}

impl<T: Send + fmt::Debug> fmt::Debug for InputPublishGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}

impl<T: fmt::Display + Send> fmt::Display for InputPublishGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}

/// Consumer interface to the triple buffer
Expand Down Expand Up @@ -390,12 +467,12 @@ impl<T: Send> Output<T> {
/// performed to the output buffer via the `output_buffer()` interface.
///
pub fn update(&mut self) -> bool {
// Access the shared state
let shared_state = &(*self.shared);

// Check if an update is present in the back-buffer
let updated = self.updated();
if updated {
// Access the shared state
let shared_state = &(*self.shared);

// If so, exchange our output buffer with the back-buffer, thusly
// acquiring exclusive access to the old back buffer while giving
// the producer a new back-buffer to write to.
Expand Down Expand Up @@ -701,6 +778,57 @@ mod tests {
check_buf_state(&mut buf, false);
}

/// Check that writing to a triple buffer works
#[test]
fn vec_guarded_write() {
let mut buf = TripleBuffer::new(&vec![]);

// write new value, publish, read
{
let mut buffer = buf.input.input_buffer_publisher();
buffer.push(0);
buffer.push(1);
buffer.push(2);

// not yet published
let back_info = buffer.reference.shared.back_info.load(Ordering::Relaxed);
let back_buffer_dirty = back_info & BACK_DIRTY_BIT != 0;
assert!(!back_buffer_dirty);
}
check_buf_state(&mut buf, true); // after publish, before read
assert_eq!(*buf.output.read(), vec![0, 1, 2]);
check_buf_state(&mut buf, false); // after publish and read

// write new value, publish, don't read
{
buf.input.input_buffer_publisher().push(3);
}
check_buf_state(&mut buf, true);

// write new value, publish, read
{
buf.input.input_buffer_publisher().push(4);
}
assert_eq!(*buf.output.read(), vec![4]);
check_buf_state(&mut buf, false);

// overwrite existing value, publish, surprising read
{
buf.input.input_buffer_publisher().push(5);
}
assert_eq!(*buf.output.read(), vec![3, 5]);
check_buf_state(&mut buf, false);

// to avoid surprise, always clear before write
{
let mut buffer = buf.input.input_buffer_publisher();
buffer.clear();
buffer.push(6);
}
assert_eq!(*buf.output.read(), vec![6]);
check_buf_state(&mut buf, false);
}

/// Check that (sequentially) writing to a triple buffer works
#[test]
fn sequential_write() {
Expand Down Expand Up @@ -728,6 +856,33 @@ mod tests {
}
}

/// Check that (sequentially) writing to a triple buffer works
#[test]
fn sequential_guarded_write() {
// Let's create a triple buffer
let mut buf = TripleBuffer::new(&false);

// Back up the initial buffer state
let old_buf = buf.clone();

// Perform a write
*buf.input.input_buffer_publisher() = true;

// Check new implementation state
{
// Starting from the old buffer state...
let mut expected_buf = old_buf.clone();

// ...write the new value in and swap...
*expected_buf.input.input_buffer() = true;
expected_buf.input.publish();

// Nothing else should have changed
assert_eq!(buf, expected_buf);
check_buf_state(&mut buf, true);
}
}

/// Check that (sequentially) reading from a triple buffer works
#[test]
fn sequential_read() {
Expand Down Expand Up @@ -770,6 +925,48 @@ mod tests {
}
}

/// Check that (sequentially) reading from a triple buffer works
#[test]
fn sequential_guarded_read() {
// Let's create a triple buffer and write into it
let mut buf = TripleBuffer::new(&1.0);
*buf.input.input_buffer_publisher() = 4.2;

// Test readout from dirty (freshly written) triple buffer
{
// Back up the initial buffer state
let old_buf: TripleBuffer<f64> = buf.clone();

// Read from the buffer
let result = *buf.output.read();

// Output value should be correct
assert_eq!(result, 4.2);

// Result should be equivalent to carrying out an update
let mut expected_buf = old_buf.clone();
assert!(expected_buf.output.update());
assert_eq!(buf, expected_buf);
check_buf_state(&mut buf, false);
}

// Test readout from clean (unchanged) triple buffer
{
// Back up the initial buffer state
let old_buf = buf.clone();

// Read from the buffer
let result = *buf.output.read();

// Output value should be correct
assert_eq!(result, 4.2);

// Buffer state should be unchanged
assert_eq!(buf, old_buf);
check_buf_state(&mut buf, false);
}
}

/// Check that contended concurrent reads and writes work
#[test]
#[ignore]
Expand Down