diff --git a/benches/bench.rs b/benches/bench.rs index d9e0db1..7ec2483 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -1,7 +1,7 @@ use std::iter; use criterion::{criterion_group, criterion_main, Criterion}; -use event_listener::{Event, Listener}; +use event_listener::{Event, Listener, QueueStrategy}; const COUNT: usize = 8000; @@ -20,6 +20,21 @@ fn bench_events(c: &mut Criterion) { } }); }); + + c.bench_function("notify_and_wait_lifo", |b| { + let ev = Event::new_with_queue_strategy(QueueStrategy::Lifo); + let mut handles = Vec::with_capacity(COUNT); + + b.iter(|| { + handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT)); + + ev.notify(COUNT); + + for handle in handles.drain(..) { + handle.wait(); + } + }); + }); } criterion_group!(benches, bench_events); diff --git a/src/intrusive.rs b/src/intrusive.rs index 7150237..572880a 100644 --- a/src/intrusive.rs +++ b/src/intrusive.rs @@ -6,7 +6,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::Ordering; use crate::sync::cell::{Cell, UnsafeCell}; -use crate::{RegisterResult, State, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, TaskRef}; #[cfg(feature = "critical-section")] use core::cell::RefCell; @@ -42,17 +42,21 @@ struct Inner { /// The number of notified listeners. notified: usize, + + /// Strategy by which the list is organized. + strategy: QueueStrategy, } impl List { /// Create a new, empty event listener list. - pub(super) fn new() -> Self { + pub(super) fn new(strategy: QueueStrategy) -> Self { let inner = Inner { head: None, tail: None, next: None, len: 0, notified: 0, + strategy, }; #[cfg(feature = "critical-section")] @@ -149,39 +153,9 @@ impl crate::Inner { }) } - /// Add a new listener to the list. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - self.with_inner(|inner| { - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - { - let entry_guard = listener.link.get(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Replace the tail with the new entry. - match inner.tail.replace(entry.into()) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; - } - - // If there are no unnotified entries, this is the first one. - if inner.next.is_none() { - inner.next = inner.tail; - } - - // Bump the entry count. - inner.len += 1; - }); + /// Adds a listener to the list. + pub(crate) fn insert(&self, listener: Pin<&mut Option>>) { + self.with_inner(|inner| inner.insert(listener)) } /// Remove a listener from the list. @@ -248,6 +222,53 @@ impl crate::Inner { } impl Inner { + fn insert(&mut self, mut listener: Pin<&mut Option>>) { + use QueueStrategy::{Fifo, Lifo}; + + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)), + next: Cell::new(self.head.filter(|_| self.strategy == Lifo)), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the head or tail with the new entry. + let replacing = match self.strategy { + Lifo => &mut self.head, + Fifo => &mut self.tail, + }; + + match replacing.replace(entry.into()) { + None => *replacing = Some(entry.into()), + Some(t) if self.strategy == Lifo => unsafe { + t.as_ref().prev.set(Some(entry.into())) + }, + Some(t) if self.strategy == Fifo => unsafe { + t.as_ref().next.set(Some(entry.into())) + }, + Some(_) => unimplemented!("unimplemented queue strategy"), + }; + } + + // If there are no unnotified entries, or if using LIFO strategy, this is the first one. + if self.strategy == Lifo { + self.next = self.head; + } else if self.next.is_none() { + self.next = self.tail; + } + + // Bump the entry count. + self.len += 1; + } + fn remove( &mut self, mut listener: Pin<&mut Option>>, @@ -413,7 +434,7 @@ mod tests { #[test] fn insert() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. @@ -434,7 +455,7 @@ mod tests { #[test] fn drop_non_notified() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. diff --git a/src/lib.rs b/src/lib.rs index cb6cdc0..1a12929 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,6 +129,16 @@ use sync::WithMut; use notify::NotificationPrivate; pub use notify::{IntoNotification, Notification}; +/// Queuing strategy for listeners. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum QueueStrategy { + /// First-in-first-out listeners are added to the back of the list. + Fifo, + + /// Last-in-first-out listeners are added to the front of the list. + Lifo, +} + /// Inner state of [`Event`]. struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. @@ -145,10 +155,10 @@ struct Inner { } impl Inner { - fn new() -> Self { + fn new(queue_strategy: QueueStrategy) -> Self { Self { notified: AtomicUsize::new(usize::MAX), - list: sys::List::new(), + list: sys::List::new(queue_strategy), } } } @@ -179,6 +189,11 @@ pub struct Event { /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s /// reference count. inner: AtomicPtr>, + + /// Queuing strategy. + /// + /// Listeners waiting for notification will be arranged according to the strategy. + queue_strategy: QueueStrategy, } unsafe impl Send for Event {} @@ -240,6 +255,7 @@ impl Event { pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } #[cfg(all(feature = "std", loom))] @@ -247,6 +263,7 @@ impl Event { pub fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } @@ -473,7 +490,7 @@ impl Event { // If this is the first use, initialize the state. if inner.is_null() { // Allocate the state on the heap. - let new = Arc::new(Inner::::new()); + let new = Arc::new(Inner::::new(self.queue_strategy)); // Convert the state to a raw pointer. let new = Arc::into_raw(new) as *mut Inner; @@ -558,16 +575,39 @@ impl Event<()> { #[inline] #[cfg(not(loom))] pub const fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + #[inline] + #[cfg(loom)] + pub fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + /// Creates a new [`Event`] with specific queue strategy. + /// + /// # Examples + /// + /// ``` + /// use event_listener::{Event, QueueStrategy}; + /// + /// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); + /// ``` + #[inline] + #[cfg(not(loom))] + pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } #[inline] #[cfg(loom)] - pub fn new() -> Self { + pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } diff --git a/src/slab.rs b/src/slab.rs index 59e1c21..11e9a12 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::cell::{Cell, ConstPtr, UnsafeCell}; use crate::sync::Arc; -use crate::{RegisterResult, State, Task, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef}; use core::fmt; use core::marker::PhantomData; @@ -229,7 +229,12 @@ pub(crate) struct List { } impl List { - pub(super) fn new() -> List { + pub(super) fn new(strategy: QueueStrategy) -> List { + debug_assert!( + strategy == QueueStrategy::Fifo, + "Slab list only supports FIFO strategy" + ); + List { inner: Mutex::new(ListenerSlab::new()), queue: concurrent_queue::ConcurrentQueue::unbounded(), @@ -1362,7 +1367,7 @@ mod tests { #[test] fn uncontended_inner() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); // Register two listeners. let (mut listener1, mut listener2, mut listener3) = (None, None, None); diff --git a/tests/notify.rs b/tests/notify.rs index c37dc9a..4ad4fe9 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::Context; -use event_listener::{Event, EventListener}; +use event_listener::{Event, EventListener, QueueStrategy}; use waker_fn::waker_fn; #[cfg(target_family = "wasm")] @@ -17,8 +17,17 @@ fn is_notified(listener: &mut EventListener) -> bool { } #[test] -fn notify() { - let event = Event::new(); +fn notify_fifo() { + notify(QueueStrategy::Fifo) +} + +#[test] +fn notify_lifo() { + notify(QueueStrategy::Lifo) +} + +fn notify(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -31,14 +40,32 @@ fn notify() { assert_eq!(event.notify(2), 2); assert_eq!(event.notify(1), 0); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + match queue_strategy { + QueueStrategy::Fifo => { + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + } + QueueStrategy::Lifo => { + assert!(is_notified(&mut l3)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); + } + } } #[test] -fn notify_additional() { - let event = Event::new(); +fn notify_additional_fifo() { + notify_additional(QueueStrategy::Fifo) +} + +#[test] +fn notify_additional_lifo() { + notify_additional(QueueStrategy::Lifo) +} + +fn notify_additional(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -48,14 +75,32 @@ fn notify_additional() { assert_eq!(event.notify(1), 0); assert_eq!(event.notify_additional(1), 1); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + match queue_strategy { + QueueStrategy::Fifo => { + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + } + QueueStrategy::Lifo => { + assert!(is_notified(&mut l3)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); + } + } } #[test] -fn notify_one() { - let event = Event::new(); +fn notify_one_fifo() { + notify_one(QueueStrategy::Fifo) +} + +#[test] +fn notify_one_lifo() { + notify_one(QueueStrategy::Lifo) +} + +fn notify_one(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -64,16 +109,36 @@ fn notify_one() { assert!(!is_notified(&mut l2)); assert_eq!(event.notify(1), 1); - assert!(is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); + match queue_strategy { + QueueStrategy::Fifo => { + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + } + QueueStrategy::Lifo => { + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); + } + } assert_eq!(event.notify(1), 1); - assert!(is_notified(&mut l2)); + match queue_strategy { + QueueStrategy::Fifo => assert!(is_notified(&mut l2)), + QueueStrategy::Lifo => assert!(is_notified(&mut l1)), + } } #[test] -fn notify_all() { - let event = Event::new(); +fn notify_all_fifo() { + notify_all(QueueStrategy::Fifo) +} + +#[test] +fn notify_all_lifo() { + notify_all(QueueStrategy::Lifo) +} + +fn notify_all(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -87,8 +152,8 @@ fn notify_all() { } #[test] -fn drop_notified() { - let event = Event::new(); +fn drop_notified_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let l1 = event.listen(); let mut l2 = event.listen(); @@ -101,8 +166,22 @@ fn drop_notified() { } #[test] -fn drop_notified2() { - let event = Event::new(); +fn drop_notified_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l3); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); +} + +#[test] +fn drop_notified2_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let l1 = event.listen(); let mut l2 = event.listen(); @@ -115,8 +194,22 @@ fn drop_notified2() { } #[test] -fn drop_notified_additional() { - let event = Event::new(); +fn drop_notified2_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let l3 = event.listen(); + + assert_eq!(event.notify(2), 2); + drop(l3); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); +} + +#[test] +fn drop_notified_additional_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let l1 = event.listen(); let mut l2 = event.listen(); @@ -132,8 +225,25 @@ fn drop_notified_additional() { } #[test] -fn drop_non_notified() { - let event = Event::new(); +fn drop_notified_additional_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + let l4 = event.listen(); + + assert_eq!(event.notify_additional(1), 1); + assert_eq!(event.notify(2), 1); + drop(l4); + assert!(is_notified(&mut l3)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); +} + +#[test] +fn drop_non_notified_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -146,8 +256,31 @@ fn drop_non_notified() { } #[test] -fn notify_all_fair() { - let event = Event::new(); +fn drop_non_notified_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l1); + assert!(is_notified(&mut l3)); + assert!(!is_notified(&mut l2)); +} + +#[test] +fn notify_all_fair_fifo() { + notify_all_fair(QueueStrategy::Fifo) +} + +#[test] +fn notify_all_fair_lifo() { + notify_all_fair(QueueStrategy::Lifo) +} + +fn notify_all_fair(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let v = Arc::new(Mutex::new(vec![])); let mut l1 = event.listen(); @@ -178,7 +311,11 @@ fn notify_all_fair() { .is_pending()); assert_eq!(event.notify(usize::MAX), 3); - assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]); + + match queue_strategy { + QueueStrategy::Fifo => assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]), + QueueStrategy::Lifo => assert_eq!(&*v.lock().unwrap(), &[3, 2, 1]), + } assert!(Pin::new(&mut l1) .poll(&mut Context::from_waker(&waker1))