Skip to content

Commit 085fc02

Browse files
committed
Make waking more robust
This adds a new type, PollingState, that replaces the old is polling boolean. The old boolean was inherently racy because of the timing when the boolean was set, the waker checked the value and the actual polling began. PollingState keep tracks of previous calls to wake and lets the polling thread know if it's been awoken before the call to poll. If this is the case we'll only poll with a zero timeout to ensure it's quick to return, but it will submit all submissions and process any already ready completions.
2 parents 820fdbd + adbb4c5 commit 085fc02

File tree

8 files changed

+73
-22
lines changed

8 files changed

+73
-22
lines changed

src/io_uring/cq.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,14 @@ impl Completions {
6161
// If we have no completions we make a system call to wait for
6262
// completion events.
6363
log::trace!(timeout:?; "waiting for completion events");
64-
shared.is_polling.store(true, Ordering::Release);
64+
let timeout = if shared.polling.set_polling(true) {
65+
// Got woken up, so polling without a timeout.
66+
Some(Duration::ZERO)
67+
} else {
68+
timeout
69+
};
6570
let result = shared.enter(1, libc::IORING_ENTER_GETEVENTS, timeout);
66-
shared.is_polling.store(false, Ordering::Release);
71+
shared.polling.set_polling(false);
6772
result?;
6873
tail = load_kernel_shared(self.entries_tail);
6974
}

src/io_uring/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use std::cmp::min;
77
use std::mem::{drop as unlock, swap, take};
88
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
99
use std::sync::Mutex;
10-
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10+
use std::sync::atomic::{AtomicU32, Ordering};
1111
use std::time::Duration;
1212
use std::{ptr, task};
1313

14-
use crate::{asan, lock, syscall, try_lock};
14+
use crate::{PollingState, asan, lock, syscall, try_lock};
1515

1616
pub(crate) mod config;
1717
pub(crate) mod cq;
@@ -80,10 +80,7 @@ pub(crate) struct Shared {
8080
/// True if only a single thread can submit submissions, i.e. if
8181
/// `IORING_SETUP_SINGLE_ISSUER` is enabled.
8282
single_issuer: bool,
83-
/// Boolean indicating a thread is [`Ring::poll`]ing.
84-
///
85-
/// [`Ring::poll`]: crate::Ring::poll
86-
is_polling: AtomicBool,
83+
polling: PollingState,
8784
/// Futures that are waiting for a slot in submissions.
8885
blocked_futures: Mutex<Vec<task::Waker>>,
8986
/// File descriptor of the io_uring.
@@ -135,7 +132,7 @@ impl Shared {
135132
submissions_len: parameters.sq_entries,
136133
kernel_thread: (parameters.flags & libc::IORING_SETUP_SQPOLL) != 0,
137134
single_issuer: (parameters.flags & libc::IORING_SETUP_SINGLE_ISSUER) != 0,
138-
is_polling: AtomicBool::new(false),
135+
polling: PollingState::new(),
139136
blocked_futures: Mutex::new(Vec::new()),
140137
rfd,
141138
})

src/io_uring/sq.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl Submissions {
8989

9090
pub(crate) fn wake(&self) -> io::Result<()> {
9191
log::trace!("waking up ring");
92-
if !self.shared.is_polling.load(Ordering::Acquire) {
92+
if !self.shared.polling.wake() {
9393
// If we're not polling we don't need to wake up.
9494
log::trace!("skipping ring message as it's not polling");
9595
return Ok(());

src/kqueue/config.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
use std::marker::PhantomData;
44
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
55
use std::sync::Mutex;
6-
use std::sync::atomic::AtomicBool;
76
use std::{io, mem, ptr};
87

98
use crate::kqueue::{Completions, Shared, Submissions, cq};
10-
use crate::syscall;
9+
use crate::{PollingState, syscall};
1110

1211
#[derive(Debug, Clone)]
1312
pub(crate) struct Config<'r> {
@@ -86,7 +85,7 @@ impl<'r> crate::Config<'r> {
8685
let shared = Shared {
8786
max_change_list_size: self.sys.max_change_list_size,
8887
change_list: Mutex::new(Vec::new()),
89-
is_polling: AtomicBool::new(false),
88+
polling: PollingState::new(),
9089
kq,
9190
};
9291
let submissions = Submissions::new(shared);

src/kqueue/cq.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::mem::{drop as unlock, take};
2-
use std::sync::atomic::Ordering;
32
use std::time::Duration;
43
use std::{cmp, io, ptr};
54

@@ -24,6 +23,13 @@ impl Completions {
2423
pub(crate) fn poll(&mut self, shared: &Shared, timeout: Option<Duration>) -> io::Result<()> {
2524
self.events.clear();
2625

26+
let timeout = if shared.polling.set_polling(true) {
27+
// Got woken up, so polling without a timeout.
28+
Some(Duration::ZERO)
29+
} else {
30+
timeout
31+
};
32+
2733
let ts = timeout.map(|to| libc::timespec {
2834
tv_sec: cmp::min(to.as_secs(), libc::time_t::MAX as u64).cast_signed(),
2935
// `Duration::subsec_nanos` is guaranteed to be less than one
@@ -43,9 +49,8 @@ impl Completions {
4349
unlock(change_list); // Unlock, to not block others.
4450

4551
log::trace!(submissions = changes.len(), timeout:?; "waiting for events");
46-
shared.is_polling.store(true, Ordering::Release);
4752
shared.kevent(&mut changes, UseEvents::Some(&mut self.events), ts.as_ref());
48-
shared.is_polling.store(false, Ordering::Release);
53+
shared.polling.set_polling(false);
4954
shared.reuse_change_list(changes);
5055
Ok(())
5156
}

src/kqueue/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
use std::mem::{drop as unlock, swap};
1010
use std::os::fd::{AsRawFd, OwnedFd};
1111
use std::sync::Mutex;
12-
use std::sync::atomic::AtomicBool;
1312
use std::{fmt, ptr, task};
1413

15-
use crate::{debug_detail, lock, syscall};
14+
use crate::{PollingState, debug_detail, lock, syscall};
1615

1716
pub(crate) mod config;
1817
mod cq;
@@ -40,8 +39,7 @@ pub(crate) struct Shared {
4039
max_change_list_size: u32,
4140
/// Batched events to register.
4241
change_list: Mutex<Vec<Event>>,
43-
/// Boolean indicating a thread is [`Ring::poll`]ing.
44-
is_polling: AtomicBool,
42+
polling: PollingState,
4543
/// kqueue(2) file descriptor.
4644
kq: OwnedFd,
4745
}

src/kqueue/sq.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::io;
22
use std::mem::{self, drop as unlock, take};
33
use std::os::fd::{AsRawFd, RawFd};
44
use std::sync::Arc;
5-
use std::sync::atomic::Ordering;
65

76
use crate::kqueue::{Event, Shared, UseEvents, cq};
87
use crate::lock;
@@ -83,7 +82,7 @@ impl Submissions {
8382

8483
#[allow(clippy::unnecessary_wraps)]
8584
pub(crate) fn wake(&self) -> io::Result<()> {
86-
if !self.shared.is_polling.load(Ordering::Acquire) {
85+
if !self.shared.polling.wake() {
8786
// If we're not polling we don't need to wake up.
8887
return Ok(());
8988
}

src/lib.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@
7171
)))]
7272
compile_error!("OS not supported");
7373

74+
use std::fmt;
75+
use std::sync::atomic::{AtomicU8, Ordering};
7476
use std::time::Duration;
7577

7678
// This must come before the other modules for the documentation.
@@ -506,3 +508,49 @@ impl<T> OpPollResult<T> for Option<io::Result<T>> {
506508
None
507509
}
508510
}
511+
512+
/// Polling state of a [`Ring`].
513+
pub(crate) struct PollingState(AtomicU8);
514+
515+
const IS_POLLING: u8 = 0b01;
516+
#[allow(unused)] // Used in an assert below, but rustc doesn't seem to care.
517+
const NOT_POLLING: u8 = 0b00;
518+
const IS_AWOKEN: u8 = 0b10;
519+
const NOT_AWOKEN: u8 = 0b00;
520+
521+
impl PollingState {
522+
pub(crate) const fn new() -> PollingState {
523+
PollingState(AtomicU8::new(0))
524+
}
525+
526+
/// Set the state to currently (not) polling.
527+
///
528+
/// Returns a boolean indicating if the poll call should continue like
529+
/// normal, or should be cut short (e.g. with a zero timeout).
530+
#[allow(clippy::cast_lossless)]
531+
pub(crate) fn set_polling(&self, is_polling: bool) -> bool {
532+
const _BOOL_CAST_CHECK_TRUE: () = assert!(true as u8 == IS_POLLING);
533+
const _BOOL_CAST_CHECK_FALSE: () = assert!(false as u8 == NOT_POLLING);
534+
let state = self.0.swap(is_polling as u8 | NOT_AWOKEN, Ordering::AcqRel);
535+
(state & IS_AWOKEN) != 0
536+
}
537+
538+
/// Set the state to wake the polling thread.
539+
///
540+
/// Returns a boolean indicating if the caller should submit an event to
541+
/// wake up the polling thread.
542+
pub(crate) fn wake(&self) -> bool {
543+
let state = self.0.fetch_or(IS_AWOKEN, Ordering::AcqRel);
544+
state == (IS_POLLING | NOT_AWOKEN)
545+
}
546+
}
547+
548+
impl fmt::Debug for PollingState {
549+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550+
let state = self.0.load(Ordering::Relaxed);
551+
f.debug_struct("PollingState")
552+
.field("polling", &((state & IS_POLLING) != 0))
553+
.field("awoken", &((state & IS_AWOKEN) != 0))
554+
.finish()
555+
}
556+
}

0 commit comments

Comments
 (0)