Skip to content

Commit b7f9559

Browse files
committed
mpmc: use heapless API for crossbeam impl
Converts the `crossbeam::ArrayQueue` impl for MPMC to use a `heapless`-compatible API.
1 parent 21bdbe4 commit b7f9559

File tree

3 files changed

+466
-526
lines changed

3 files changed

+466
-526
lines changed

src/mpmc.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,23 @@
6868
//!
6969
//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
7070
71+
use crate::storage::ViewStorage;
72+
73+
#[cfg(not(feature = "portable-atomic"))]
74+
use core::sync::atomic;
75+
#[cfg(feature = "portable-atomic")]
76+
use portable_atomic as atomic;
77+
78+
#[cfg(feature = "mpmc_large")]
79+
type AtomicTargetSize = atomic::AtomicUsize;
80+
#[cfg(not(feature = "mpmc_large"))]
81+
type AtomicTargetSize = atomic::AtomicU8;
82+
83+
#[cfg(feature = "mpmc_large")]
84+
type UintSize = usize;
85+
#[cfg(not(feature = "mpmc_large"))]
86+
type UintSize = u8;
87+
7188
#[cfg(feature = "mpmc_crossbeam")]
7289
pub mod crossbeam_array_queue;
7390
#[cfg(feature = "mpmc_crossbeam")]
@@ -77,3 +94,200 @@ pub use crossbeam_array_queue::*;
7794
mod original;
7895
#[cfg(not(feature = "mpmc_crossbeam"))]
7996
pub use original::*;
97+
98+
/// A [`Queue`] with dynamic capacity.
99+
///
100+
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
101+
pub type QueueView<T> = QueueInner<T, ViewStorage>;
102+
103+
#[cfg(test)]
104+
mod tests {
105+
use static_assertions::assert_not_impl_any;
106+
107+
use super::{Queue, QueueView};
108+
109+
// Ensure a `Queue` containing `!Send` values stays `!Send` itself.
110+
assert_not_impl_any!(Queue<*const (), 4>: Send);
111+
112+
fn to_vec<T>(q: &QueueView<T>) -> Vec<T> {
113+
// inaccurate
114+
let mut ret = vec![];
115+
while let Some(v) = q.dequeue() {
116+
ret.push(v);
117+
}
118+
ret
119+
}
120+
121+
#[test]
122+
fn memory_leak() {
123+
droppable!();
124+
125+
let q = Queue::<_, 2>::new();
126+
q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
127+
q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
128+
drop(q);
129+
130+
assert_eq!(Droppable::count(), 0);
131+
}
132+
133+
#[test]
134+
fn sanity() {
135+
let q = Queue::<_, 2>::new();
136+
q.enqueue(0).unwrap();
137+
q.enqueue(1).unwrap();
138+
assert!(q.enqueue(2).is_err());
139+
140+
assert_eq!(q.dequeue(), Some(0));
141+
assert_eq!(q.dequeue(), Some(1));
142+
assert_eq!(q.dequeue(), None);
143+
}
144+
145+
#[test]
146+
fn drain_at_pos255() {
147+
let q = Queue::<_, 2>::new();
148+
for _ in 0..255 {
149+
assert!(q.enqueue(0).is_ok());
150+
assert_eq!(q.dequeue(), Some(0));
151+
}
152+
153+
// Queue is empty, this should not block forever.
154+
assert_eq!(q.dequeue(), None);
155+
}
156+
157+
#[test]
158+
fn full_at_wrapped_pos0() {
159+
let q = Queue::<_, 2>::new();
160+
for _ in 0..254 {
161+
assert!(q.enqueue(0).is_ok());
162+
assert_eq!(q.dequeue(), Some(0));
163+
}
164+
assert!(q.enqueue(0).is_ok());
165+
assert!(q.enqueue(0).is_ok());
166+
// this should not block forever
167+
assert!(q.enqueue(0).is_err());
168+
}
169+
170+
#[test]
171+
fn enqueue_full() {
172+
#[cfg(not(feature = "mpmc_large"))]
173+
const CAPACITY: usize = 128;
174+
175+
#[cfg(feature = "mpmc_large")]
176+
const CAPACITY: usize = 256;
177+
178+
let q: Queue<u8, CAPACITY> = Queue::new();
179+
180+
assert_eq!(q.capacity(), CAPACITY);
181+
182+
for _ in 0..CAPACITY {
183+
q.enqueue(0xAA).unwrap();
184+
}
185+
186+
// Queue is full, this should not block forever.
187+
q.enqueue(0x55).unwrap_err();
188+
}
189+
190+
#[test]
191+
fn issue_583_enqueue() {
192+
const N: usize = 4;
193+
194+
let q0 = Queue::<u8, N>::new();
195+
for i in 0..N {
196+
q0.enqueue(i as u8).expect("new enqueue");
197+
}
198+
eprintln!("start!");
199+
200+
std::thread::scope(|sc| {
201+
for _ in 0..2 {
202+
sc.spawn(|| {
203+
for k in 0..1000_000 {
204+
if let Some(v) = q0.dequeue() {
205+
q0.enqueue(v).unwrap_or_else(|v| {
206+
panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&q0))
207+
});
208+
}
209+
}
210+
});
211+
}
212+
});
213+
}
214+
215+
#[test]
216+
fn issue_583_dequeue() {
217+
const N: usize = 4;
218+
219+
let q0 = Queue::<u8, N>::new();
220+
eprintln!("start!");
221+
std::thread::scope(|sc| {
222+
for _ in 0..2 {
223+
sc.spawn(|| {
224+
for k in 0..1000_000 {
225+
q0.enqueue(k as u8).unwrap();
226+
if q0.dequeue().is_none() {
227+
panic!("{k}");
228+
}
229+
}
230+
});
231+
}
232+
});
233+
}
234+
235+
#[test]
236+
fn issue_583_enqueue_loom() {
237+
const N: usize = 4;
238+
239+
loom::model(|| {
240+
let q0 = loom::sync::Arc::new(Queue::<u8, N>::new());
241+
for i in 0..N {
242+
q0.enqueue(i as u8).expect("new enqueue");
243+
}
244+
eprintln!("start!");
245+
246+
let q1 = q0.clone();
247+
loom::thread::spawn(move || {
248+
for k in 0..1000_000 {
249+
if let Some(v) = q0.dequeue() {
250+
q0.enqueue(v)
251+
.unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q0)));
252+
}
253+
}
254+
});
255+
256+
loom::thread::spawn(move || {
257+
for k in 0..1000_000 {
258+
if let Some(v) = q1.dequeue() {
259+
q1.enqueue(v)
260+
.unwrap_or_else(|v| panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&*q1)));
261+
}
262+
}
263+
});
264+
});
265+
}
266+
267+
#[test]
268+
fn issue_583_enqueue_loom_scope() {
269+
const N: usize = 4;
270+
271+
loom::model(|| {
272+
let q0 = Queue::<u8, N>::new();
273+
for i in 0..N {
274+
q0.enqueue(i as u8).expect("new enqueue");
275+
}
276+
eprintln!("start!");
277+
278+
loom::thread::scope(|sc| {
279+
for _ in 0..2 {
280+
sc.spawn(|| {
281+
for k in 0..1000_000 {
282+
if let Some(v) = q0.dequeue() {
283+
q0.enqueue(v).unwrap_or_else(|v| {
284+
panic!("{k}: q0 -> q0: {v}, {:?}", to_vec(&q0))
285+
});
286+
}
287+
}
288+
});
289+
}
290+
});
291+
});
292+
}
293+
}

0 commit comments

Comments
 (0)