Skip to content

Commit b817462

Browse files
committed
feat: Add queuing strategies for listener lists.
Two strategies are available: - FIFO: The original round-robin queuing; listeners are inserted at the back. - LIFO: The new most-recent queuing; listeners are inserted at the front. LIFO queuing is beneficial for cache-efficiency with workloads that are tolerant of starvation. The same listener is repeatedly drawn from the list until the load dictates additional listeners be drawn from the list. These listeners expand outward as a "hot set" for optimal reuse of resources rather than continuously drawing from the coldest resources in a FIFO schedule. Signed-off-by: Jason Volk <[email protected]>
1 parent a8aefba commit b817462

File tree

3 files changed

+110
-44
lines changed

3 files changed

+110
-44
lines changed

src/intrusive.rs

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use crate::notify::{GenericNotify, Internal, Notification};
77
use crate::sync::atomic::Ordering;
88
use crate::sync::cell::{Cell, UnsafeCell};
9-
use crate::{RegisterResult, State, TaskRef};
9+
use crate::{QueueStrategy, RegisterResult, State, TaskRef};
1010

1111
#[cfg(feature = "critical-section")]
1212
use core::cell::RefCell;
@@ -42,17 +42,21 @@ struct Inner<T> {
4242

4343
/// The number of notified listeners.
4444
notified: usize,
45+
46+
/// Strategy by which the list is organized.
47+
strategy: QueueStrategy,
4548
}
4649

4750
impl<T> List<T> {
4851
/// Create a new, empty event listener list.
49-
pub(super) fn new() -> Self {
52+
pub(super) fn new(strategy: QueueStrategy) -> Self {
5053
let inner = Inner {
5154
head: None,
5255
tail: None,
5356
next: None,
5457
len: 0,
5558
notified: 0,
59+
strategy,
5660
};
5761

5862
#[cfg(feature = "critical-section")]
@@ -149,39 +153,9 @@ impl<T> crate::Inner<T> {
149153
})
150154
}
151155

152-
/// Add a new listener to the list.
153-
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
154-
self.with_inner(|inner| {
155-
listener.as_mut().set(Some(Listener {
156-
link: UnsafeCell::new(Link {
157-
state: Cell::new(State::Created),
158-
prev: Cell::new(inner.tail),
159-
next: Cell::new(None),
160-
}),
161-
_pin: PhantomPinned,
162-
}));
163-
let listener = listener.as_pin_mut().unwrap();
164-
165-
{
166-
let entry_guard = listener.link.get();
167-
// SAFETY: We are locked, so we can access the inner `link`.
168-
let entry = unsafe { entry_guard.deref() };
169-
170-
// Replace the tail with the new entry.
171-
match mem::replace(&mut inner.tail, Some(entry.into())) {
172-
None => inner.head = Some(entry.into()),
173-
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
174-
};
175-
}
176-
177-
// If there are no unnotified entries, this is the first one.
178-
if inner.next.is_none() {
179-
inner.next = inner.tail;
180-
}
181-
182-
// Bump the entry count.
183-
inner.len += 1;
184-
});
156+
/// Adds a listener to the list.
157+
pub(crate) fn insert(&self, listener: Pin<&mut Option<Listener<T>>>) {
158+
self.with_inner(|inner| inner.insert(listener))
185159
}
186160

187161
/// Remove a listener from the list.
@@ -248,6 +222,53 @@ impl<T> crate::Inner<T> {
248222
}
249223

250224
impl<T> Inner<T> {
225+
fn insert(&mut self, mut listener: Pin<&mut Option<Listener<T>>>) {
226+
use QueueStrategy::{Fifo, Lifo};
227+
228+
listener.as_mut().set(Some(Listener {
229+
link: UnsafeCell::new(Link {
230+
state: Cell::new(State::Created),
231+
prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)),
232+
next: Cell::new(self.head.filter(|_| self.strategy == Lifo)),
233+
}),
234+
_pin: PhantomPinned,
235+
}));
236+
let listener = listener.as_pin_mut().unwrap();
237+
238+
{
239+
let entry_guard = listener.link.get();
240+
// SAFETY: We are locked, so we can access the inner `link`.
241+
let entry = unsafe { entry_guard.deref() };
242+
243+
// Replace the head or tail with the new entry.
244+
let replacing = match self.strategy {
245+
Lifo => &mut self.head,
246+
Fifo => &mut self.tail,
247+
};
248+
249+
match replacing.replace(entry.into()) {
250+
None => *replacing = Some(entry.into()),
251+
Some(t) if self.strategy == Lifo => unsafe {
252+
t.as_ref().prev.set(Some(entry.into()))
253+
},
254+
Some(t) if self.strategy == Fifo => unsafe {
255+
t.as_ref().next.set(Some(entry.into()))
256+
},
257+
Some(_) => unimplemented!("unimplemented queue strategy"),
258+
};
259+
}
260+
261+
// If there are no unnotified entries, or if using LIFO strategy, this is the first one.
262+
if self.strategy == Lifo {
263+
self.next = self.head;
264+
} else if self.next.is_none() {
265+
self.next = self.tail;
266+
}
267+
268+
// Bump the entry count.
269+
self.len += 1;
270+
}
271+
251272
fn remove(
252273
&mut self,
253274
mut listener: Pin<&mut Option<Listener<T>>>,
@@ -413,7 +434,7 @@ mod tests {
413434

414435
#[test]
415436
fn insert() {
416-
let inner = crate::Inner::new();
437+
let inner = crate::Inner::new(QueueStrategy::Fifo);
417438
make_listeners!(listen1, listen2, listen3);
418439

419440
// Register the listeners.
@@ -434,7 +455,7 @@ mod tests {
434455

435456
#[test]
436457
fn drop_non_notified() {
437-
let inner = crate::Inner::new();
458+
let inner = crate::Inner::new(QueueStrategy::Fifo);
438459
make_listeners!(listen1, listen2, listen3);
439460

440461
// Register the listeners.

src/lib.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,16 @@ use sync::WithMut;
127127
use notify::NotificationPrivate;
128128
pub use notify::{IntoNotification, Notification};
129129

130+
/// Queuing strategy for listeners.
131+
#[derive(Clone, Copy, Debug, PartialEq)]
132+
pub enum QueueStrategy {
133+
/// First-in-first-out listeners are added to the back of the list.
134+
Fifo,
135+
136+
/// Last-in-first-out listeners are added to the front of the list.
137+
Lifo,
138+
}
139+
130140
/// Inner state of [`Event`].
131141
struct Inner<T> {
132142
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
@@ -143,10 +153,10 @@ struct Inner<T> {
143153
}
144154

145155
impl<T> Inner<T> {
146-
fn new() -> Self {
156+
fn new(queue_strategy: QueueStrategy) -> Self {
147157
Self {
148158
notified: AtomicUsize::new(usize::MAX),
149-
list: sys::List::new(),
159+
list: sys::List::new(queue_strategy),
150160
}
151161
}
152162
}
@@ -177,6 +187,11 @@ pub struct Event<T = ()> {
177187
/// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
178188
/// reference count.
179189
inner: AtomicPtr<Inner<T>>,
190+
191+
/// Queuing strategy.
192+
///
193+
/// Listeners waiting for notification will be arranged according to the strategy.
194+
queue_strategy: QueueStrategy,
180195
}
181196

182197
unsafe impl<T: Send> Send for Event<T> {}
@@ -238,13 +253,15 @@ impl<T> Event<T> {
238253
pub const fn with_tag() -> Self {
239254
Self {
240255
inner: AtomicPtr::new(ptr::null_mut()),
256+
queue_strategy: QueueStrategy::Fifo,
241257
}
242258
}
243259
#[cfg(all(feature = "std", loom))]
244260
#[inline]
245261
pub fn with_tag() -> Self {
246262
Self {
247263
inner: AtomicPtr::new(ptr::null_mut()),
264+
queue_strategy: QueueStrategy::Fifo,
248265
}
249266
}
250267

@@ -471,7 +488,7 @@ impl<T> Event<T> {
471488
// If this is the first use, initialize the state.
472489
if inner.is_null() {
473490
// Allocate the state on the heap.
474-
let new = Arc::new(Inner::<T>::new());
491+
let new = Arc::new(Inner::<T>::new(self.queue_strategy));
475492

476493
// Convert the state to a raw pointer.
477494
let new = Arc::into_raw(new) as *mut Inner<T>;
@@ -556,16 +573,39 @@ impl Event<()> {
556573
#[inline]
557574
#[cfg(not(loom))]
558575
pub const fn new() -> Self {
576+
Self::new_with_queue_strategy(QueueStrategy::Fifo)
577+
}
578+
579+
#[inline]
580+
#[cfg(loom)]
581+
pub fn new() -> Self {
582+
Self::new_with_queue_strategy(QueueStrategy::Fifo)
583+
}
584+
585+
/// Creates a new [`Event`] with specific queue strategy.
586+
///
587+
/// # Examples
588+
///
589+
/// ```
590+
/// use event_listener::{Event, QueueStrategy};
591+
///
592+
/// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo);
593+
/// ```
594+
#[inline]
595+
#[cfg(not(loom))]
596+
pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
559597
Self {
560598
inner: AtomicPtr::new(ptr::null_mut()),
599+
queue_strategy,
561600
}
562601
}
563602

564603
#[inline]
565604
#[cfg(loom)]
566-
pub fn new() -> Self {
605+
pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
567606
Self {
568607
inner: AtomicPtr::new(ptr::null_mut()),
608+
queue_strategy,
569609
}
570610
}
571611

src/slab.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification};
1818
use crate::sync::atomic::{AtomicBool, Ordering};
1919
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
2020
use crate::sync::Arc;
21-
use crate::{RegisterResult, State, Task, TaskRef};
21+
use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef};
2222

2323
use core::fmt;
2424
use core::marker::PhantomData;
@@ -229,7 +229,12 @@ pub(crate) struct List<T> {
229229
}
230230

231231
impl<T> List<T> {
232-
pub(super) fn new() -> List<T> {
232+
pub(super) fn new(strategy: QueueStrategy) -> List<T> {
233+
debug_assert!(
234+
strategy == QueueStrategy::Fifo,
235+
"Slab list only supports FIFO strategy"
236+
);
237+
233238
List {
234239
inner: Mutex::new(ListenerSlab::new()),
235240
queue: concurrent_queue::ConcurrentQueue::unbounded(),
@@ -1362,7 +1367,7 @@ mod tests {
13621367

13631368
#[test]
13641369
fn uncontended_inner() {
1365-
let inner = crate::Inner::new();
1370+
let inner = crate::Inner::new(QueueStrategy::Fifo);
13661371

13671372
// Register two listeners.
13681373
let (mut listener1, mut listener2, mut listener3) = (None, None, None);

0 commit comments

Comments
 (0)