diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index dbb731068f..3ff2ce31b4 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -50,6 +50,9 @@ jobs: - name: Run miri run: MIRIFLAGS=-Zmiri-ignore-leaks cargo miri test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize" + - name: Run miri (mpmc crossbeam) + run: MIRIFLAGS=-Zmiri-ignore-leaks cargo miri test --features="alloc,defmt,mpmc_large,mpmc_crossbeam,portable-atomic-critical-section,serde,ufmt,bytes,zeroize" + # Run cargo test test: name: test @@ -86,6 +89,9 @@ jobs: - name: Run cargo test run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize" + - name: Run cargo test (mpmc crossbeam) + run: cargo test --features="alloc,defmt,mpmc_large,mpmc_crossbeam,portable-atomic-critical-section,serde,ufmt,bytes,zeroize" + # Run cargo fmt --check style: name: style diff --git a/Cargo.toml b/Cargo.toml index 7390c86bc9..e5b8e07690 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,9 @@ alloc = [] nightly = [] +# Implement MPMC using `crossbeam::ArrayQueue` +mpmc_crossbeam = ["dep:crossbeam-utils"] + [dependencies] bytes = { version = "1", default-features = false, optional = true } portable-atomic = { version = "1.0", optional = true } @@ -67,6 +70,7 @@ ufmt = { version = "0.2", optional = true } ufmt-write = { version = "0.1", optional = true } defmt = { version = "1.0.1", optional = true } zeroize = { version = "1.8", optional = true, default-features = false, features = ["derive"] } +crossbeam-utils = { version = "0.8", optional = true } # for the pool module [target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies] diff --git a/src/mpmc.rs b/src/mpmc.rs index f676c7c93a..f68193315b 100644 --- a/src/mpmc.rs +++ b/src/mpmc.rs @@ -68,17 +68,13 @@ //! //! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue -use core::{cell::UnsafeCell, mem::MaybeUninit}; +use crate::storage::ViewStorage; #[cfg(not(feature = "portable-atomic"))] use core::sync::atomic; #[cfg(feature = "portable-atomic")] use portable_atomic as atomic; -use atomic::Ordering; - -use crate::storage::{OwnedStorage, Storage, ViewStorage}; - #[cfg(feature = "mpmc_large")] type AtomicTargetSize = atomic::AtomicUsize; #[cfg(not(feature = "mpmc_large"))] @@ -89,268 +85,39 @@ type UintSize = usize; #[cfg(not(feature = "mpmc_large"))] type UintSize = u8; -#[cfg(feature = "mpmc_large")] -type IntSize = isize; -#[cfg(not(feature = "mpmc_large"))] -type IntSize = i8; +#[cfg(feature = "mpmc_crossbeam")] +pub mod crossbeam_array_queue; +#[cfg(feature = "mpmc_crossbeam")] +pub use crossbeam_array_queue::*; -/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`]. -/// -/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this -/// struct if you want to write code that's generic over both. -pub struct QueueInner { - dequeue_pos: AtomicTargetSize, - enqueue_pos: AtomicTargetSize, - buffer: UnsafeCell>>, -} - -/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements. -/// -///
-///
-/// `N` must be a power of 2.
-///
-/// </div>
-/// -/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled. -pub type Queue = QueueInner>; +#[cfg(not(feature = "mpmc_crossbeam"))] +mod original; +#[cfg(not(feature = "mpmc_crossbeam"))] +pub use original::*; /// A [`Queue`] with dynamic capacity. /// /// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference. pub type QueueView = QueueInner; -impl Queue { - /// Creates an empty queue. - pub const fn new() -> Self { - const { - assert!(N > 1); - assert!(N.is_power_of_two()); - assert!(N < UintSize::MAX as usize); - } - - let mut cell_count = 0; - - let mut result_cells: [Cell; N] = [const { Cell::new(0) }; N]; - while cell_count != N { - result_cells[cell_count] = Cell::new(cell_count); - cell_count += 1; - } - - Self { - buffer: UnsafeCell::new(result_cells), - dequeue_pos: AtomicTargetSize::new(0), - enqueue_pos: AtomicTargetSize::new(0), - } - } - - /// Used in `Storage` implementation. - pub(crate) fn as_view_private(&self) -> &QueueView { - self - } - /// Used in `Storage` implementation. - pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView { - self - } -} - -impl QueueInner { - /// Returns the maximum number of elements the queue can hold. - #[inline] - pub fn capacity(&self) -> usize { - S::len(self.buffer.get()) - } - - /// Get a reference to the `Queue`, erasing the `N` const-generic. - /// - /// - /// ```rust - /// # use heapless::mpmc::{Queue, QueueView}; - /// let queue: Queue = Queue::new(); - /// let view: &QueueView = queue.as_view(); - /// ``` - /// - /// It is often preferable to do the same through type coerction, since `Queue` implements `Unsize>`: - /// - /// ```rust - /// # use heapless::mpmc::{Queue, QueueView}; - /// let queue: Queue = Queue::new(); - /// let view: &QueueView = &queue; - /// ``` - #[inline] - pub fn as_view(&self) -> &QueueView { - S::as_mpmc_view(self) - } - - /// Get a mutable reference to the `Queue`, erasing the `N` const-generic. - /// - /// ```rust - /// # use heapless::mpmc::{Queue, QueueView}; - /// let mut queue: Queue = Queue::new(); - /// let view: &mut QueueView = queue.as_mut_view(); - /// ``` - /// - /// It is often preferable to do the same through type coerction, since `Queue` implements `Unsize>`: - /// - /// ```rust - /// # use heapless::mpmc::{Queue, QueueView}; - /// let mut queue: Queue = Queue::new(); - /// let view: &mut QueueView = &mut queue; - /// ``` - #[inline] - pub fn as_mut_view(&mut self) -> &mut QueueView { - S::as_mpmc_mut_view(self) - } - - fn mask(&self) -> UintSize { - (S::len(self.buffer.get()) - 1) as _ - } - - /// Returns the item in the front of the queue, or `None` if the queue is empty. - pub fn dequeue(&self) -> Option { - unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) } - } - - /// Adds an `item` to the end of the queue. - /// - /// Returns back the `item` if the queue is full. - pub fn enqueue(&self, item: T) -> Result<(), T> { - unsafe { - enqueue( - S::as_ptr(self.buffer.get()), - &self.enqueue_pos, - self.mask(), - item, - ) - } - } -} - -impl Default for Queue { - fn default() -> Self { - Self::new() - } -} - -impl Drop for QueueInner { - fn drop(&mut self) { - // Drop all elements currently in the queue. 
- while self.dequeue().is_some() {} - } -} - -unsafe impl Sync for QueueInner where T: Send {} - -struct Cell { - data: MaybeUninit, - sequence: AtomicTargetSize, -} - -impl Cell { - const fn new(seq: usize) -> Self { - Self { - data: MaybeUninit::uninit(), - sequence: AtomicTargetSize::new(seq as UintSize), - } - } -} - -unsafe fn dequeue( - buffer: *mut Cell, - dequeue_pos: &AtomicTargetSize, - mask: UintSize, -) -> Option { - let mut pos = dequeue_pos.load(Ordering::Relaxed); - - let mut cell; - loop { - cell = buffer.add(usize::from(pos & mask)); - let seq = (*cell).sequence.load(Ordering::Acquire); - let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize); - - match dif.cmp(&0) { - core::cmp::Ordering::Equal => { - if dequeue_pos - .compare_exchange_weak( - pos, - pos.wrapping_add(1), - Ordering::Relaxed, - Ordering::Relaxed, - ) - .is_ok() - { - break; - } - } - core::cmp::Ordering::Less => { - return None; - } - core::cmp::Ordering::Greater => { - pos = dequeue_pos.load(Ordering::Relaxed); - } - } - } - - let data = (*cell).data.as_ptr().read(); - (*cell) - .sequence - .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release); - Some(data) -} - -unsafe fn enqueue( - buffer: *mut Cell, - enqueue_pos: &AtomicTargetSize, - mask: UintSize, - item: T, -) -> Result<(), T> { - let mut pos = enqueue_pos.load(Ordering::Relaxed); - - let mut cell; - loop { - cell = buffer.add(usize::from(pos & mask)); - let seq = (*cell).sequence.load(Ordering::Acquire); - let dif = (seq as IntSize).wrapping_sub(pos as IntSize); - - match dif.cmp(&0) { - core::cmp::Ordering::Equal => { - if enqueue_pos - .compare_exchange_weak( - pos, - pos.wrapping_add(1), - Ordering::Relaxed, - Ordering::Relaxed, - ) - .is_ok() - { - break; - } - } - core::cmp::Ordering::Less => { - return Err(item); - } - core::cmp::Ordering::Greater => { - pos = enqueue_pos.load(Ordering::Relaxed); - } - } - } - - (*cell).data.as_mut_ptr().write(item); - (*cell) - .sequence - .store(pos.wrapping_add(1), Ordering::Release); - Ok(()) -} - #[cfg(test)] mod tests { use static_assertions::assert_not_impl_any; - use super::Queue; + use super::{Queue, QueueView}; // Ensure a `Queue` containing `!Send` values stays `!Send` itself. assert_not_impl_any!(Queue<*const (), 4>: Send); + fn to_vec(q: &QueueView) -> Vec { + // inaccurate + let mut ret = vec![]; + while let Some(v) = q.dequeue() { + ret.push(v); + } + ret + } + #[test] fn memory_leak() { droppable!(); @@ -419,4 +186,49 @@ mod tests { // Queue is full, this should not block forever. q.enqueue(0x55).unwrap_err(); } + + #[test] + fn issue_583_enqueue() { + const N: usize = 4; + + let q0 = Queue::::new(); + for i in 0..N { + q0.enqueue(i as u8).expect("new enqueue"); + } + eprintln!("start!"); + + std::thread::scope(|sc| { + for _ in 0..2 { + sc.spawn(|| { + for k in 0..1000_000 { + if let Some(v) = q0.dequeue() { + q0.enqueue(v).unwrap_or_else(|v| { + panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&q0)) + }); + } + } + }); + } + }); + } + + #[test] + fn issue_583_dequeue() { + const N: usize = 4; + + let q0 = Queue::::new(); + eprintln!("start!"); + std::thread::scope(|sc| { + for _ in 0..2 { + sc.spawn(|| { + for k in 0..1000_000 { + q0.enqueue(k as u8).unwrap(); + if q0.dequeue().is_none() { + panic!("{k}"); + } + } + }); + } + }); + } } diff --git a/src/mpmc/crossbeam_array_queue.rs b/src/mpmc/crossbeam_array_queue.rs new file mode 100644 index 0000000000..74e07e250a --- /dev/null +++ b/src/mpmc/crossbeam_array_queue.rs @@ -0,0 +1,600 @@ +//! 
The implementation is based on Dmitry Vyukov's bounded MPMC queue. +//! +//! Source: +//! - +//! +//! From the [crossbeam-queue](https://github.com/crossbeam-rs/crossbeam/blob/master/crossbeam-queue/src/array_queue.rs) implementation. + +use super::{atomic, atomic::Ordering}; +use core::cell::UnsafeCell; +use core::fmt; +use core::mem::MaybeUninit; +use core::panic::{RefUnwindSafe, UnwindSafe}; + +use crate::storage::{OwnedStorage, Storage}; + +use crossbeam_utils::{Backoff, CachePadded}; + +use super::{AtomicTargetSize, QueueView, UintSize}; + +/// A slot in a queue. +struct Slot { + /// The current stamp. + /// + /// If the stamp equals the tail, this node will be next written to. If it equals head + 1, + /// this node will be next read from. + stamp: AtomicTargetSize, + + /// The value in this slot. + value: UnsafeCell>, +} + +impl Slot { + /// Creates a new uninitialized [Slot]. + pub const fn new() -> Self { + Self { + stamp: AtomicTargetSize::new(0), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Creates a new uninitialized [Slot] with the provided stamp. + pub const fn create_uninit(stamp: UintSize) -> Self { + Self { + stamp: AtomicTargetSize::new(stamp), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } +} + +impl Default for Slot { + fn default() -> Self { + Self::new() + } +} + +/// A bounded multi-producer multi-consumer queue. +/// +/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed +/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an +/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for +/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue +/// a bit faster than [`SegQueue`]. +/// +/// [`force_push`]: Queue::force_push +/// [`SegQueue`]: super::SegQueue +/// +/// # Examples +/// +/// ``` +/// use heapless::mpmc::Queue; +/// const N: usize = 2; +/// +/// let q = Queue::new::(); +/// +/// assert_eq!(q.enqueue('a'), Ok(())); +/// assert_eq!(q.enqueue('b'), Ok(())); +/// assert_eq!(q.enqueue('c'), Err('c')); +/// assert_eq!(q.dequeue(), Some('a')); +/// ``` +pub type Queue = QueueInner>; + +/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`]. +/// +/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this +/// struct if you want to write code that's generic over both. +pub struct QueueInner { + /// The head of the queue. + /// + /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a + /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. + /// + /// Elements are popped from the head of the queue. + head: CachePadded, + + /// The tail of the queue. + /// + /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a + /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. + /// + /// Elements are pushed into the tail of the queue. + tail: CachePadded, + + /// A stamp with the value of `{ lap: 1, index: 0 }`. + one_lap: UintSize, + + /// The buffer holding slots. + buffer: UnsafeCell>>, +} + +impl QueueInner { + /// Attempts to push an element into the queue. + /// + /// If the queue is full, the element is returned back as an error. 
+ /// + /// # Examples + /// + /// ``` + /// use heapless::mpmc::Queue; + /// const N: usize = 1; + /// + /// let q = Queue::new::(); + /// + /// assert_eq!(q.enqueue(10), Ok(())); + /// assert_eq!(q.enqueue(20), Err(20)); + /// ``` + pub fn enqueue(&self, value: T) -> Result<(), T> { + self.push_or_else(value, |v, tail, _, _| { + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail as UintSize { + // ...then the queue is full. + Err(v) + } else { + Ok(v) + } + }) + } + + /// Returns the number of elements in the queue. + /// + /// # Examples + /// + /// ``` + /// use heapless::mpmc::Queue; + /// const N: usize = 100; + /// + /// let q = Queue::new::(); + /// assert_eq!(q.len(), 0); + /// + /// q.enqueue(10).unwrap(); + /// assert_eq!(q.len(), 1); + /// + /// q.enqueue(20).unwrap(); + /// assert_eq!(q.len(), 2); + /// ``` + pub fn len(&self) -> usize { + loop { + // Load the tail, then load the head. + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // If the tail didn't change, we've got consistent values to work with + + if self.tail.load(Ordering::SeqCst) == tail { + let hix = head & (self.one_lap - 1); + let tix = tail & (self.one_lap - 1); + + return if hix < tix { + usize::from(tix - hix) + } else if hix > tix { + self.capacity() - usize::from(hix + tix) + } else if tail == head { + 0 + } else { + self.capacity() + }; + } + } + } + + fn as_ptr(&self) -> *const Slot { + S::as_ptr(self.buffer.get() as *mut S::Buffer>) as *const _ + } + + fn push_or_else(&self, mut value: T, f: F) -> Result<(), T> + where + F: Fn(T, UintSize, UintSize, &Slot) -> Result, + { + let backoff = Backoff::new(); + let mut tail = self.tail.load(Ordering::Relaxed); + + loop { + // Deconstruct the tail. + let lap_mask = self.one_lap.wrapping_sub(1); + let index = usize::from(tail & lap_mask); + let lap = tail & !lap_mask; + + let new_tail = if index + 1 < self.capacity() { + // Same lap, incremented index. + // Set to `{ lap: lap, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Inspect the corresponding slot. + debug_assert!(index < self.capacity()); + // SAFETY: index is a valid offset, and buffer is valid contiguous memory. + let slot = unsafe { &*self.as_ptr().add(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the tail and the stamp match, we may attempt to push. + if tail == stamp { + // Try moving the tail. + match self.tail.compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Write the value into the slot and update the stamp. + unsafe { + slot.value.get().write(MaybeUninit::new(value)); + } + slot.stamp.store(tail + 1, Ordering::Release); + return Ok(()); + } + Err(t) => { + tail = t; + backoff.spin(); + } + } + } else if stamp.wrapping_add(self.one_lap) == tail + 1 { + atomic::fence(Ordering::SeqCst); + value = f(value, tail, new_tail, slot)?; + backoff.spin(); + tail = self.tail.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.snooze(); + tail = self.tail.load(Ordering::Relaxed); + } + } + } + + /// Pushes an element into the queue, replacing the oldest element if necessary. 
+ /// + /// If the queue is full, the oldest element is replaced and returned, + /// otherwise `None` is returned. + /// + /// # Examples + /// + /// ``` + /// use heapless::mpmc::Queue; + /// const N: usize = 2; + /// + /// let q = Queue::new::(); + /// + /// assert_eq!(q.force_enqueue(10), None); + /// assert_eq!(q.force_enqueue(20), None); + /// assert_eq!(q.force_enqueue(30), Some(10)); + /// assert_eq!(q.dequeue(), Some(20)); + /// ``` + pub fn force_enqueue(&self, value: T) -> Option { + self.push_or_else(value, |v, tail, new_tail, slot| { + let head = (tail as UintSize).wrapping_sub(self.one_lap); + let new_head = (new_tail as UintSize).wrapping_sub(self.one_lap); + + // Try moving the head. + if self + .head + .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // Move the tail. + self.tail.store(new_tail, Ordering::SeqCst); + + // Swap the previous value. + let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() }; + + // Update the stamp. + slot.stamp.store(tail + 1, Ordering::Release); + + Err(old) + } else { + Ok(v) + } + }) + .err() + } + + /// Attempts to pop an element from the queue. + /// + /// If the queue is empty, `None` is returned. + /// + /// # Examples + /// + /// ``` + /// use heapless::mpmc::Queue; + /// const N: usize = 1; + /// + /// let q = Queue::new::(); + /// assert_eq!(q.enqueue(10), Ok(())); + /// + /// assert_eq!(q.dequeue(), Some(10)); + /// assert!(q.dequeue().is_none()); + /// ``` + pub fn dequeue(&self) -> Option { + let backoff = Backoff::new(); + let mut head = self.head.load(Ordering::Relaxed); + + loop { + // Deconstruct the head. + let lap_mask = self.one_lap.wrapping_sub(1); + let index = usize::from(head & lap_mask); + let lap = head & !lap_mask; + + // Inspect the corresponding slot. + debug_assert!(index < self.capacity()); + // SAFETY: index is a valid offset, and buffer is valid contiguous memory. + let slot = unsafe { &*self.as_ptr().add(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may attempt to pop. + if head + 1 == stamp { + let new = if index + 1 < self.capacity() { + // Same lap, incremented index. + // Set to `{ lap: lap, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Try moving the head. + match self.head.compare_exchange_weak( + head, + new, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Read the value from the slot and update the stamp. + let msg = unsafe { slot.value.get().read().assume_init() }; + slot.stamp + .store(head.wrapping_add(self.one_lap), Ordering::Release); + return Some(msg); + } + Err(h) => { + head = h; + backoff.spin(); + } + } + } else if stamp == head { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.load(Ordering::Relaxed); + + // If the tail equals the head, that means the channel is empty. + if tail == head { + return None; + } + + backoff.spin(); + head = self.head.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.snooze(); + head = self.head.load(Ordering::Relaxed); + } + } + } + + /// Returns the maximum number of elements the queue can hold. + #[inline] + pub fn capacity(&self) -> usize { + S::len(self.buffer.get()) + } + + /// Get a reference to the `Queue`, erasing the `N` const-generic. 
+ /// + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let queue: Queue = Queue::new(); + /// let view: &QueueView = queue.as_view(); + /// ``` + /// + /// It is often preferable to do the same through type coerction, since `Queue` implements `Unsize>`: + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let queue: Queue = Queue::new(); + /// let view: &QueueView = &queue; + /// ``` + #[inline] + pub fn as_view(&self) -> &QueueView { + S::as_mpmc_view(self) + } + + /// Get a mutable reference to the `Queue`, erasing the `N` const-generic. + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let mut queue: Queue = Queue::new(); + /// let view: &mut QueueView = queue.as_mut_view(); + /// ``` + /// + /// It is often preferable to do the same through type coerction, since `Queue` implements `Unsize>`: + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let mut queue: Queue = Queue::new(); + /// let view: &mut QueueView = &mut queue; + /// ``` + #[inline] + pub fn as_mut_view(&mut self) -> &mut QueueView { + S::as_mpmc_mut_view(self) + } +} + +impl Drop for QueueInner { + fn drop(&mut self) { + while self.dequeue().is_some() {} + } +} + +unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} + +impl UnwindSafe for Queue {} +impl RefUnwindSafe for Queue {} + +impl Queue { + const _MIN_SIZE: () = assert!(N > 1, "capacity must be at least two"); + const _IS_POW2: () = assert!(N.is_power_of_two(), "capacity must be power of two"); + const _CAP_MAX: () = assert!(N < UintSize::MAX as usize, "capacity maximum exceeded"); + + /// Creates a new bounded queue with the given capacity. + /// + /// # Panics + /// + /// Panics if the capacity is zero. + /// + /// # Examples + /// + /// ``` + /// use heapless::mpmc::Queue; + /// const N: usize = 100; + /// + /// let q = Queue::::new(); + /// ``` + pub const fn new() -> Self { + // Head is initialized to `{ lap: 0, index: 0 }`. + // Tail is initialized to `{ lap: 0, index: 0 }`. + let head = 0; + let tail = 0; + + // Allocate a buffer of `cap` slots initialized + // with stamps. + let mut slot_count = 0usize; + let mut buffer: [Slot; N] = [const { Slot::::new() }; N]; + while slot_count < N { + // Set the stamp to `{ lap: 0, index: i }`. + buffer[slot_count] = Slot::create_uninit(slot_count as UintSize); + slot_count += 1; + } + + // One lap is the smallest power of two greater than `cap`. + let one_lap = (N + 1).next_power_of_two() as UintSize; + + Self { + buffer: UnsafeCell::new(buffer), + one_lap, + head: CachePadded::new(AtomicTargetSize::new(head)), + tail: CachePadded::new(AtomicTargetSize::new(tail)), + } + } + + /// Returns `true` if the queue is empty. + /// + /// # Examples + /// + /// ``` + /// use heapless::mpmc::Queue; + /// const N: usize = 100; + /// + /// let q = Queue::new::(); + /// + /// assert!(q.is_empty()); + /// q.push(1).unwrap(); + /// assert!(!q.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + let head = self.head.load(Ordering::SeqCst); + let tail = self.tail.load(Ordering::SeqCst); + + // Is the tail lagging one lap behind head? + // Is the tail equal to the head? + // + // Note: If the head changes just before we load the tail, that means there was a moment + // when the channel was not empty, so it is safe to just return `false`. + tail == head + } + + /// Returns `true` if the queue is full. 
+ /// + /// # Examples + /// + /// ``` + /// use heapless::mpmc::Queue; + /// const N: usize = 1; + /// + /// let q = Queue::new::(); + /// + /// assert!(!q.is_full()); + /// q.enqueue(1).unwrap(); + /// assert!(q.is_full()); + /// ``` + pub fn is_full(&self) -> bool { + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // Is the head lagging one lap behind tail? + // + // Note: If the tail changes just before we load the head, that means there was a moment + // when the queue was not full, so it is safe to just return `false`. + head.wrapping_add(self.one_lap) == tail + } + + /// Used in `Storage` implementation. + pub(crate) fn as_view_private(&self) -> &QueueView { + self + } + /// Used in `Storage` implementation. + pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView { + self + } +} + +impl fmt::Debug for Queue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Queue { .. }") + } +} + +impl IntoIterator for Queue { + type Item = T; + + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + IntoIter { value: self } + } +} + +/// Represents the iterator container for implementing the [`Iterator`] trait for [Queue]. +#[derive(Debug)] +pub struct IntoIter { + value: Queue, +} + +impl Iterator for IntoIter { + type Item = T; + + fn next(&mut self) -> Option { + let value = &mut self.value; + let head = *value.head.get_mut(); + if value.head.get_mut() != value.tail.get_mut() { + let index = usize::from(head & (value.one_lap - 1)); + let lap = head & !(value.one_lap - 1); + // SAFETY: We have mutable access to this, so we can read without + // worrying about concurrency. Furthermore, we know this is + // initialized because it is the value pointed at by `value.head` + // and this is a non-empty queue. + let val = unsafe { + debug_assert!(index < N); + let slot = (&mut *value.buffer.get_mut()).get_unchecked_mut(index); + slot.value.get().read().assume_init() + }; + let new = if index + 1 < value.capacity() { + // Same lap, incremented index. + // Set to `{ lap: lap, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. + lap.wrapping_add(value.one_lap) + }; + *value.head.get_mut() = new; + Some(val) + } else { + None + } + } +} diff --git a/src/mpmc/original.rs b/src/mpmc/original.rs new file mode 100644 index 0000000000..a5b57f2821 --- /dev/null +++ b/src/mpmc/original.rs @@ -0,0 +1,293 @@ +use core::{cell::UnsafeCell, mem::MaybeUninit}; + +use atomic::Ordering; + +use crate::storage::{OwnedStorage, Storage}; + +use super::{atomic, AtomicTargetSize, QueueView, UintSize}; + +#[cfg(feature = "mpmc_large")] +type IntSize = isize; +#[cfg(not(feature = "mpmc_large"))] +type IntSize = i8; + +const CONTENTION_RETRY_COUNT: usize = 10000; + +struct Cell { + data: MaybeUninit, + sequence: AtomicTargetSize, +} + +impl Cell { + const fn new(seq: usize) -> Self { + Self { + data: MaybeUninit::uninit(), + sequence: AtomicTargetSize::new(seq as UintSize), + } + } +} + +/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`]. +/// +/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this +/// struct if you want to write code that's generic over both. +pub struct QueueInner { + dequeue_pos: AtomicTargetSize, + enqueue_pos: AtomicTargetSize, + buffer: UnsafeCell>>, +} + +/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements. +/// +///
+///
+/// `N` must be a power of 2.
+///
+/// </div>
+/// +/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled. +pub type Queue = QueueInner>; + +impl Queue { + /// Creates an empty queue. + pub const fn new() -> Self { + const { + assert!(N > 1); + assert!(N.is_power_of_two()); + assert!(N < UintSize::MAX as usize); + } + + let mut cell_count = 0; + + let mut result_cells: [Cell; N] = [const { Cell::new(0) }; N]; + while cell_count != N { + result_cells[cell_count] = Cell::new(cell_count); + cell_count += 1; + } + + Self { + buffer: UnsafeCell::new(result_cells), + dequeue_pos: AtomicTargetSize::new(0), + enqueue_pos: AtomicTargetSize::new(0), + } + } + + /// Used in `Storage` implementation. + pub(crate) fn as_view_private(&self) -> &QueueView { + self + } + /// Used in `Storage` implementation. + pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView { + self + } +} + +impl QueueInner { + /// Returns the maximum number of elements the queue can hold. + #[inline] + pub fn capacity(&self) -> usize { + S::len(self.buffer.get()) + } + + /// Get a reference to the `Queue`, erasing the `N` const-generic. + /// + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let queue: Queue = Queue::new(); + /// let view: &QueueView = queue.as_view(); + /// ``` + /// + /// It is often preferable to do the same through type coerction, since `Queue` implements `Unsize>`: + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let queue: Queue = Queue::new(); + /// let view: &QueueView = &queue; + /// ``` + #[inline] + pub fn as_view(&self) -> &QueueView { + S::as_mpmc_view(self) + } + + /// Get a mutable reference to the `Queue`, erasing the `N` const-generic. + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let mut queue: Queue = Queue::new(); + /// let view: &mut QueueView = queue.as_mut_view(); + /// ``` + /// + /// It is often preferable to do the same through type coerction, since `Queue` implements `Unsize>`: + /// + /// ```rust + /// # use heapless::mpmc::{Queue, QueueView}; + /// let mut queue: Queue = Queue::new(); + /// let view: &mut QueueView = &mut queue; + /// ``` + #[inline] + pub fn as_mut_view(&mut self) -> &mut QueueView { + S::as_mpmc_mut_view(self) + } + + fn mask(&self) -> UintSize { + (S::len(self.buffer.get()) - 1) as _ + } + + /// Returns the item in the front of the queue, or `None` if the queue is empty. + pub fn dequeue(&self) -> Option { + unsafe { + dequeue( + S::as_ptr(self.buffer.get()), + &self.dequeue_pos, + &self.enqueue_pos, + self.mask(), + ) + } + } + + /// Adds an `item` to the end of the queue. + /// + /// Returns back the `item` if the queue is full. + pub fn enqueue(&self, item: T) -> Result<(), T> { + unsafe { + enqueue( + S::as_ptr(self.buffer.get()), + &self.dequeue_pos, + &self.enqueue_pos, + self.mask(), + item, + ) + } + } +} + +impl Default for Queue { + fn default() -> Self { + Self::new() + } +} + +impl Drop for QueueInner { + fn drop(&mut self) { + // Drop all elements currently in the queue. 
+ while self.dequeue().is_some() {} + } +} + +unsafe impl Sync for QueueInner where T: Send {} + +unsafe fn dequeue( + buffer: *mut Cell, + dequeue_pos: &AtomicTargetSize, + enqueue_pos: &AtomicTargetSize, + mask: UintSize, +) -> Option { + let mut pos = dequeue_pos.load(Ordering::Relaxed); + + let mut cell; + let mut seq; + let mut dif; + let mut contention_retry_count = 0; + + loop { + cell = buffer.add(usize::from(pos & mask)); + seq = (*cell).sequence.load(Ordering::Acquire); + dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize); + + match dif.cmp(&0) { + core::cmp::Ordering::Equal => { + if dequeue_pos + .compare_exchange_weak( + pos, + pos.wrapping_add(1), + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + { + break; + } + } + core::cmp::Ordering::Less => { + if pos != enqueue_pos.load(Ordering::Relaxed) + && contention_retry_count < CONTENTION_RETRY_COUNT + { + // In this case according to the positions the queue is not empty + // This suggests that there is a enqueue operations that is in progress in some other task + // Therefore we can wait a bit hoping that the other task can finish its `enqueue` operation complete + core::hint::spin_loop(); + contention_retry_count += 1; + continue; + } + return None; + } + core::cmp::Ordering::Greater => { + pos = dequeue_pos.load(Ordering::Relaxed); + } + } + } + + let data = (*cell).data.as_ptr().read(); + (*cell) + .sequence + .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release); + Some(data) +} + +unsafe fn enqueue( + buffer: *mut Cell, + dequeue_pos: &AtomicTargetSize, + enqueue_pos: &AtomicTargetSize, + mask: UintSize, + item: T, +) -> Result<(), T> { + let mut pos = enqueue_pos.load(Ordering::Relaxed); + + let mut cell; + let mut contention_retry_count = 0; + loop { + cell = buffer.add(usize::from(pos & mask)); + let seq = (*cell).sequence.load(Ordering::Acquire); + let dif = (seq as IntSize).wrapping_sub(pos as IntSize); + + match dif.cmp(&0) { + core::cmp::Ordering::Equal => { + if enqueue_pos + .compare_exchange_weak( + pos, + pos.wrapping_add(1), + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + { + break; + } + pos = enqueue_pos.load(Ordering::Relaxed); + } + core::cmp::Ordering::Less => { + if dequeue_pos.load(Ordering::Relaxed).wrapping_add(mask + 1) != pos + && contention_retry_count < CONTENTION_RETRY_COUNT + { + // In this case according to the positions the queue is not full + // This suggests that there is a dequeue operation that is in progress in some other task + // Therefore we can wait a bit hoping that the other task can finish its `dequeue` operation completes + core::hint::spin_loop(); + contention_retry_count += 1; + continue; + } + return Err(item); + } + core::cmp::Ordering::Greater => { + pos = enqueue_pos.load(Ordering::Relaxed); + } + } + } + + (*cell).data.as_mut_ptr().write(item); + (*cell) + .sequence + .store(pos.wrapping_add(1), Ordering::Release); + Ok(()) +}
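A minimal cross-thread usage sketch of the queue surface exercised by the new `issue_583_*` tests: a pre-filled queue is shared between two scoped threads that repeatedly dequeue one element and immediately re-enqueue it, which must never fail on a correct MPMC queue. It only uses the `heapless::mpmc::Queue` API shown in this diff; the capacity and iteration count are arbitrary.

```rust
use heapless::mpmc::Queue;

fn main() {
    // Capacity must satisfy the constructor's const assertions (> 1, power of two).
    let q: Queue<u8, 4> = Queue::new();
    for i in 0..4u8 {
        q.enqueue(i).unwrap();
    }

    std::thread::scope(|scope| {
        for _ in 0..2 {
            scope.spawn(|| {
                for _ in 0..100_000 {
                    if let Some(v) = q.dequeue() {
                        // Each thread holds at most one element outside the queue,
                        // so the re-enqueue always has room on a correct queue.
                        q.enqueue(v).expect("spurious full report");
                    }
                }
            });
        }
    });
}
```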
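Usage sketch for `force_enqueue`, the ring-buffer style push added by the crossbeam-backed variant (requires the `mpmc_crossbeam` feature): on a full queue the oldest element is displaced and handed back to the caller. This restates the doc example from the diff with explicit type annotations.

```rust
use heapless::mpmc::Queue;

fn main() {
    let q: Queue<u32, 2> = Queue::new();

    assert_eq!(q.force_enqueue(10), None);
    assert_eq!(q.force_enqueue(20), None);
    // The queue is full, so 30 evicts the oldest element, 10.
    assert_eq!(q.force_enqueue(30), Some(10));
    assert_eq!(q.dequeue(), Some(20));
    assert_eq!(q.dequeue(), Some(30));
    assert!(q.dequeue().is_none());
}
```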
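The new module's `push_or_else`/`dequeue` comments describe positions as stamps packing `{ lap, index }` into one integer, with `one_lap = (N + 1).next_power_of_two()` (simply `2 * N` for the power-of-two capacities both backends assert). A standalone illustration of that arithmetic, using plain `usize` and an assumed capacity of 4:

```rust
fn main() {
    let cap: usize = 4;
    let one_lap = (cap + 1).next_power_of_two(); // 8, i.e. 2 * cap
    let lap_mask = one_lap - 1;

    // Advance a position once per slot and watch the lap counter roll over.
    let mut pos: usize = 0;
    for _ in 0..cap {
        let index = pos & lap_mask; // low bits: slot index
        let lap = pos & !lap_mask; // high bits: lap counter
        pos = if index + 1 < cap {
            pos + 1 // same lap, next index
        } else {
            lap.wrapping_add(one_lap) // next lap, index wraps to zero
        };
    }
    assert_eq!(pos, one_lap); // exactly `{ lap: 1, index: 0 }` after one full cycle
}
```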
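The `original.rs` changes bound the new contention wait with `CONTENTION_RETRY_COUNT` and `core::hint::spin_loop()`: when the positions say the queue is not actually empty (or full), the operation spins briefly, expecting the other thread's in-flight `enqueue`/`dequeue` to complete, instead of failing immediately. Below is a standalone restatement of that pattern; the helper name and closure-based shape are hypothetical, only the bounded-spin idea comes from the diff.

```rust
const CONTENTION_RETRY_COUNT: usize = 10_000;

/// Retries `attempt` while `other_op_in_flight` reports that another thread's
/// half-finished operation could still make it succeed, up to a fixed budget.
fn retry_bounded<T>(
    mut attempt: impl FnMut() -> Option<T>,
    mut other_op_in_flight: impl FnMut() -> bool,
) -> Option<T> {
    for _ in 0..CONTENTION_RETRY_COUNT {
        if let Some(v) = attempt() {
            return Some(v);
        }
        if !other_op_in_flight() {
            // The queue really is empty (or full): give up immediately.
            return None;
        }
        core::hint::spin_loop();
    }
    // Retry budget exhausted; fail rather than spin forever.
    None
}

fn main() {
    // Toy demonstration: the "operation" only succeeds on its third attempt.
    let mut calls = 0;
    let got = retry_bounded(
        || {
            calls += 1;
            (calls >= 3).then_some(calls)
        },
        || true,
    );
    assert_eq!(got, Some(3));
}
```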