Skip to content

Commit 834e1f0

Browse files
committed
feat: Add queuing strategies for listener lists.
Two strategies are available: - FIFO: The original round-robin queuing; listeners are inserted at the back. - LIFO: The new most-recent queuing; listeners are inserted at the front. LIFO queuing is beneficial for cache-efficiency with workloads that are tolerant of starvation. The same listener is repeatedly drawn from the list until the load dictates additional listeners be drawn from the list. These listeners expand outward as a "hot set" for optimal reuse of resources rather than continuously drawing from the coldest resources in a FIFO schedule. Signed-off-by: Jason Volk <[email protected]>
1 parent 0c18ca2 commit 834e1f0

File tree

5 files changed

+292
-74
lines changed

5 files changed

+292
-74
lines changed

benches/bench.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::iter;
22

33
use criterion::{criterion_group, criterion_main, Criterion};
4-
use event_listener::{Event, Listener};
4+
use event_listener::{Event, Listener, QueueStrategy};
55

66
const COUNT: usize = 8000;
77

@@ -20,6 +20,21 @@ fn bench_events(c: &mut Criterion) {
2020
}
2121
});
2222
});
23+
24+
c.bench_function("notify_and_wait_lifo", |b| {
25+
let ev = Event::new_with_queue_strategy(QueueStrategy::Lifo);
26+
let mut handles = Vec::with_capacity(COUNT);
27+
28+
b.iter(|| {
29+
handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT));
30+
31+
ev.notify(COUNT);
32+
33+
for handle in handles.drain(..) {
34+
handle.wait();
35+
}
36+
});
37+
});
2338
}
2439

2540
criterion_group!(benches, bench_events);

src/intrusive.rs

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use crate::notify::{GenericNotify, Internal, Notification};
77
use crate::sync::atomic::Ordering;
88
use crate::sync::cell::{Cell, UnsafeCell};
9-
use crate::{RegisterResult, State, TaskRef};
9+
use crate::{QueueStrategy, RegisterResult, State, TaskRef};
1010

1111
#[cfg(feature = "critical-section")]
1212
use core::cell::RefCell;
@@ -42,17 +42,21 @@ struct Inner<T> {
4242

4343
/// The number of notified listeners.
4444
notified: usize,
45+
46+
/// Strategy by which the list is organized.
47+
strategy: QueueStrategy,
4548
}
4649

4750
impl<T> List<T> {
4851
/// Create a new, empty event listener list.
49-
pub(super) fn new() -> Self {
52+
pub(super) fn new(strategy: QueueStrategy) -> Self {
5053
let inner = Inner {
5154
head: None,
5255
tail: None,
5356
next: None,
5457
len: 0,
5558
notified: 0,
59+
strategy,
5660
};
5761

5862
#[cfg(feature = "critical-section")]
@@ -149,39 +153,9 @@ impl<T> crate::Inner<T> {
149153
})
150154
}
151155

152-
/// Add a new listener to the list.
153-
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
154-
self.with_inner(|inner| {
155-
listener.as_mut().set(Some(Listener {
156-
link: UnsafeCell::new(Link {
157-
state: Cell::new(State::Created),
158-
prev: Cell::new(inner.tail),
159-
next: Cell::new(None),
160-
}),
161-
_pin: PhantomPinned,
162-
}));
163-
let listener = listener.as_pin_mut().unwrap();
164-
165-
{
166-
let entry_guard = listener.link.get();
167-
// SAFETY: We are locked, so we can access the inner `link`.
168-
let entry = unsafe { entry_guard.deref() };
169-
170-
// Replace the tail with the new entry.
171-
match inner.tail.replace(entry.into()) {
172-
None => inner.head = Some(entry.into()),
173-
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
174-
};
175-
}
176-
177-
// If there are no unnotified entries, this is the first one.
178-
if inner.next.is_none() {
179-
inner.next = inner.tail;
180-
}
181-
182-
// Bump the entry count.
183-
inner.len += 1;
184-
});
156+
/// Adds a listener to the list.
157+
pub(crate) fn insert(&self, listener: Pin<&mut Option<Listener<T>>>) {
158+
self.with_inner(|inner| inner.insert(listener))
185159
}
186160

187161
/// Remove a listener from the list.
@@ -248,6 +222,53 @@ impl<T> crate::Inner<T> {
248222
}
249223

250224
impl<T> Inner<T> {
225+
fn insert(&mut self, mut listener: Pin<&mut Option<Listener<T>>>) {
226+
use QueueStrategy::{Fifo, Lifo};
227+
228+
listener.as_mut().set(Some(Listener {
229+
link: UnsafeCell::new(Link {
230+
state: Cell::new(State::Created),
231+
prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)),
232+
next: Cell::new(self.head.filter(|_| self.strategy == Lifo)),
233+
}),
234+
_pin: PhantomPinned,
235+
}));
236+
let listener = listener.as_pin_mut().unwrap();
237+
238+
{
239+
let entry_guard = listener.link.get();
240+
// SAFETY: We are locked, so we can access the inner `link`.
241+
let entry = unsafe { entry_guard.deref() };
242+
243+
// Replace the head or tail with the new entry.
244+
let replacing = match self.strategy {
245+
Lifo => &mut self.head,
246+
Fifo => &mut self.tail,
247+
};
248+
249+
match replacing.replace(entry.into()) {
250+
None => *replacing = Some(entry.into()),
251+
Some(t) if self.strategy == Lifo => unsafe {
252+
t.as_ref().prev.set(Some(entry.into()))
253+
},
254+
Some(t) if self.strategy == Fifo => unsafe {
255+
t.as_ref().next.set(Some(entry.into()))
256+
},
257+
Some(_) => unimplemented!("unimplemented queue strategy"),
258+
};
259+
}
260+
261+
// If there are no unnotified entries, or if using LIFO strategy, this is the first one.
262+
if self.strategy == Lifo {
263+
self.next = self.head;
264+
} else if self.next.is_none() {
265+
self.next = self.tail;
266+
}
267+
268+
// Bump the entry count.
269+
self.len += 1;
270+
}
271+
251272
fn remove(
252273
&mut self,
253274
mut listener: Pin<&mut Option<Listener<T>>>,
@@ -413,7 +434,7 @@ mod tests {
413434

414435
#[test]
415436
fn insert() {
416-
let inner = crate::Inner::new();
437+
let inner = crate::Inner::new(QueueStrategy::Fifo);
417438
make_listeners!(listen1, listen2, listen3);
418439

419440
// Register the listeners.
@@ -434,7 +455,7 @@ mod tests {
434455

435456
#[test]
436457
fn drop_non_notified() {
437-
let inner = crate::Inner::new();
458+
let inner = crate::Inner::new(QueueStrategy::Fifo);
438459
make_listeners!(listen1, listen2, listen3);
439460

440461
// Register the listeners.

src/lib.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,16 @@ use sync::WithMut;
129129
use notify::NotificationPrivate;
130130
pub use notify::{IntoNotification, Notification};
131131

132+
/// Queuing strategy for listeners.
133+
#[derive(Clone, Copy, Debug, PartialEq)]
134+
pub enum QueueStrategy {
135+
/// First-in-first-out listeners are added to the back of the list.
136+
Fifo,
137+
138+
/// Last-in-first-out listeners are added to the front of the list.
139+
Lifo,
140+
}
141+
132142
/// Inner state of [`Event`].
133143
struct Inner<T> {
134144
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
@@ -145,10 +155,10 @@ struct Inner<T> {
145155
}
146156

147157
impl<T> Inner<T> {
148-
fn new() -> Self {
158+
fn new(queue_strategy: QueueStrategy) -> Self {
149159
Self {
150160
notified: AtomicUsize::new(usize::MAX),
151-
list: sys::List::new(),
161+
list: sys::List::new(queue_strategy),
152162
}
153163
}
154164
}
@@ -179,6 +189,11 @@ pub struct Event<T = ()> {
179189
/// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
180190
/// reference count.
181191
inner: AtomicPtr<Inner<T>>,
192+
193+
/// Queuing strategy.
194+
///
195+
/// Listeners waiting for notification will be arranged according to the strategy.
196+
queue_strategy: QueueStrategy,
182197
}
183198

184199
unsafe impl<T: Send> Send for Event<T> {}
@@ -240,13 +255,15 @@ impl<T> Event<T> {
240255
pub const fn with_tag() -> Self {
241256
Self {
242257
inner: AtomicPtr::new(ptr::null_mut()),
258+
queue_strategy: QueueStrategy::Fifo,
243259
}
244260
}
245261
#[cfg(all(feature = "std", loom))]
246262
#[inline]
247263
pub fn with_tag() -> Self {
248264
Self {
249265
inner: AtomicPtr::new(ptr::null_mut()),
266+
queue_strategy: QueueStrategy::Fifo,
250267
}
251268
}
252269

@@ -473,7 +490,7 @@ impl<T> Event<T> {
473490
// If this is the first use, initialize the state.
474491
if inner.is_null() {
475492
// Allocate the state on the heap.
476-
let new = Arc::new(Inner::<T>::new());
493+
let new = Arc::new(Inner::<T>::new(self.queue_strategy));
477494

478495
// Convert the state to a raw pointer.
479496
let new = Arc::into_raw(new) as *mut Inner<T>;
@@ -558,16 +575,39 @@ impl Event<()> {
558575
#[inline]
559576
#[cfg(not(loom))]
560577
pub const fn new() -> Self {
578+
Self::new_with_queue_strategy(QueueStrategy::Fifo)
579+
}
580+
581+
#[inline]
582+
#[cfg(loom)]
583+
pub fn new() -> Self {
584+
Self::new_with_queue_strategy(QueueStrategy::Fifo)
585+
}
586+
587+
/// Creates a new [`Event`] with specific queue strategy.
588+
///
589+
/// # Examples
590+
///
591+
/// ```
592+
/// use event_listener::{Event, QueueStrategy};
593+
///
594+
/// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo);
595+
/// ```
596+
#[inline]
597+
#[cfg(not(loom))]
598+
pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
561599
Self {
562600
inner: AtomicPtr::new(ptr::null_mut()),
601+
queue_strategy,
563602
}
564603
}
565604

566605
#[inline]
567606
#[cfg(loom)]
568-
pub fn new() -> Self {
607+
pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
569608
Self {
570609
inner: AtomicPtr::new(ptr::null_mut()),
610+
queue_strategy,
571611
}
572612
}
573613

src/slab.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification};
1818
use crate::sync::atomic::{AtomicBool, Ordering};
1919
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
2020
use crate::sync::Arc;
21-
use crate::{RegisterResult, State, Task, TaskRef};
21+
use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef};
2222

2323
use core::fmt;
2424
use core::marker::PhantomData;
@@ -229,7 +229,12 @@ pub(crate) struct List<T> {
229229
}
230230

231231
impl<T> List<T> {
232-
pub(super) fn new() -> List<T> {
232+
pub(super) fn new(strategy: QueueStrategy) -> List<T> {
233+
debug_assert!(
234+
strategy == QueueStrategy::Fifo,
235+
"Slab list only supports FIFO strategy"
236+
);
237+
233238
List {
234239
inner: Mutex::new(ListenerSlab::new()),
235240
queue: concurrent_queue::ConcurrentQueue::unbounded(),
@@ -1362,7 +1367,7 @@ mod tests {
13621367

13631368
#[test]
13641369
fn uncontended_inner() {
1365-
let inner = crate::Inner::new();
1370+
let inner = crate::Inner::new(QueueStrategy::Fifo);
13661371

13671372
// Register two listeners.
13681373
let (mut listener1, mut listener2, mut listener3) = (None, None, None);

0 commit comments

Comments
 (0)