Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/io_uring/cq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ impl Completions {
// If we have no completions we make a system call to wait for
// completion events.
log::trace!(timeout:?; "waiting for completion events");
shared.is_polling.store(true, Ordering::Release);
let timeout = if shared.polling.set_polling(true) {
// Got woken up, so polling without a timeout.
Some(Duration::ZERO)
} else {
timeout
};
let result = shared.enter(1, libc::IORING_ENTER_GETEVENTS, timeout);
shared.is_polling.store(false, Ordering::Release);
shared.polling.set_polling(false);
result?;
tail = load_kernel_shared(self.entries_tail);
}
Expand Down
11 changes: 4 additions & 7 deletions src/io_uring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::cmp::min;
use std::mem::{drop as unlock, swap, take};
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use std::{ptr, task};

use crate::{asan, lock, syscall, try_lock};
use crate::{PollingState, asan, lock, syscall, try_lock};

pub(crate) mod config;
pub(crate) mod cq;
Expand Down Expand Up @@ -80,10 +80,7 @@ pub(crate) struct Shared {
/// True if only a single thread can submit submissions, i.e. if
/// `IORING_SETUP_SINGLE_ISSUER` is enabled.
single_issuer: bool,
/// Boolean indicating a thread is [`Ring::poll`]ing.
///
/// [`Ring::poll`]: crate::Ring::poll
is_polling: AtomicBool,
polling: PollingState,
/// Futures that are waiting for a slot in submissions.
blocked_futures: Mutex<Vec<task::Waker>>,
/// File descriptor of the io_uring.
Expand Down Expand Up @@ -135,7 +132,7 @@ impl Shared {
submissions_len: parameters.sq_entries,
kernel_thread: (parameters.flags & libc::IORING_SETUP_SQPOLL) != 0,
single_issuer: (parameters.flags & libc::IORING_SETUP_SINGLE_ISSUER) != 0,
is_polling: AtomicBool::new(false),
polling: PollingState::new(),
blocked_futures: Mutex::new(Vec::new()),
rfd,
})
Expand Down
2 changes: 1 addition & 1 deletion src/io_uring/sq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Submissions {

pub(crate) fn wake(&self) -> io::Result<()> {
log::trace!("waking up ring");
if !self.shared.is_polling.load(Ordering::Acquire) {
if !self.shared.polling.wake() {
// If we're not polling we don't need to wake up.
log::trace!("skipping ring message as it's not polling");
return Ok(());
Expand Down
5 changes: 2 additions & 3 deletions src/kqueue/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
use std::marker::PhantomData;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::{io, mem, ptr};

use crate::kqueue::{Completions, Shared, Submissions, cq};
use crate::syscall;
use crate::{PollingState, syscall};

#[derive(Debug, Clone)]
pub(crate) struct Config<'r> {
Expand Down Expand Up @@ -86,7 +85,7 @@ impl<'r> crate::Config<'r> {
let shared = Shared {
max_change_list_size: self.sys.max_change_list_size,
change_list: Mutex::new(Vec::new()),
is_polling: AtomicBool::new(false),
polling: PollingState::new(),
kq,
};
let submissions = Submissions::new(shared);
Expand Down
11 changes: 8 additions & 3 deletions src/kqueue/cq.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::mem::{drop as unlock, take};
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{cmp, io, ptr};

Expand All @@ -24,6 +23,13 @@ impl Completions {
pub(crate) fn poll(&mut self, shared: &Shared, timeout: Option<Duration>) -> io::Result<()> {
self.events.clear();

let timeout = if shared.polling.set_polling(true) {
// Got woken up, so polling without a timeout.
Some(Duration::ZERO)
} else {
timeout
};

let ts = timeout.map(|to| libc::timespec {
tv_sec: cmp::min(to.as_secs(), libc::time_t::MAX as u64).cast_signed(),
// `Duration::subsec_nanos` is guaranteed to be less than one
Expand All @@ -43,9 +49,8 @@ impl Completions {
unlock(change_list); // Unlock, to not block others.

log::trace!(submissions = changes.len(), timeout:?; "waiting for events");
shared.is_polling.store(true, Ordering::Release);
shared.kevent(&mut changes, UseEvents::Some(&mut self.events), ts.as_ref());
shared.is_polling.store(false, Ordering::Release);
shared.polling.set_polling(false);
shared.reuse_change_list(changes);
Ok(())
}
Expand Down
6 changes: 2 additions & 4 deletions src/kqueue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
use std::mem::{drop as unlock, swap};
use std::os::fd::{AsRawFd, OwnedFd};
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::{fmt, ptr, task};

use crate::{debug_detail, lock, syscall};
use crate::{PollingState, debug_detail, lock, syscall};

pub(crate) mod config;
mod cq;
Expand Down Expand Up @@ -40,8 +39,7 @@ pub(crate) struct Shared {
max_change_list_size: u32,
/// Batched events to register.
change_list: Mutex<Vec<Event>>,
/// Boolean indicating a thread is [`Ring::poll`]ing.
is_polling: AtomicBool,
polling: PollingState,
/// kqueue(2) file descriptor.
kq: OwnedFd,
}
Expand Down
3 changes: 1 addition & 2 deletions src/kqueue/sq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::io;
use std::mem::{self, drop as unlock, take};
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc;
use std::sync::atomic::Ordering;

use crate::kqueue::{Event, Shared, UseEvents, cq};
use crate::lock;
Expand Down Expand Up @@ -83,7 +82,7 @@ impl Submissions {

#[allow(clippy::unnecessary_wraps)]
pub(crate) fn wake(&self) -> io::Result<()> {
if !self.shared.is_polling.load(Ordering::Acquire) {
if !self.shared.polling.wake() {
// If we're not polling we don't need to wake up.
return Ok(());
}
Expand Down
48 changes: 48 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
)))]
compile_error!("OS not supported");

use std::fmt;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::Duration;

// This must come before the other modules for the documentation.
Expand Down Expand Up @@ -506,3 +508,49 @@ impl<T> OpPollResult<T> for Option<io::Result<T>> {
None
}
}

/// Polling state of a [`Ring`].
pub(crate) struct PollingState(AtomicU8);

const IS_POLLING: u8 = 0b01;
#[allow(unused)] // Used in an assert below, but rustc doesn't seem to care.
const NOT_POLLING: u8 = 0b00;
const IS_AWOKEN: u8 = 0b10;
const NOT_AWOKEN: u8 = 0b00;

impl PollingState {
pub(crate) const fn new() -> PollingState {
PollingState(AtomicU8::new(0))
}

/// Set the state to currently (not) polling.
///
/// Returns a boolean indicating if the poll call should continue like
/// normal, or should be cut short (e.g. with a zero timeout).
#[allow(clippy::cast_lossless)]
pub(crate) fn set_polling(&self, is_polling: bool) -> bool {
const _BOOL_CAST_CHECK_TRUE: () = assert!(true as u8 == IS_POLLING);
const _BOOL_CAST_CHECK_FALSE: () = assert!(false as u8 == NOT_POLLING);
let state = self.0.swap(is_polling as u8 | NOT_AWOKEN, Ordering::AcqRel);
(state & IS_AWOKEN) != 0
}

/// Set the state to wake the polling thread.
///
/// Returns a boolean indicating if the caller should submit an event to
/// wake up the polling thread.
pub(crate) fn wake(&self) -> bool {
let state = self.0.fetch_or(IS_AWOKEN, Ordering::AcqRel);
state == (IS_POLLING | NOT_AWOKEN)
}
}

impl fmt::Debug for PollingState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.0.load(Ordering::Relaxed);
f.debug_struct("PollingState")
.field("polling", &((state & IS_POLLING) != 0))
.field("awoken", &((state & IS_AWOKEN) != 0))
.finish()
}
}
Loading