Skip to content

Commit 20de091

Browse files
committed
📝 add epoch-list-queue bench
1 parent 200d215 commit 20de091

File tree

1 file changed

+251
-0
lines changed

1 file changed

+251
-0
lines changed

benches/queue.rs

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ const THREADS: usize = 20;
77
const ITEMS: usize = 1000;
88

99
fn concurrent_queue(c: &mut Criterion) {
10+
c.bench_function("queue-epoch-list-queue", |b| {
11+
b.iter(run::<epoch_list::ListQueue<usize>>)
12+
});
13+
1014
c.bench_function("queue-rcu-single-list", |b| {
1115
b.iter(run::<rcu_single_list::ListQueue<usize>>)
1216
});
@@ -31,6 +35,10 @@ fn concurrent_queue(c: &mut Criterion) {
3135
}
3236

3337
fn single_queue(c: &mut Criterion) {
38+
c.bench_function("single_queue-epoch-list-queue", |b| {
39+
b.iter(single_run::<epoch_list::ListQueue<usize>>)
40+
});
41+
3442
c.bench_function("single_queue-rcu-single-list", |b| {
3543
b.iter(single_run::<rcu_single_list::ListQueue<usize>>)
3644
});
@@ -300,3 +308,246 @@ mod crossbem_seg_queue {
300308
}
301309
}
302310
}
311+
312+
mod epoch_list {
313+
//! Michael-Scott lock-free queue.
314+
//!
315+
//! Usable with any number of producers and consumers.
316+
//!
317+
//! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
318+
//! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106>
319+
//!
320+
//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
321+
//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
322+
323+
use core::mem::MaybeUninit;
324+
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
325+
326+
use crossbeam_utils::CachePadded;
327+
328+
use crossbeam_epoch::{unprotected, Atomic, Guard, Owned, Shared};
329+
330+
// The representation here is a singly-linked list, with a sentinel node at the front. In general
// the `tail` pointer may lag behind the actual tail. Every non-sentinel node carries a data value.
// (NOTE(review): the upstream crossbeam source this was ported from also had `Blocked` request
// nodes for blocked readers; no such variant exists in this port.)
333+
#[derive(Debug)]
pub(crate) struct ListQueue<T> {
    /// Pointer to the sentinel node at the front of the list. `CachePadded`
    /// keeps `head` and `tail` on separate cache lines so concurrent pushers
    /// and poppers don't false-share.
    head: CachePadded<Atomic<Node<T>>>,
    /// Pointer to (roughly) the last node; allowed to lag behind the true
    /// tail — pushers and poppers "help" advance it.
    tail: CachePadded<Atomic<Node<T>>>,
}
338+
339+
/// A node of the singly-linked list backing the queue. The sentinel node's
/// `data` slot is never initialized; all other nodes carry a value.
struct Node<T> {
    /// The slot in which a value of type `T` can be stored.
    ///
    /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
    /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
    /// Other nodes start their life with a push operation and contain a value until it gets popped
    /// out. After that such empty nodes get added to the collector for destruction.
    data: MaybeUninit<T>,

    /// Pointer to the next node; null at the (true) tail of the list.
    next: Atomic<Node<T>>,
}
350+
351+
// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
//
// SAFETY: each value is handed off through the queue — written once by the
// pushing thread and read once by the popping thread — so `T: Send` is the
// only requirement for sharing the queue across threads.
unsafe impl<T: Send> Sync for ListQueue<T> {}
unsafe impl<T: Send> Send for ListQueue<T> {}
354+
355+
impl<T> ListQueue<T> {
356+
/// Create a new, empty queue.
///
/// Both `head` and `tail` start out pointing at a single shared sentinel
/// node whose `data` slot stays uninitialized.
pub(crate) fn new() -> Self {
    let q = Self {
        head: CachePadded::new(Atomic::null()),
        tail: CachePadded::new(Atomic::null()),
    };
    let sentinel = Owned::new(Node {
        data: MaybeUninit::uninit(),
        next: Atomic::null(),
    });
    unsafe {
        // SAFETY: `q` has not been shared with any other thread yet, so an
        // unprotected (epoch-less) guard is sound here.
        let guard = unprotected();
        let sentinel = sentinel.into_shared(guard);
        // Relaxed is fine: publication to other threads happens via whatever
        // mechanism later shares the queue itself.
        q.head.store(sentinel, Relaxed);
        q.tail.store(sentinel, Relaxed);
        q
    }
}
374+
375+
/// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
/// success. The queue's `tail` pointer may be updated.
#[inline(always)]
fn push_internal(
    &self,
    onto: Shared<'_, Node<T>>,
    new: Shared<'_, Node<T>>,
    guard: &Guard,
) -> bool {
    // is `onto` the actual tail?
    // SAFETY: `onto` was loaded under `guard`, so the node cannot have been
    // reclaimed while we hold the guard.
    let o = unsafe { onto.deref() };
    let next = o.next.load(Acquire, guard);
    if unsafe { next.as_ref().is_some() } {
        // if not, try to "help" by moving the tail pointer forward;
        // failure just means another thread already advanced it.
        let _ = self
            .tail
            .compare_exchange(onto, next, Release, Relaxed, guard);
        false
    } else {
        // looks like the actual tail; attempt to link in `n`
        let result = o
            .next
            .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
            .is_ok();
        if result {
            // try to move the tail pointer forward (best effort — a
            // concurrent helper may do it for us, so the result is ignored)
            let _ = self
                .tail
                .compare_exchange(onto, new, Release, Relaxed, guard);
        }
        result
    }
}
408+
409+
/// Adds `t` to the back of the queue.
///
/// Loops until the new node is successfully linked after the observed tail;
/// each failed attempt re-reads `tail` (which may have been helped forward).
pub(crate) fn push(&self, t: T, guard: &Guard) {
    let new = Owned::new(Node {
        data: MaybeUninit::new(t),
        next: Atomic::null(),
    });
    // Move the node into the guard-protected shared domain before linking.
    let new = Owned::into_shared(new, guard);

    loop {
        // We push onto the tail, so we'll start optimistically by looking there first.
        let tail = self.tail.load(Acquire, guard);

        // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
        if self.push_internal(tail, new, guard) {
            break;
        }
    }
}
427+
428+
/// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
#[inline(always)]
fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
    let head = self.head.load(Acquire, guard);
    // SAFETY: `head` always points at the sentinel, which is never null
    // after construction, and `guard` protects it from reclamation.
    let h = unsafe { head.deref() };
    let next = h.next.load(Acquire, guard);
    match unsafe { next.as_ref() } {
        Some(n) => unsafe {
            // A data node exists; try to make it the new sentinel.
            self.head
                .compare_exchange(head, next, Release, Relaxed, guard)
                .map(|_| {
                    let tail = self.tail.load(Relaxed, guard);
                    // Advance the tail so that we don't retire a pointer to a reachable node.
                    if head == tail {
                        let _ = self
                            .tail
                            .compare_exchange(tail, next, Release, Relaxed, guard);
                    }
                    // Old sentinel is now unlinked; reclaim it after all
                    // current guards are released.
                    guard.defer_destroy(head);
                    // SAFETY: `n` was written by a push and is consumed
                    // exactly once (we won the head CAS).
                    Some(n.data.assume_init_read())
                })
                .map_err(|_| ())
        },
        None => Ok(None),
    }
}
454+
455+
// /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
456+
// /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
457+
// #[inline(always)]
458+
// fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
459+
// where
460+
// T: Sync,
461+
// F: Fn(&T) -> bool,
462+
// {
463+
// let head = self.head.load(Acquire, guard);
464+
// let h = unsafe { head.deref() };
465+
// let next = h.next.load(Acquire, guard);
466+
// match unsafe { next.as_ref() } {
467+
// Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
468+
// self.head
469+
// .compare_exchange(head, next, Release, Relaxed, guard)
470+
// .map(|_| {
471+
// let tail = self.tail.load(Relaxed, guard);
472+
// // Advance the tail so that we don't retire a pointer to a reachable node.
473+
// if head == tail {
474+
// let _ = self
475+
// .tail
476+
// .compare_exchange(tail, next, Release, Relaxed, guard);
477+
// }
478+
// guard.defer_destroy(head);
479+
// Some(n.data.assume_init_read())
480+
// })
481+
// .map_err(|_| ())
482+
// },
483+
// None | Some(_) => Ok(None),
484+
// }
485+
// }
486+
487+
/// Attempts to dequeue from the front.
488+
///
489+
/// Returns `None` if the queue is observed to be empty.
490+
pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
491+
loop {
492+
if let Ok(head) = self.pop_internal(guard) {
493+
return head;
494+
}
495+
}
496+
}
497+
498+
// /// Attempts to dequeue from the front, if the item satisfies the given condition.
499+
// ///
500+
// /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
501+
// /// condition.
502+
// pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
503+
// where
504+
// T: Sync,
505+
// F: Fn(&T) -> bool,
506+
// {
507+
// loop {
508+
// if let Ok(head) = self.pop_if_internal(&condition, guard) {
509+
// return head;
510+
// }
511+
// }
512+
// }
513+
514+
pub(crate) fn is_empty(&self) -> bool {
515+
let guard = &unsafe { unprotected() };
516+
self.head.load(Relaxed, guard) == self.tail.load(Relaxed, guard)
517+
}
518+
}
519+
520+
impl<T> Drop for ListQueue<T> {
    fn drop(&mut self) {
        unsafe {
            // SAFETY: `&mut self` guarantees no other thread can touch the
            // queue, so an unprotected guard is sound for the teardown.
            let guard = unprotected();

            // Pop (and thereby drop) every remaining value.
            while self.try_pop(guard).is_some() {}

            // Destroy the remaining sentinel node.
            let sentinel = self.head.load(Relaxed, guard);
            drop(sentinel.into_owned());
        }
    }
}
533+
534+
impl<T> super::Queue<T> for ListQueue<T> {
535+
fn new() -> ListQueue<T> {
536+
ListQueue::new()
537+
}
538+
539+
fn push(&self, value: T) {
540+
let guard = &crossbeam_epoch::pin();
541+
self.push(value, guard);
542+
}
543+
544+
fn pop(&self) -> Option<T> {
545+
let guard = &crossbeam_epoch::pin();
546+
self.try_pop(guard)
547+
}
548+
549+
fn is_empty(&self) -> bool {
550+
self.is_empty()
551+
}
552+
}
553+
}

0 commit comments

Comments
 (0)