Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,42 @@ jobs:
- name: Run cargo test
run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"

# Run cargo test (loom)
testloom:
name: test
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Cache cargo dependencies
uses: actions/cache@v3
with:
path: |
- ~/.cargo/bin/
- ~/.cargo/registry/index/
- ~/.cargo/registry/cache/
- ~/.cargo/git/db/
key: ${{ runner.OS }}-cargo-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
${{ runner.OS }}-cargo-

- name: Cache build output dependencies
uses: actions/cache@v3
with:
path: target
key: ${{ runner.OS }}-build-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
${{ runner.OS }}-build-

- name: Install Rust
uses: dtolnay/rust-toolchain@master
with:
toolchain: stable

- name: Run cargo test
run: cargo test --features="alloc,defmt,loom,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"

# Run cargo fmt --check
style:
name: style
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ alloc = []

nightly = []

loom = ["dep:loom"]

[dependencies]
bytes = { version = "1", default-features = false, optional = true }
portable-atomic = { version = "1.0", optional = true }
Expand All @@ -67,6 +69,7 @@ ufmt = { version = "0.2", optional = true }
ufmt-write = { version = "0.1", optional = true }
defmt = { version = "1.0.1", optional = true }
zeroize = { version = "1.8", optional = true, default-features = false, features = ["derive"] }
loom = { version = "0.7", optional = true }

# for the pool module
[target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies]
Expand Down
162 changes: 159 additions & 3 deletions src/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@

use core::{cell::UnsafeCell, mem::MaybeUninit};

#[cfg(not(feature = "portable-atomic"))]
#[cfg(all(not(feature = "portable-atomic"), not(feature = "loom")))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
#[cfg(feature = "loom")]
use loom::sync::atomic;
#[cfg(all(feature = "portable-atomic", not(feature = "loom")))]
use portable_atomic as atomic;

use atomic::Ordering;
Expand Down Expand Up @@ -122,6 +124,7 @@ pub type QueueView<T> = QueueInner<T, ViewStorage>;

impl<T, const N: usize> Queue<T, N> {
/// Creates an empty queue.
#[cfg(not(feature = "loom"))]
pub const fn new() -> Self {
const {
assert!(N > 1);
Expand All @@ -144,6 +147,24 @@ impl<T, const N: usize> Queue<T, N> {
}
}

/// Creates an empty queue.
#[cfg(feature = "loom")]
pub fn new() -> Self {
const {
assert!(N > 1);
assert!(N.is_power_of_two());
assert!(N < UintSize::MAX as usize);
}

let result_cells: [Cell<T>; N] = core::array::from_fn(Cell::new);

Self {
buffer: UnsafeCell::new(result_cells),
dequeue_pos: AtomicTargetSize::new(0),
enqueue_pos: AtomicTargetSize::new(0),
}
}

/// Used in `Storage` implementation.
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
self
Expand Down Expand Up @@ -247,12 +268,21 @@ struct Cell<T> {
}

impl<T> Cell<T> {
#[cfg(not(feature = "loom"))]
const fn new(seq: usize) -> Self {
Self {
data: MaybeUninit::uninit(),
sequence: AtomicTargetSize::new(seq as UintSize),
}
}

#[cfg(feature = "loom")]
fn new(seq: usize) -> Self {
Self {
data: MaybeUninit::uninit(),
sequence: AtomicTargetSize::new(seq as UintSize),
}
}
}

unsafe fn dequeue<T>(
Expand Down Expand Up @@ -346,12 +376,22 @@ unsafe fn enqueue<T>(
mod tests {
use static_assertions::assert_not_impl_any;

use super::Queue;
use super::{Queue, QueueView};

// Ensure a `Queue` containing `!Send` values stays `!Send` itself.
assert_not_impl_any!(Queue<*const (), 4>: Send);

fn to_vec<T>(q: &QueueView<T>) -> Vec<T> {
// inaccurate
let mut ret = vec![];
while let Some(v) = q.dequeue() {
ret.push(v);
}
ret
}

#[test]
#[cfg(not(feature = "loom"))]
fn memory_leak() {
droppable!();

Expand All @@ -364,6 +404,7 @@ mod tests {
}

#[test]
#[cfg(not(feature = "loom"))]
fn sanity() {
let q = Queue::<_, 2>::new();
q.enqueue(0).unwrap();
Expand All @@ -376,6 +417,7 @@ mod tests {
}

#[test]
#[cfg(not(feature = "loom"))]
fn drain_at_pos255() {
let q = Queue::<_, 2>::new();
for _ in 0..255 {
Expand All @@ -388,6 +430,7 @@ mod tests {
}

#[test]
#[cfg(not(feature = "loom"))]
fn full_at_wrapped_pos0() {
let q = Queue::<_, 2>::new();
for _ in 0..254 {
Expand All @@ -401,6 +444,7 @@ mod tests {
}

#[test]
#[cfg(not(feature = "loom"))]
fn enqueue_full() {
#[cfg(not(feature = "mpmc_large"))]
const CAPACITY: usize = 128;
Expand All @@ -419,4 +463,116 @@ mod tests {
// Queue is full, this should not block forever.
q.enqueue(0x55).unwrap_err();
}

#[test]
#[cfg(not(feature = "loom"))]
fn issue_583_enqueue() {
const N: usize = 4;

let q0 = Queue::<u8, N>::new();
for i in 0..N {
q0.enqueue(i as u8).expect("new enqueue");
}
eprintln!("start!");

std::thread::scope(|sc| {
for _ in 0..2 {
sc.spawn(|| {
for k in 0..1_000_000 {
if let Some(v) = q0.dequeue() {
q0.enqueue(v).unwrap_or_else(|v| {
panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&q0))
});
}
}
});
}
});
}

#[test]
#[cfg(not(feature = "loom"))]
fn issue_583_dequeue() {
const N: usize = 4;

let q0 = Queue::<u8, N>::new();
eprintln!("start!");
std::thread::scope(|sc| {
for _ in 0..2 {
sc.spawn(|| {
for k in 0..1_000_000 {
q0.enqueue(k as u8).unwrap();
if q0.dequeue().is_none() {
panic!("{k}");
}
}
});
}
});
}

#[test]
#[cfg(feature = "loom")]
fn issue_583_enqueue_loom() {
loom::model(|| {
const N: usize = 4;

let q0 = loom::sync::Arc::new(Queue::<u8, N>::new());
for i in 0..N {
q0.enqueue(i as u8).expect("new enqueue");
}
eprintln!("start!");

let q1 = q0.clone();
loom::thread::spawn(move || {
for k in 0..1000_000 {
if let Some(v) = q0.dequeue() {
q0.enqueue(v)
.unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q0)));
}
}
});

loom::thread::spawn(move || {
for k in 0..1000_000 {
if let Some(v) = q1.dequeue() {
q1.enqueue(v)
.unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q1)));
}
}
});
});
}

#[test]
#[cfg(feature = "loom")]
fn issue_583_dequeue_loom() {
loom::model(|| {
const N: usize = 4;

let q0 = loom::sync::Arc::new(Queue::<u8, N>::new());

eprintln!("start!");

let q1 = q0.clone();

loom::thread::spawn(move || {
for k in 0..1000_000 {
q0.enqueue(k as u8).unwrap();
if q0.dequeue().is_none() {
panic!("{k}");
}
}
});

loom::thread::spawn(move || {
for k in 0..1000_000 {
q1.enqueue(k as u8).unwrap();
if q1.dequeue().is_none() {
panic!("{k}");
}
}
});
});
}
}
1 change: 1 addition & 0 deletions tests/tsan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ fn contention() {

#[test]
#[cfg_attr(miri, ignore)] // too slow
#[cfg(not(feature = "loom"))]
fn mpmc_contention() {
use std::sync::mpsc;

Expand Down
Loading