Skip to content

Commit c1abf02

Browse files
committed
mpmc: add loom test for queue contention
Adds a `loom` permutation test for `mpmmc::Queue::enqueue/dequeue` contention across threads. Related: #583 Co-authored-by: NODA Kai <https://github.com/nodakai> Co-authored-by: Sosthène Guédon <https://github.com/sosthene-nitrokey>
1 parent dfeb3dd commit c1abf02

File tree

3 files changed

+163
-3
lines changed

3 files changed

+163
-3
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ alloc = []
5858

5959
nightly = []
6060

61+
loom = ["dep:loom"]
62+
6163
[dependencies]
6264
bytes = { version = "1", default-features = false, optional = true }
6365
portable-atomic = { version = "1.0", optional = true }
@@ -67,6 +69,7 @@ ufmt = { version = "0.2", optional = true }
6769
ufmt-write = { version = "0.1", optional = true }
6870
defmt = { version = "1.0.1", optional = true }
6971
zeroize = { version = "1.8", optional = true, default-features = false, features = ["derive"] }
72+
loom = { version = "0.7", optional = true }
7073

7174
# for the pool module
7275
[target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies]

src/mpmc.rs

Lines changed: 159 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,12 @@
7070
7171
use core::{cell::UnsafeCell, mem::MaybeUninit};
7272

73-
#[cfg(not(feature = "portable-atomic"))]
73+
#[cfg(all(not(feature = "portable-atomic"), not(feature = "loom")))]
7474
use core::sync::atomic;
75-
#[cfg(feature = "portable-atomic")]
75+
#[cfg(all(not(feature = "portable-atomic"), not(feature = "loom")))]
7676
use portable_atomic as atomic;
77+
#[cfg(feature = "loom")]
78+
use loom::sync::atomic;
7779

7880
use atomic::Ordering;
7981

@@ -122,6 +124,7 @@ pub type QueueView<T> = QueueInner<T, ViewStorage>;
122124

123125
impl<T, const N: usize> Queue<T, N> {
124126
/// Creates an empty queue.
127+
#[cfg(not(feature = "loom"))]
125128
pub const fn new() -> Self {
126129
const {
127130
assert!(N > 1);
@@ -144,6 +147,24 @@ impl<T, const N: usize> Queue<T, N> {
144147
}
145148
}
146149

150+
/// Creates an empty queue.
151+
#[cfg(feature = "loom")]
152+
pub fn new() -> Self {
153+
const {
154+
assert!(N > 1);
155+
assert!(N.is_power_of_two());
156+
assert!(N < UintSize::MAX as usize);
157+
}
158+
159+
let result_cells: [Cell<T>; N] = core::array::from_fn(Cell::new);
160+
161+
Self {
162+
buffer: UnsafeCell::new(result_cells),
163+
dequeue_pos: AtomicTargetSize::new(0),
164+
enqueue_pos: AtomicTargetSize::new(0),
165+
}
166+
}
167+
147168
/// Used in `Storage` implementation.
148169
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
149170
self
@@ -247,12 +268,21 @@ struct Cell<T> {
247268
}
248269

249270
impl<T> Cell<T> {
271+
#[cfg(not(feature = "loom"))]
250272
const fn new(seq: usize) -> Self {
251273
Self {
252274
data: MaybeUninit::uninit(),
253275
sequence: AtomicTargetSize::new(seq as UintSize),
254276
}
255277
}
278+
279+
#[cfg(feature = "loom")]
280+
fn new(seq: usize) -> Self {
281+
Self {
282+
data: MaybeUninit::uninit(),
283+
sequence: AtomicTargetSize::new(seq as UintSize),
284+
}
285+
}
256286
}
257287

258288
unsafe fn dequeue<T>(
@@ -346,12 +376,22 @@ unsafe fn enqueue<T>(
346376
mod tests {
347377
use static_assertions::assert_not_impl_any;
348378

349-
use super::Queue;
379+
use super::{Queue, QueueView};
350380

351381
// Ensure a `Queue` containing `!Send` values stays `!Send` itself.
352382
assert_not_impl_any!(Queue<*const (), 4>: Send);
353383

384+
fn to_vec<T>(q: &QueueView<T>) -> Vec<T> {
385+
// inaccurate
386+
let mut ret = vec![];
387+
while let Some(v) = q.dequeue() {
388+
ret.push(v);
389+
}
390+
ret
391+
}
392+
354393
#[test]
394+
#[cfg(not(feature = "loom"))]
355395
fn memory_leak() {
356396
droppable!();
357397

@@ -364,6 +404,7 @@ mod tests {
364404
}
365405

366406
#[test]
407+
#[cfg(not(feature = "loom"))]
367408
fn sanity() {
368409
let q = Queue::<_, 2>::new();
369410
q.enqueue(0).unwrap();
@@ -376,6 +417,7 @@ mod tests {
376417
}
377418

378419
#[test]
420+
#[cfg(not(feature = "loom"))]
379421
fn drain_at_pos255() {
380422
let q = Queue::<_, 2>::new();
381423
for _ in 0..255 {
@@ -388,6 +430,7 @@ mod tests {
388430
}
389431

390432
#[test]
433+
#[cfg(not(feature = "loom"))]
391434
fn full_at_wrapped_pos0() {
392435
let q = Queue::<_, 2>::new();
393436
for _ in 0..254 {
@@ -401,6 +444,7 @@ mod tests {
401444
}
402445

403446
#[test]
447+
#[cfg(not(feature = "loom"))]
404448
fn enqueue_full() {
405449
#[cfg(not(feature = "mpmc_large"))]
406450
const CAPACITY: usize = 128;
@@ -419,4 +463,116 @@ mod tests {
419463
// Queue is full, this should not block forever.
420464
q.enqueue(0x55).unwrap_err();
421465
}
466+
467+
#[test]
468+
#[cfg(not(feature = "loom"))]
469+
fn issue_583_enqueue() {
470+
const N: usize = 4;
471+
472+
let q0 = Queue::<u8, N>::new();
473+
for i in 0..N {
474+
q0.enqueue(i as u8).expect("new enqueue");
475+
}
476+
eprintln!("start!");
477+
478+
std::thread::scope(|sc| {
479+
for _ in 0..2 {
480+
sc.spawn(|| {
481+
for k in 0..1000_000 {
482+
if let Some(v) = q0.dequeue() {
483+
q0.enqueue(v).unwrap_or_else(|v| {
484+
panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&q0))
485+
});
486+
}
487+
}
488+
});
489+
}
490+
});
491+
}
492+
493+
#[test]
494+
#[cfg(not(feature = "loom"))]
495+
fn issue_583_dequeue() {
496+
const N: usize = 4;
497+
498+
let q0 = Queue::<u8, N>::new();
499+
eprintln!("start!");
500+
std::thread::scope(|sc| {
501+
for _ in 0..2 {
502+
sc.spawn(|| {
503+
for k in 0..1000_000 {
504+
q0.enqueue(k as u8).unwrap();
505+
if q0.dequeue().is_none() {
506+
panic!("{k}");
507+
}
508+
}
509+
});
510+
}
511+
});
512+
}
513+
514+
#[test]
515+
#[cfg(feature = "loom")]
516+
fn issue_583_enqueue_loom() {
517+
loom::model(|| {
518+
const N: usize = 4;
519+
520+
let q0 = loom::sync::Arc::new(Queue::<u8, N>::new());
521+
for i in 0..N {
522+
q0.enqueue(i as u8).expect("new enqueue");
523+
}
524+
eprintln!("start!");
525+
526+
let q1 = q0.clone();
527+
loom::thread::spawn(move || {
528+
for k in 0..1000_000 {
529+
if let Some(v) = q0.dequeue() {
530+
q0.enqueue(v)
531+
.unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q0)));
532+
}
533+
}
534+
});
535+
536+
loom::thread::spawn(move || {
537+
for k in 0..1000_000 {
538+
if let Some(v) = q1.dequeue() {
539+
q1.enqueue(v)
540+
.unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q1)));
541+
}
542+
}
543+
});
544+
});
545+
}
546+
547+
#[test]
548+
#[cfg(feature = "loom")]
549+
fn issue_583_dequeue_loom() {
550+
loom::model(|| {
551+
const N: usize = 4;
552+
553+
let q0 = loom::sync::Arc::new(Queue::<u8, N>::new());
554+
555+
eprintln!("start!");
556+
557+
let q1 = q0.clone();
558+
559+
loom::thread::spawn(move || {
560+
for k in 0..1000_000 {
561+
q0.enqueue(k as u8).unwrap();
562+
if q0.dequeue().is_none() {
563+
panic!("{k}");
564+
}
565+
}
566+
});
567+
568+
loom::thread::spawn(move || {
569+
for k in 0..1000_000 {
570+
q1.enqueue(k as u8).unwrap();
571+
if q1.dequeue().is_none() {
572+
panic!("{k}");
573+
}
574+
}
575+
});
576+
});
577+
}
422578
}

tests/tsan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ fn contention() {
115115

116116
#[test]
117117
#[cfg_attr(miri, ignore)] // too slow
118+
#[cfg(not(feature = "loom"))]
118119
fn mpmc_contention() {
119120
use std::sync::mpsc;
120121

0 commit comments

Comments
 (0)