diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index dbb731068f..e76ca02e51 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -86,6 +86,42 @@ jobs: - name: Run cargo test run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize" + # Run cargo test (loom) + testloom: + name: test + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Cache cargo dependencies + uses: actions/cache@v3 + with: + path: | + - ~/.cargo/bin/ + - ~/.cargo/registry/index/ + - ~/.cargo/registry/cache/ + - ~/.cargo/git/db/ + key: ${{ runner.OS }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.OS }}-cargo- + + - name: Cache build output dependencies + uses: actions/cache@v3 + with: + path: target + key: ${{ runner.OS }}-build-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.OS }}-build- + + - name: Install Rust + uses: dtolnay/rust-toolchain@master + with: + toolchain: stable + + - name: Run cargo test + run: cargo test --features="alloc,defmt,loom,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize" + # Run cargo fmt --check style: name: style diff --git a/Cargo.toml b/Cargo.toml index 7390c86bc9..fbb9e544aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,8 @@ alloc = [] nightly = [] +loom = ["dep:loom"] + [dependencies] bytes = { version = "1", default-features = false, optional = true } portable-atomic = { version = "1.0", optional = true } @@ -67,6 +69,7 @@ ufmt = { version = "0.2", optional = true } ufmt-write = { version = "0.1", optional = true } defmt = { version = "1.0.1", optional = true } zeroize = { version = "1.8", optional = true, default-features = false, features = ["derive"] } +loom = { version = "0.7", optional = true } # for the pool module [target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies] diff --git a/src/mpmc.rs b/src/mpmc.rs index f676c7c93a..fc5d944f45 100644 --- a/src/mpmc.rs +++ b/src/mpmc.rs @@ -70,9 +70,11 @@ use core::{cell::UnsafeCell, mem::MaybeUninit}; -#[cfg(not(feature = "portable-atomic"))] +#[cfg(all(not(feature = "portable-atomic"), not(feature = "loom")))] use core::sync::atomic; -#[cfg(feature = "portable-atomic")] +#[cfg(feature = "loom")] +use loom::sync::atomic; +#[cfg(all(feature = "portable-atomic", not(feature = "loom")))] use portable_atomic as atomic; use atomic::Ordering; @@ -122,6 +124,7 @@ pub type QueueView = QueueInner; impl Queue { /// Creates an empty queue. + #[cfg(not(feature = "loom"))] pub const fn new() -> Self { const { assert!(N > 1); @@ -144,6 +147,24 @@ impl Queue { } } + /// Creates an empty queue. + #[cfg(feature = "loom")] + pub fn new() -> Self { + const { + assert!(N > 1); + assert!(N.is_power_of_two()); + assert!(N < UintSize::MAX as usize); + } + + let result_cells: [Cell; N] = core::array::from_fn(Cell::new); + + Self { + buffer: UnsafeCell::new(result_cells), + dequeue_pos: AtomicTargetSize::new(0), + enqueue_pos: AtomicTargetSize::new(0), + } + } + /// Used in `Storage` implementation. pub(crate) fn as_view_private(&self) -> &QueueView { self @@ -247,12 +268,21 @@ struct Cell { } impl Cell { + #[cfg(not(feature = "loom"))] const fn new(seq: usize) -> Self { Self { data: MaybeUninit::uninit(), sequence: AtomicTargetSize::new(seq as UintSize), } } + + #[cfg(feature = "loom")] + fn new(seq: usize) -> Self { + Self { + data: MaybeUninit::uninit(), + sequence: AtomicTargetSize::new(seq as UintSize), + } + } } unsafe fn dequeue( @@ -346,12 +376,22 @@ unsafe fn enqueue( mod tests { use static_assertions::assert_not_impl_any; - use super::Queue; + use super::{Queue, QueueView}; // Ensure a `Queue` containing `!Send` values stays `!Send` itself. assert_not_impl_any!(Queue<*const (), 4>: Send); + fn to_vec(q: &QueueView) -> Vec { + // inaccurate + let mut ret = vec![]; + while let Some(v) = q.dequeue() { + ret.push(v); + } + ret + } + #[test] + #[cfg(not(feature = "loom"))] fn memory_leak() { droppable!(); @@ -364,6 +404,7 @@ mod tests { } #[test] + #[cfg(not(feature = "loom"))] fn sanity() { let q = Queue::<_, 2>::new(); q.enqueue(0).unwrap(); @@ -376,6 +417,7 @@ mod tests { } #[test] + #[cfg(not(feature = "loom"))] fn drain_at_pos255() { let q = Queue::<_, 2>::new(); for _ in 0..255 { @@ -388,6 +430,7 @@ mod tests { } #[test] + #[cfg(not(feature = "loom"))] fn full_at_wrapped_pos0() { let q = Queue::<_, 2>::new(); for _ in 0..254 { @@ -401,6 +444,7 @@ mod tests { } #[test] + #[cfg(not(feature = "loom"))] fn enqueue_full() { #[cfg(not(feature = "mpmc_large"))] const CAPACITY: usize = 128; @@ -419,4 +463,116 @@ mod tests { // Queue is full, this should not block forever. q.enqueue(0x55).unwrap_err(); } + + #[test] + #[cfg(not(feature = "loom"))] + fn issue_583_enqueue() { + const N: usize = 4; + + let q0 = Queue::::new(); + for i in 0..N { + q0.enqueue(i as u8).expect("new enqueue"); + } + eprintln!("start!"); + + std::thread::scope(|sc| { + for _ in 0..2 { + sc.spawn(|| { + for k in 0..1_000_000 { + if let Some(v) = q0.dequeue() { + q0.enqueue(v).unwrap_or_else(|v| { + panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&q0)) + }); + } + } + }); + } + }); + } + + #[test] + #[cfg(not(feature = "loom"))] + fn issue_583_dequeue() { + const N: usize = 4; + + let q0 = Queue::::new(); + eprintln!("start!"); + std::thread::scope(|sc| { + for _ in 0..2 { + sc.spawn(|| { + for k in 0..1_000_000 { + q0.enqueue(k as u8).unwrap(); + if q0.dequeue().is_none() { + panic!("{k}"); + } + } + }); + } + }); + } + + #[test] + #[cfg(feature = "loom")] + fn issue_583_enqueue_loom() { + loom::model(|| { + const N: usize = 4; + + let q0 = loom::sync::Arc::new(Queue::::new()); + for i in 0..N { + q0.enqueue(i as u8).expect("new enqueue"); + } + eprintln!("start!"); + + let q1 = q0.clone(); + loom::thread::spawn(move || { + for k in 0..1000_000 { + if let Some(v) = q0.dequeue() { + q0.enqueue(v) + .unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q0))); + } + } + }); + + loom::thread::spawn(move || { + for k in 0..1000_000 { + if let Some(v) = q1.dequeue() { + q1.enqueue(v) + .unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q1))); + } + } + }); + }); + } + + #[test] + #[cfg(feature = "loom")] + fn issue_583_dequeue_loom() { + loom::model(|| { + const N: usize = 4; + + let q0 = loom::sync::Arc::new(Queue::::new()); + + eprintln!("start!"); + + let q1 = q0.clone(); + + loom::thread::spawn(move || { + for k in 0..1000_000 { + q0.enqueue(k as u8).unwrap(); + if q0.dequeue().is_none() { + panic!("{k}"); + } + } + }); + + loom::thread::spawn(move || { + for k in 0..1000_000 { + q1.enqueue(k as u8).unwrap(); + if q1.dequeue().is_none() { + panic!("{k}"); + } + } + }); + }); + } } diff --git a/tests/tsan.rs b/tests/tsan.rs index 14391e2435..e13052cbc5 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -115,6 +115,7 @@ fn contention() { #[test] #[cfg_attr(miri, ignore)] // too slow +#[cfg(not(feature = "loom"))] fn mpmc_contention() { use std::sync::mpsc;