Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ jobs:
- name: Run miri
run: MIRIFLAGS=-Zmiri-ignore-leaks cargo miri test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"

- name: Run miri (mpmc crossbeam)
run: MIRIFLAGS=-Zmiri-ignore-leaks cargo miri test --features="alloc,defmt,mpmc_large,mpmc_crossbeam,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"

# Run cargo test
test:
name: test
Expand Down Expand Up @@ -86,6 +89,9 @@ jobs:
- name: Run cargo test
run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"

- name: Run cargo test (mpmc crossbeam)
run: cargo test --features="alloc,defmt,mpmc_large,mpmc_crossbeam,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"

# Run cargo fmt --check
style:
name: style
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ alloc = []

nightly = []

# Implement MPMC using `crossbeam::ArrayQueue`
mpmc_crossbeam = ["dep:crossbeam-utils"]

[dependencies]
bytes = { version = "1", default-features = false, optional = true }
portable-atomic = { version = "1.0", optional = true }
Expand All @@ -67,6 +70,7 @@ ufmt = { version = "0.2", optional = true }
ufmt-write = { version = "0.1", optional = true }
defmt = { version = "1.0.1", optional = true }
zeroize = { version = "1.8", optional = true, default-features = false, features = ["derive"] }
crossbeam-utils = { version = "0.8", optional = true }

# for the pool module
[target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies]
Expand Down
316 changes: 64 additions & 252 deletions src/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,13 @@
//!
//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

use core::{cell::UnsafeCell, mem::MaybeUninit};
use crate::storage::ViewStorage;

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;

use atomic::Ordering;

use crate::storage::{OwnedStorage, Storage, ViewStorage};

#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
Expand All @@ -89,268 +85,39 @@ type UintSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type UintSize = u8;

#[cfg(feature = "mpmc_large")]
type IntSize = isize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;
#[cfg(feature = "mpmc_crossbeam")]
pub mod crossbeam_array_queue;
#[cfg(feature = "mpmc_crossbeam")]
pub use crossbeam_array_queue::*;

/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
///
/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
/// struct if you want to write code that's generic over both.
pub struct QueueInner<T, S: Storage> {
dequeue_pos: AtomicTargetSize,
enqueue_pos: AtomicTargetSize,
buffer: UnsafeCell<S::Buffer<Cell<T>>>,
}

/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
///
/// <div class="warning">
///
/// `N` must be a power of 2.
///
/// </div>
///
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
#[cfg(not(feature = "mpmc_crossbeam"))]
mod original;
#[cfg(not(feature = "mpmc_crossbeam"))]
pub use original::*;

/// A [`Queue`] with dynamic capacity.
///
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
pub type QueueView<T> = QueueInner<T, ViewStorage>;

impl<T, const N: usize> Queue<T, N> {
/// Creates an empty queue.
pub const fn new() -> Self {
const {
assert!(N > 1);
assert!(N.is_power_of_two());
assert!(N < UintSize::MAX as usize);
}

let mut cell_count = 0;

let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
while cell_count != N {
result_cells[cell_count] = Cell::new(cell_count);
cell_count += 1;
}

Self {
buffer: UnsafeCell::new(result_cells),
dequeue_pos: AtomicTargetSize::new(0),
enqueue_pos: AtomicTargetSize::new(0),
}
}

/// Used in `Storage` implementation.
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
self
}
/// Used in `Storage` implementation.
pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
self
}
}

impl<T, S: Storage> QueueInner<T, S> {
/// Returns the maximum number of elements the queue can hold.
#[inline]
pub fn capacity(&self) -> usize {
S::len(self.buffer.get())
}

/// Get a reference to the `Queue`, erasing the `N` const-generic.
///
///
/// ```rust
/// # use heapless::mpmc::{Queue, QueueView};
/// let queue: Queue<u8, 2> = Queue::new();
/// let view: &QueueView<u8> = queue.as_view();
/// ```
///
/// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
///
/// ```rust
/// # use heapless::mpmc::{Queue, QueueView};
/// let queue: Queue<u8, 2> = Queue::new();
/// let view: &QueueView<u8> = &queue;
/// ```
#[inline]
pub fn as_view(&self) -> &QueueView<T> {
S::as_mpmc_view(self)
}

/// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
///
/// ```rust
/// # use heapless::mpmc::{Queue, QueueView};
/// let mut queue: Queue<u8, 2> = Queue::new();
/// let view: &mut QueueView<u8> = queue.as_mut_view();
/// ```
///
/// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
///
/// ```rust
/// # use heapless::mpmc::{Queue, QueueView};
/// let mut queue: Queue<u8, 2> = Queue::new();
/// let view: &mut QueueView<u8> = &mut queue;
/// ```
#[inline]
pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
S::as_mpmc_mut_view(self)
}

fn mask(&self) -> UintSize {
(S::len(self.buffer.get()) - 1) as _
}

/// Returns the item in the front of the queue, or `None` if the queue is empty.
pub fn dequeue(&self) -> Option<T> {
unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
}

/// Adds an `item` to the end of the queue.
///
/// Returns back the `item` if the queue is full.
pub fn enqueue(&self, item: T) -> Result<(), T> {
unsafe {
enqueue(
S::as_ptr(self.buffer.get()),
&self.enqueue_pos,
self.mask(),
item,
)
}
}
}

impl<T, const N: usize> Default for Queue<T, N> {
fn default() -> Self {
Self::new()
}
}

impl<T, S: Storage> Drop for QueueInner<T, S> {
fn drop(&mut self) {
// Drop all elements currently in the queue.
while self.dequeue().is_some() {}
}
}

unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}

struct Cell<T> {
data: MaybeUninit<T>,
sequence: AtomicTargetSize,
}

impl<T> Cell<T> {
const fn new(seq: usize) -> Self {
Self {
data: MaybeUninit::uninit(),
sequence: AtomicTargetSize::new(seq as UintSize),
}
}
}

unsafe fn dequeue<T>(
buffer: *mut Cell<T>,
dequeue_pos: &AtomicTargetSize,
mask: UintSize,
) -> Option<T> {
let mut pos = dequeue_pos.load(Ordering::Relaxed);

let mut cell;
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);

match dif.cmp(&0) {
core::cmp::Ordering::Equal => {
if dequeue_pos
.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
}
core::cmp::Ordering::Less => {
return None;
}
core::cmp::Ordering::Greater => {
pos = dequeue_pos.load(Ordering::Relaxed);
}
}
}

let data = (*cell).data.as_ptr().read();
(*cell)
.sequence
.store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
Some(data)
}

unsafe fn enqueue<T>(
buffer: *mut Cell<T>,
enqueue_pos: &AtomicTargetSize,
mask: UintSize,
item: T,
) -> Result<(), T> {
let mut pos = enqueue_pos.load(Ordering::Relaxed);

let mut cell;
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as IntSize).wrapping_sub(pos as IntSize);

match dif.cmp(&0) {
core::cmp::Ordering::Equal => {
if enqueue_pos
.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
}
core::cmp::Ordering::Less => {
return Err(item);
}
core::cmp::Ordering::Greater => {
pos = enqueue_pos.load(Ordering::Relaxed);
}
}
}

(*cell).data.as_mut_ptr().write(item);
(*cell)
.sequence
.store(pos.wrapping_add(1), Ordering::Release);
Ok(())
}

#[cfg(test)]
mod tests {
use static_assertions::assert_not_impl_any;

use super::Queue;
use super::{Queue, QueueView};

// Ensure a `Queue` containing `!Send` values stays `!Send` itself.
assert_not_impl_any!(Queue<*const (), 4>: Send);

fn to_vec<T>(q: &QueueView<T>) -> Vec<T> {
// inaccurate
let mut ret = vec![];
while let Some(v) = q.dequeue() {
ret.push(v);
}
ret
}

#[test]
fn memory_leak() {
droppable!();
Expand Down Expand Up @@ -419,4 +186,49 @@ mod tests {
// Queue is full, this should not block forever.
q.enqueue(0x55).unwrap_err();
}

#[test]
fn issue_583_enqueue() {
const N: usize = 4;

let q0 = Queue::<u8, N>::new();
for i in 0..N {
q0.enqueue(i as u8).expect("new enqueue");
}
eprintln!("start!");

std::thread::scope(|sc| {
for _ in 0..2 {
sc.spawn(|| {
for k in 0..1000_000 {
if let Some(v) = q0.dequeue() {
q0.enqueue(v).unwrap_or_else(|v| {
panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&q0))
});
}
}
});
}
});
}

#[test]
fn issue_583_dequeue() {
const N: usize = 4;

let q0 = Queue::<u8, N>::new();
eprintln!("start!");
std::thread::scope(|sc| {
for _ in 0..2 {
sc.spawn(|| {
for k in 0..1000_000 {
q0.enqueue(k as u8).unwrap();
if q0.dequeue().is_none() {
panic!("{k}");
}
}
});
}
});
}
}
Loading
Loading