Skip to content

Commit b923ce0

Browse files
committed
Improve docs and tests
1 parent 211918f commit b923ce0

File tree

3 files changed

+141
-61
lines changed

3 files changed

+141
-61
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,20 @@ use ringbuffer_spsc::ringbuffer;
1818

1919
fn main() {
2020
// Create a writer/reader pair
21-
let (mut tx, mut rx) = ringbuffer::<usize>(16);
21+
let (mut writer, mut reader) = ringbuffer::<usize>(16);
2222

2323
// Thread that pushes elements on the ringbuffer
2424
std::thread::spawn(move || for i in 0..usize::MAX {
2525
// Attempt to push an element
26-
if tx.push(i).is_some() {
26+
if writer.push(i).is_some() {
2727
// The ringbuffer is full, yield the thread
2828
std::thread::yield_now();
2929
}
3030
});
3131

3232
// Loop that pulls elements from the ringbuffer
3333
loop {
34-
match rx.pull() {
34+
match reader.pull() {
3535
// We have got an element, do something
3636
Some(t) => std::hint::black_box(t),
3737
// The ringbuffer is empty, yield the thread

src/lib.rs

Lines changed: 99 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,44 @@
1-
//! A fast thread-safe `no_std` single-producer single-consumer ring buffer.
2-
//! For performance reasons, the capacity of the buffer is determined
3-
//! at compile time via a const generic and it is required to be a
4-
//! power of two for a more efficient index handling.
1+
//! A fast, small single-producer single-consumer (SPSC) ringbuffer designed
2+
//! for low-latency and high-throughput exchange between a single writer and
3+
//! a single reader. This crate is `#![no_std]` and uses `alloc` internally; it provides a
4+
//! minimal, ergonomic API that works well from `no_std` contexts that supply
5+
//! an allocator as well as from normal `std` programs and examples.
56
//!
6-
//! # Example
7+
//! Important design points:
8+
//! - The ringbuffer capacity is specified at runtime via the [`ringbuffer`] constructor
9+
//! and **must be a power of two**. The implementation uses a bitmask to wrap
10+
//! indices which is much faster than a modulo operation and reduces runtime
11+
//! overhead in hot paths.
12+
//! - The API is non-blocking: [`RingBufferWriter::push`] returns `Some(T)` immediately when the
13+
//! buffer is full (giving back ownership of the value), and [`RingBufferReader::pull`] returns
14+
//! `None` immediately when the buffer is empty. Typical usage is to yield or
15+
//! retry in those cases.
16+
//!
17+
//! *NOTE:* elements remaining in the buffer are dropped when the internal storage is deallocated.
18+
//! This happens when both [`RingBufferReader`] and [`RingBufferWriter`] are dropped.
19+
//!
20+
//! ## Example
721
//! ```rust
822
//! use ringbuffer_spsc::ringbuffer;
923
//!
10-
//! const N: usize = 1_000_000;
11-
//! let (mut tx, mut rx) = ringbuffer::<usize>(16);
24+
//! const N: usize = 8;
1225
//!
13-
//! let p = std::thread::spawn(move || {
14-
//! let mut current: usize = 0;
15-
//! while current < N {
16-
//! if tx.push(current).is_none() {
17-
//! current = current.wrapping_add(1);
18-
//! } else {
19-
//! std::thread::yield_now();
20-
//! }
26+
//! // Create a ringbuffer with capacity 16 (must be power of two)
27+
//! let (mut writer, mut reader) = ringbuffer::<usize>(16);
28+
//! // Producer
29+
//! std::thread::spawn(move || for i in 0..N {
30+
//! if writer.push(i).is_some() {
31+
//! std::thread::yield_now();
2132
//! }
2233
//! });
23-
//!
24-
//! let c = std::thread::spawn(move || {
25-
//! let mut current: usize = 0;
26-
//! while current < N {
27-
//! if let Some(c) = rx.pull() {
28-
//! assert_eq!(c, current);
29-
//! current = current.wrapping_add(1);
30-
//! } else {
31-
//! std::thread::yield_now();
32-
//! }
34+
//! // Consumer
35+
//! let mut i = 0;
36+
//! while i < N {
37+
//! match reader.pull() {
38+
//! Some(_) => i += 1,
39+
//! None => std::thread::yield_now(),
3340
//! }
34-
//! });
35-
//!
36-
//! p.join().unwrap();
37-
//! c.join().unwrap();
41+
//! }
3842
//! ```
3943
#![no_std]
4044
extern crate alloc;
@@ -46,7 +50,19 @@ use core::{
4650
};
4751
use crossbeam_utils::CachePadded;
4852

49-
/// Panic: it panics if capacity is not a power of 2.
53+
/// Create a new ringbuffer with a fixed capacity.
54+
///
55+
/// # Panics
56+
///
57+
/// Panics if *capacity* is not a power of two.
58+
///
59+
/// This requirement enables a fast bitmask wrap for indices which avoids the cost of `mod`
60+
/// in the hot path.
61+
///
62+
/// # Returns
63+
///
64+
/// A `(`[`RingBufferWriter<T>`]`, `[`RingBufferReader<T>`]`)` pair where the writer is
65+
/// intended for the single producer and the reader for the single consumer.
5066
pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
5167
assert!(capacity.is_power_of_two(), "Capacity must be a power of 2");
5268

@@ -60,7 +76,7 @@ pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<
6076
// Keep the pointer to the boxed slice
6177
ptr: Box::into_raw(v),
6278
// Since capacity is a power of two, capacity-1 is a mask covering N elements overflowing when N elements have been added.
63-
// Indexes are left growing indefinetely and naturally wrap around once the index increment reaches usize::MAX.
79+
// Indexes are left growing indefinitely and naturally wrap around once the index increment reaches usize::MAX.
6480
mask: capacity - 1,
6581
idx_r: CachePadded::new(AtomicUsize::new(0)),
6682
idx_w: CachePadded::new(AtomicUsize::new(0)),
@@ -79,6 +95,12 @@ pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<
7995
)
8096
}
8197

98+
/// Internal ringbuffer storage. This type is private to the crate.
99+
///
100+
/// It stores the raw boxed slice pointer and the atomic indices used for
101+
/// synchronization. The implementation uses monotonically increasing indices
102+
/// (wrapping on overflow) and a power-of-two mask to convert indices to
103+
/// positions inside the buffer.
82104
struct RingBuffer<T> {
83105
ptr: *mut [MaybeUninit<T>],
84106
mask: usize,
@@ -89,28 +111,47 @@ struct RingBuffer<T> {
89111
impl<T> RingBuffer<T> {
90112
#[allow(clippy::mut_from_ref)]
91113
unsafe fn get_unchecked_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
114+
// Safety: caller must ensure that `idx` is in a range that refers to
115+
// an initialized slot when reading, or to a slot that may be written
116+
// when writing. This helper performs unchecked indexing into the
117+
// backing slice using the internal mask.
92118
unsafe { (&mut (*self.ptr)).get_unchecked_mut(idx & self.mask) }
93119
}
94120
}
95121

122+
// The internal `RingBuffer` is stored inside an `Arc` and will be deallocated
123+
// when the last writer or reader handle is dropped (i.e., when the `Arc`
124+
// reference count reaches zero).
96125
impl<T> Drop for RingBuffer<T> {
97126
fn drop(&mut self) {
98127
let mut idx_r = self.idx_r.load(Ordering::Acquire);
99128
let idx_w = self.idx_w.load(Ordering::Acquire);
100129

101130
while idx_r != idx_w {
131+
// SAFETY: we are in Drop and we must clean up any elements still
132+
// present in the buffer. Since only one producer and one
133+
// consumer exist, and we're dropping the entire buffer, it is
134+
// safe to assume we can take ownership of remaining initialized
135+
// elements between `idx_r` and `idx_w`.
102136
let t = unsafe {
103137
mem::replace(self.get_unchecked_mut(idx_r), MaybeUninit::uninit()).assume_init()
104138
};
105139
mem::drop(t);
106140
idx_r = idx_r.wrapping_add(1);
107141
}
108142

143+
// At this point we've taken ownership of and dropped every
144+
// initialized element that was still present in the buffer. It is
145+
// important to drop all elements before freeing the backing storage
146+
// so that each element's destructor runs exactly once. Converting
147+
// the raw pointer back into a `Box` will free the allocation for
148+
// the slice itself.
109149
let ptr = unsafe { Box::from_raw(self.ptr) };
110150
mem::drop(ptr);
111151
}
112152
}
113153

154+
/// Writer handle of the ringbuffer.
114155
pub struct RingBufferWriter<T> {
115156
inner: Arc<RingBuffer<T>>,
116157
cached_idx_r: usize,
@@ -121,21 +162,22 @@ unsafe impl<T: Send> Send for RingBufferWriter<T> {}
121162
unsafe impl<T: Sync> Sync for RingBufferWriter<T> {}
122163

123164
impl<T> RingBufferWriter<T> {
124-
// Returns the capacity of the ringbuffer
165+
/// Returns the capacity (number of slots) of the ringbuffer.
125166
pub fn capacity(&self) -> usize {
126167
self.inner.ptr.len()
127168
}
128169

129170
/// Push an element into the RingBuffer.
130-
/// It returns `Some(T)` if the RingBuffer is full, giving back the ownership of `T`.
171+
///
172+
/// Returns `Some(T)` when the buffer is full (giving back ownership of the value), otherwise returns `None` on success.
131173
#[inline]
132174
pub fn push(&mut self, t: T) -> Option<T> {
133-
// Check if the ring buffer is full.
175+
// Check if the ringbuffer is full.
134176
if self.is_full() {
135177
return Some(t);
136178
}
137179

138-
// Insert the element in the ring buffer
180+
// Insert the element in the ringbuffer
139181
let _ = mem::replace(
140182
unsafe { self.inner.get_unchecked_mut(self.local_idx_w) },
141183
MaybeUninit::new(t),
@@ -148,24 +190,25 @@ impl<T> RingBufferWriter<T> {
148190
None
149191
}
150192

151-
/// Check if the RingBuffer is full and evenutally updates the internal cached indexes.
193+
/// Check if the RingBuffer is full.
152194
#[inline]
153195
pub fn is_full(&mut self) -> bool {
154-
// Check if the ring buffer is potentially full.
196+
// Check if the ringbuffer is potentially full.
155197
// This happens when the difference between the write and read indexes equals
156-
// the ring buffer capacity. Note that the write and read indexes are left growing
198+
// the ringbuffer capacity. Note that the write and read indexes are left growing
157199
// indefinitely, so we need to compute the difference by accounting for any eventual
158200
// overflow. This requires wrapping the subtraction operation.
159201
if self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len() {
160202
self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
161-
// Check if the ring buffer is really full
203+
// Check if the ringbuffer is really full
162204
self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len()
163205
} else {
164206
false
165207
}
166208
}
167209
}
168210

211+
/// Reader handle of the ringbuffer.
169212
pub struct RingBufferReader<T> {
170213
inner: Arc<RingBuffer<T>>,
171214
local_idx_r: usize,
@@ -176,21 +219,22 @@ unsafe impl<T: Send> Send for RingBufferReader<T> {}
176219
unsafe impl<T: Sync> Sync for RingBufferReader<T> {}
177220

178221
impl<T> RingBufferReader<T> {
179-
// Returns the capacity of the ringbuffer
222+
/// Returns the capacity (number of slots) of the ringbuffer.
180223
pub fn capacity(&self) -> usize {
181224
self.inner.ptr.len()
182225
}
183226

184-
/// Pull an element from the RingBuffer.
185-
/// It returns `None` if the RingBuffer is empty.
227+
/// Pull an element from the ringbuffer.
228+
///
229+
/// Returns `Some(T)` if an element is available, otherwise `None` when the buffer is empty.
186230
#[inline]
187231
pub fn pull(&mut self) -> Option<T> {
188-
// Check if the ring buffer is potentially empty
232+
// Check if the ringbuffer is potentially empty
189233
if self.is_empty() {
190234
return None;
191235
}
192236

193-
// Remove the element from the ring buffer
237+
// Remove the element from the ringbuffer
194238
let t = unsafe {
195239
mem::replace(
196240
self.inner.get_unchecked_mut(self.local_idx_r),
@@ -206,11 +250,12 @@ impl<T> RingBufferReader<T> {
206250
Some(t)
207251
}
208252

209-
/// Peek an element from the RingBuffer without pulling it out.
210-
/// It returns `None` if the RingBuffer is empty.
253+
/// Peek an element from the ringbuffer without pulling it out.
254+
///
255+
/// Returns `Some(&T)` when at least one element is present, or `None` when the buffer is empty.
211256
#[inline]
212257
pub fn peek(&mut self) -> Option<&T> {
213-
// Check if the ring buffer is potentially empty
258+
// Check if the ringbuffer is potentially empty
214259
if self.is_empty() {
215260
return None;
216261
}
@@ -222,11 +267,12 @@ impl<T> RingBufferReader<T> {
222267
})
223268
}
224269

225-
/// Peek a mutable element from the RingBuffer without pulling it out.
226-
/// It returns `None` if the RingBuffer is empty.
270+
/// Peek a mutable element from the ringbuffer without pulling it out.
271+
///
272+
/// Returns `Some(&mut T)` when at least one element is present, or `None` when the buffer is empty.
227273
#[inline]
228274
pub fn peek_mut(&mut self) -> Option<&mut T> {
229-
// Check if the ring buffer is potentially empty
275+
// Check if the ringbuffer is potentially empty
230276
if self.is_empty() {
231277
return None;
232278
}
@@ -238,14 +284,14 @@ impl<T> RingBufferReader<T> {
238284
})
239285
}
240286

241-
/// Check if the RingBuffer is empty and evenutally updates the internal cached indexes.
287+
/// Check if the ringbuffer is empty.
242288
#[inline]
243289
pub fn is_empty(&mut self) -> bool {
244-
// Check if the ring buffer is potentially empty
290+
// Check if the ringbuffer is potentially empty
245291
if self.local_idx_r == self.cached_idx_w {
246292
// Update the write index
247293
self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
248-
// Check if the ring buffer is really empty
294+
// Check if the ringbuffer is really empty
249295
self.local_idx_r == self.cached_idx_w
250296
} else {
251297
false

tests/verify.rs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
use std::sync::atomic::{AtomicUsize, Ordering};
2+
13
use ringbuffer_spsc::ringbuffer;
24

5+
// Elements arrive in order
36
#[test]
47
fn it_works() {
58
const N: usize = 1_000_000;
@@ -37,16 +40,47 @@ fn it_works() {
3740
c.join().unwrap();
3841
}
3942

43+
// Memory drop check
44+
static COUNTER: AtomicUsize = AtomicUsize::new(0);
45+
46+
struct DropCounter;
47+
48+
impl DropCounter {
49+
fn new() -> Self {
50+
COUNTER.fetch_add(1, Ordering::SeqCst);
51+
Self
52+
}
53+
}
54+
55+
impl Drop for DropCounter {
56+
fn drop(&mut self) {
57+
COUNTER.fetch_sub(1, Ordering::SeqCst);
58+
}
59+
}
60+
4061
#[test]
4162
fn memcheck() {
42-
const N: usize = 4;
63+
const N: usize = 1_024;
4364

44-
let (mut tx, rx) = ringbuffer::<Box<usize>>(N);
45-
for i in 0..N {
46-
assert!(tx.push(Box::new(i)).is_none());
65+
let (mut tx, rx) = ringbuffer::<DropCounter>(N);
66+
for _ in 0..N {
67+
assert!(tx.push(DropCounter::new()).is_none());
4768
}
48-
assert!(tx.push(Box::new(N)).is_some());
69+
assert!(tx.push(DropCounter::new()).is_some());
4970

71+
assert_eq!(
72+
COUNTER.load(Ordering::SeqCst),
73+
N,
74+
"There should be as many counters as ringbuffer capacity"
75+
);
76+
77+
// Drop both reader and writer
5078
drop(tx);
5179
drop(rx);
80+
81+
assert_eq!(
82+
COUNTER.load(Ordering::SeqCst),
83+
0,
84+
"All the drop counters should have been dropped"
85+
);
5286
}

0 commit comments

Comments
 (0)