Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion spellcheck.dic
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
312
313
&
+
<
Expand Down Expand Up @@ -240,6 +240,7 @@ spawner
Splitter
spmc
spsc
SQPOLL
src
stabilised
startup
Expand Down
72 changes: 71 additions & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ pub struct Builder {
enable_io: bool,
nevents: usize,

/// Idle timeout (in milliseconds) for the io_uring SQPOLL kernel thread.
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some rustdoc here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I've added a line for rustdoc above. Thanks.

uring_setup_sqpoll: Option<u32>,

/// Whether or not to enable the time driver
enable_time: bool,

Expand Down Expand Up @@ -275,6 +285,15 @@ impl Builder {
enable_io: false,
nevents: 1024,

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
uring_setup_sqpoll: None,

// Time defaults to "off"
enable_time: false,

Expand Down Expand Up @@ -1598,6 +1617,19 @@ impl Builder {
cfg.timer_flavor = TimerFlavor::Traditional;
let (driver, driver_handle) = driver::Driver::new(cfg)?;

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
if let Some(idle_timeout) = self.uring_setup_sqpoll {
if let Some(io) = driver_handle.io.as_ref() {
io.setup_uring_sqpoll(idle_timeout);
}
}

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Expand Down Expand Up @@ -1736,12 +1768,43 @@ cfg_io_uring! {
/// .build()
/// .unwrap();
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "io-uring", feature = "rt", feature = "fs"))))]
pub fn enable_io_uring(&mut self) -> &mut Self {
// Currently, the uring flag is equivalent to `enable_io`.
self.enable_io = true;
self
}

/// Enables SQPOLL for the io_uring driver and sets the idle timeout (in milliseconds).
///
/// When SQPOLL is enabled, a kernel thread is created to poll the
/// submission queue. This can reduce syscall overhead.
///
/// `idle_timeout` is expressed in milliseconds. The value is only recorded
/// on the builder here; calling this method again replaces any previously
/// configured timeout. The setting is passed on to the I/O driver while the
/// runtime is being built, so it is only applied when the runtime has an
/// I/O driver (for example via `enable_io_uring`).
///
/// Returns `&mut Self` so that further builder calls can be chained.
///
/// # Prerequisites
///
/// SQPOLL requires Linux kernel 5.1 or later.
/// Until Linux 5.10, using this flag required the `CAP_SYS_ADMIN` capability.
/// From Linux 5.11, the `CAP_SYS_ADMIN` capability is no longer required.
///
/// If SQPOLL is enabled but not supported by the kernel, the first I/O
/// operation using `io_uring` will fail.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_multi_thread()
/// .enable_io_uring()
/// .uring_setup_sqpoll(2000)
/// .build()
/// .unwrap();
/// ```
// NOTE(review): io_uring_setup(2) appears to say that kernels 5.11-5.12 still
// need `CAP_SYS_NICE` for SQPOLL and that only 5.13+ needs no privileges, and
// that before 5.11 files had to be registered to be usable with SQPOLL.
// Confirm against the man page and extend the "Prerequisites" section if so.
#[cfg_attr(docsrs, doc(cfg(all(feature = "io-uring", feature = "rt", feature = "fs"))))]
pub fn uring_setup_sqpoll(&mut self, idle_timeout: u32) -> &mut Self {
// `None` means "SQPOLL not requested"; `Some` carries the idle timeout.
self.uring_setup_sqpoll = Some(idle_timeout);
self
}
}
}

Expand Down Expand Up @@ -1781,6 +1844,13 @@ cfg_rt_multi_thread! {

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

#[cfg(all(tokio_unstable, feature = "io-uring", feature = "rt", feature = "fs", target_os = "linux"))]
if let Some(idle_timeout) = self.uring_setup_sqpoll {
if let Some(io) = driver_handle.io.as_ref() {
io.setup_uring_sqpoll(idle_timeout);
}
}

// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub(crate) struct Cfg {

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let (io_stack, io_handle, signal_handle) = create_io_stack(&cfg)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

Expand Down Expand Up @@ -146,12 +146,12 @@ cfg_io_driver! {
Disabled(UnparkThread),
}

fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
#[cfg(loom)]
assert!(!enabled);
assert!(!cfg.enable_io);

let ret = if enabled {
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
let ret = if cfg.enable_io {
let (io_driver, io_handle) = crate::runtime::io::Driver::new(cfg.nevents)?;

let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);
Expand Down Expand Up @@ -212,7 +212,7 @@ cfg_not_io_driver! {
#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);

fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(_cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((IoStack(park_thread), unpark_thread, Default::default()))
Expand Down
23 changes: 22 additions & 1 deletion tokio/src/runtime/io/driver/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ const DEFAULT_RING_SIZE: u32 = 256;
// Holds the io_uring instance together with the bookkeeping the driver keeps for it.
pub(crate) struct UringContext {
// The ring itself. Starts out as `None` (see `new`) and is filled in once
// the ring has been initialized.
pub(crate) uring: Option<io_uring::IoUring>,
// Slab of per-operation `Lifecycle` entries.
// NOTE(review): presumably keyed by the operation's slab index — confirm.
pub(crate) ops: slab::Slab<Lifecycle>,
// SQPOLL idle timeout requested through `Handle::setup_uring_sqpoll`.
// `None` means the ring is created without SQPOLL; `Some(t)` makes ring
// creation call `setup_sqpoll(t)` and makes `submit` consult `need_wakeup()`.
pub(crate) sqpoll_idle: Option<u32>,
}

impl UringContext {
/// Builds a context with no ring, an empty operation slab and SQPOLL
/// left unconfigured.
pub(crate) fn new() -> Self {
    let ops = Slab::new();

    Self {
        uring: None,
        ops,
        sqpoll_idle: None,
    }
}

Expand All @@ -44,7 +46,13 @@ impl UringContext {
return Ok(false);
}

let uring = IoUring::new(DEFAULT_RING_SIZE)?;
let uring = if let Some(idle_timeout) = self.sqpoll_idle {
IoUring::builder()
.setup_sqpoll(idle_timeout)
.build(DEFAULT_RING_SIZE)?
} else {
IoUring::new(DEFAULT_RING_SIZE)?
};

match uring.submitter().register_probe(probe) {
Ok(_) => {}
Expand Down Expand Up @@ -97,6 +105,14 @@ impl UringContext {
}

pub(crate) fn submit(&mut self) -> io::Result<()> {
if self.sqpoll_idle.is_some() {
let mut sq = self.ring_mut().submission();
sq.sync();
if !sq.need_wakeup() {
return Ok(());
}
}

loop {
// Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
match self.ring().submit() {
Expand Down Expand Up @@ -164,6 +180,11 @@ impl Handle {
&self.uring_context
}

/// Stores the SQPOLL idle timeout on the shared io_uring context.
///
/// The value is only recorded here, under the context lock; it is read
/// later when the ring is created and when submissions are flushed.
pub(crate) fn setup_uring_sqpoll(&self, idle_timeout: u32) {
    // The temporary guard is dropped at the end of this statement.
    self.get_uring().lock().sqpoll_idle = Some(idle_timeout);
}

/// Check if the io_uring context is initialized. If not, it will try to initialize it.
/// Then, check if the provided opcode is supported.
///
Expand Down
83 changes: 83 additions & 0 deletions tokio/tests/fs_uring_sqpoll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#![cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]

use std::io::{Read, Seek, SeekFrom};
use tempfile::NamedTempFile;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::runtime::Builder;

/// Writes a small payload through `tokio::fs` on a current-thread runtime
/// configured with SQPOLL, then reads it back both through the temp file's
/// own std handle and through the tokio file.
#[test]
fn test_sqpoll_current_thread() {
    const PAYLOAD: &[u8; 5] = b"hello";

    let mut builder = Builder::new_current_thread();
    builder.enable_all().uring_setup_sqpoll(1000);
    let runtime = builder.build().unwrap();

    runtime.block_on(async {
        let mut tmp = NamedTempFile::new().unwrap();
        let tmp_path = tmp.path().to_path_buf();

        let mut options = tokio::fs::OpenOptions::new();
        options.read(true).write(true);
        let mut async_file = options.open(&tmp_path).await.unwrap();

        async_file.write_all(PAYLOAD).await.unwrap();
        async_file.flush().await.unwrap();

        // Check if data was actually written to the underlying file
        let std_file = tmp.as_file_mut();
        std_file.seek(SeekFrom::Start(0)).unwrap();
        let mut on_disk = vec![0; PAYLOAD.len()];
        std_file.read_exact(&mut on_disk).unwrap();
        assert_eq!(&on_disk, PAYLOAD);

        // Same bytes must come back through the tokio handle.
        async_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut via_tokio = vec![0; PAYLOAD.len()];
        async_file.read_exact(&mut via_tokio).await.unwrap();
        assert_eq!(&via_tokio, PAYLOAD);
    });
}

/// Same round-trip as the current-thread test, but on a two-worker
/// multi-thread runtime configured with SQPOLL.
#[test]
fn test_sqpoll_multi_thread() {
    const PAYLOAD: &[u8; 5] = b"world";

    let mut builder = Builder::new_multi_thread();
    builder
        .worker_threads(2)
        .enable_all()
        .uring_setup_sqpoll(1000);
    let runtime = builder.build().unwrap();

    runtime.block_on(async {
        let mut tmp = NamedTempFile::new().unwrap();
        let tmp_path = tmp.path().to_path_buf();

        let mut options = tokio::fs::OpenOptions::new();
        options.read(true).write(true);
        let mut async_file = options.open(&tmp_path).await.unwrap();

        async_file.write_all(PAYLOAD).await.unwrap();
        async_file.flush().await.unwrap();

        // Check if data was actually written to the underlying file
        let std_file = tmp.as_file_mut();
        std_file.seek(SeekFrom::Start(0)).unwrap();
        let mut on_disk = vec![0; PAYLOAD.len()];
        std_file.read_exact(&mut on_disk).unwrap();
        assert_eq!(&on_disk, PAYLOAD);

        // Same bytes must come back through the tokio handle.
        async_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut via_tokio = vec![0; PAYLOAD.len()];
        async_file.read_exact(&mut via_tokio).await.unwrap();
        assert_eq!(&via_tokio, PAYLOAD);
    });
}
Loading