Skip to content

Commit 23df36c

Browse files
committed
feat: add SQPOLL support for io_uring
This adds `uring_setup_sqpoll` to the runtime `Builder`, allowing the io_uring driver to be configured with SQPOLL enabled. This offloads submission queue polling to a kernel thread, which can significantly reduce syscall overhead.

- Added `uring_setup_sqpoll` to `tokio::runtime::Builder`.
- Updated `UringContext` to sync the submission queue and check `need_wakeup`.
- Added tests to verify SQPOLL in both single-thread and multi-thread runtimes.
1 parent aa1e7e2 commit 23df36c

File tree

4 files changed

+181
-8
lines changed

4 files changed

+181
-8
lines changed

tokio/src/runtime/builder.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ pub struct Builder {
5858
enable_io: bool,
5959
nevents: usize,
6060

61+
#[cfg(all(
62+
tokio_unstable,
63+
feature = "io-uring",
64+
feature = "rt",
65+
feature = "fs",
66+
target_os = "linux"
67+
))]
68+
uring_setup_sqpoll: Option<u32>,
69+
6170
/// Whether or not to enable the time driver
6271
enable_time: bool,
6372

@@ -275,6 +284,15 @@ impl Builder {
275284
enable_io: false,
276285
nevents: 1024,
277286

287+
#[cfg(all(
288+
tokio_unstable,
289+
feature = "io-uring",
290+
feature = "rt",
291+
feature = "fs",
292+
target_os = "linux"
293+
))]
294+
uring_setup_sqpoll: None,
295+
278296
// Time defaults to "off"
279297
enable_time: false,
280298

@@ -1598,6 +1616,19 @@ impl Builder {
15981616
cfg.timer_flavor = TimerFlavor::Traditional;
15991617
let (driver, driver_handle) = driver::Driver::new(cfg)?;
16001618

1619+
#[cfg(all(
1620+
tokio_unstable,
1621+
feature = "io-uring",
1622+
feature = "rt",
1623+
feature = "fs",
1624+
target_os = "linux"
1625+
))]
1626+
if let Some(idle_timeout) = self.uring_setup_sqpoll {
1627+
if let Some(io) = driver_handle.io.as_ref() {
1628+
io.setup_uring_sqpoll(idle_timeout);
1629+
}
1630+
}
1631+
16011632
// Blocking pool
16021633
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
16031634
let blocking_spawner = blocking_pool.spawner().clone();
@@ -1736,12 +1767,43 @@ cfg_io_uring! {
17361767
/// .build()
17371768
/// .unwrap();
17381769
/// ```
1739-
#[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1770+
#[cfg_attr(docsrs, doc(cfg(all(feature = "io-uring", feature = "rt", feature = "fs"))))]
17401771
pub fn enable_io_uring(&mut self) -> &mut Self {
17411772
// Currently, the uring flag is equivalent to `enable_io`.
17421773
self.enable_io = true;
17431774
self
17441775
}
1776+
1777+
/// Enables SQPOLL for the io_uring driver and sets the idle timeout (in milliseconds).
1778+
///
1779+
/// When SQPOLL is enabled, a kernel thread is created to poll the
1780+
/// submission queue. This can reduce syscall overhead.
1781+
///
1782+
/// # Prerequisites
1783+
///
1784+
/// SQPOLL requires Linux kernel 5.1 or later.
1785+
/// Until Linux 5.10, using this flag required the `CAP_SYS_ADMIN` capability.
1786+
/// From Linux 5.11, the `CAP_SYS_ADMIN` capability is no longer required.
1787+
///
1788+
/// If SQPOLL is enabled but not supported by the kernel, the first I/O
1789+
/// operation using `io_uring` will fail.
1790+
///
1791+
/// # Examples
1792+
///
1793+
/// ```
1794+
/// use tokio::runtime;
1795+
///
1796+
/// let rt = runtime::Builder::new_multi_thread()
1797+
/// .enable_io_uring()
1798+
/// .uring_setup_sqpoll(2000)
1799+
/// .build()
1800+
/// .unwrap();
1801+
/// ```
1802+
#[cfg_attr(docsrs, doc(cfg(all(feature = "io-uring", feature = "rt", feature = "fs"))))]
1803+
pub fn uring_setup_sqpoll(&mut self, idle_timeout: u32) -> &mut Self {
1804+
self.uring_setup_sqpoll = Some(idle_timeout);
1805+
self
1806+
}
17451807
}
17461808
}
17471809

@@ -1781,6 +1843,13 @@ cfg_rt_multi_thread! {
17811843

17821844
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
17831845

1846+
#[cfg(all(tokio_unstable, feature = "io-uring", feature = "rt", feature = "fs", target_os = "linux"))]
1847+
if let Some(idle_timeout) = self.uring_setup_sqpoll {
1848+
if let Some(io) = driver_handle.io.as_ref() {
1849+
io.setup_uring_sqpoll(idle_timeout);
1850+
}
1851+
}
1852+
17841853
// Create the blocking pool
17851854
let blocking_pool =
17861855
blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);

tokio/src/runtime/driver.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub(crate) struct Cfg {
4545

4646
impl Driver {
4747
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
48-
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
48+
let (io_stack, io_handle, signal_handle) = create_io_stack(&cfg)?;
4949

5050
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
5151

@@ -146,12 +146,12 @@ cfg_io_driver! {
146146
Disabled(UnparkThread),
147147
}
148148

149-
fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
149+
fn create_io_stack(cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
150150
#[cfg(loom)]
151-
assert!(!enabled);
151+
assert!(!cfg.enable_io);
152152

153-
let ret = if enabled {
154-
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
153+
let ret = if cfg.enable_io {
154+
let (io_driver, io_handle) = crate::runtime::io::Driver::new(cfg.nevents)?;
155155

156156
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
157157
let process_driver = create_process_driver(signal_driver);
@@ -212,7 +212,7 @@ cfg_not_io_driver! {
212212
#[derive(Debug)]
213213
pub(crate) struct IoStack(ParkThread);
214214

215-
fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
215+
fn create_io_stack(_cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
216216
let park_thread = ParkThread::new();
217217
let unpark_thread = park_thread.unpark();
218218
Ok((IoStack(park_thread), unpark_thread, Default::default()))

tokio/src/runtime/io/driver/uring.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ const DEFAULT_RING_SIZE: u32 = 256;
1515
pub(crate) struct UringContext {
1616
pub(crate) uring: Option<io_uring::IoUring>,
1717
pub(crate) ops: slab::Slab<Lifecycle>,
18+
pub(crate) sqpoll_idle: Option<u32>,
1819
}
1920

2021
impl UringContext {
2122
pub(crate) fn new() -> Self {
2223
Self {
2324
ops: Slab::new(),
2425
uring: None,
26+
sqpoll_idle: None,
2527
}
2628
}
2729

@@ -44,7 +46,13 @@ impl UringContext {
4446
return Ok(false);
4547
}
4648

47-
let uring = IoUring::new(DEFAULT_RING_SIZE)?;
49+
let uring = if let Some(idle_timeout) = self.sqpoll_idle {
50+
IoUring::builder()
51+
.setup_sqpoll(idle_timeout)
52+
.build(DEFAULT_RING_SIZE)?
53+
} else {
54+
IoUring::new(DEFAULT_RING_SIZE)?
55+
};
4856

4957
match uring.submitter().register_probe(probe) {
5058
Ok(_) => {}
@@ -97,6 +105,14 @@ impl UringContext {
97105
}
98106

99107
pub(crate) fn submit(&mut self) -> io::Result<()> {
108+
if self.sqpoll_idle.is_some() {
109+
let mut sq = self.ring_mut().submission();
110+
sq.sync();
111+
if !sq.need_wakeup() {
112+
return Ok(());
113+
}
114+
}
115+
100116
loop {
101117
// Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
102118
match self.ring().submit() {
@@ -164,6 +180,11 @@ impl Handle {
164180
&self.uring_context
165181
}
166182

183+
pub(crate) fn setup_uring_sqpoll(&self, idle_timeout: u32) {
184+
let mut guard = self.get_uring().lock();
185+
guard.sqpoll_idle = Some(idle_timeout);
186+
}
187+
167188
/// Check if the io_uring context is initialized. If not, it will try to initialize it.
168189
/// Then, check if the provided opcode is supported.
169190
///

tokio/tests/fs_uring_sqpoll.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#![cfg(all(
2+
tokio_unstable,
3+
feature = "io-uring",
4+
feature = "rt",
5+
feature = "fs",
6+
target_os = "linux"
7+
))]
8+
9+
use std::io::{Read, Seek, SeekFrom};
10+
use tempfile::NamedTempFile;
11+
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
12+
use tokio::runtime::Builder;
13+
14+
#[test]
15+
fn test_sqpoll_current_thread() {
16+
let rt = Builder::new_current_thread()
17+
.enable_all()
18+
.uring_setup_sqpoll(1000)
19+
.build()
20+
.unwrap();
21+
22+
rt.block_on(async {
23+
let mut temp = NamedTempFile::new().unwrap();
24+
let path = temp.path().to_path_buf();
25+
26+
let mut file = tokio::fs::OpenOptions::new()
27+
.read(true)
28+
.write(true)
29+
.open(&path)
30+
.await
31+
.unwrap();
32+
33+
file.write_all(b"hello").await.unwrap();
34+
file.flush().await.unwrap();
35+
36+
// Check if data was actually written to the underlying file
37+
let mut buf = vec![0; 5];
38+
temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
39+
temp.as_file_mut().read_exact(&mut buf).unwrap();
40+
assert_eq!(&buf, b"hello");
41+
42+
file.seek(std::io::SeekFrom::Start(0)).await.unwrap();
43+
let mut buf = vec![0; 5];
44+
file.read_exact(&mut buf).await.unwrap();
45+
assert_eq!(&buf, b"hello");
46+
});
47+
}
48+
49+
#[test]
50+
fn test_sqpoll_multi_thread() {
51+
let rt = Builder::new_multi_thread()
52+
.worker_threads(2)
53+
.enable_all()
54+
.uring_setup_sqpoll(1000)
55+
.build()
56+
.unwrap();
57+
58+
rt.block_on(async {
59+
let mut temp = NamedTempFile::new().unwrap();
60+
let path = temp.path().to_path_buf();
61+
62+
let mut file = tokio::fs::OpenOptions::new()
63+
.read(true)
64+
.write(true)
65+
.open(&path)
66+
.await
67+
.unwrap();
68+
69+
file.write_all(b"world").await.unwrap();
70+
file.flush().await.unwrap();
71+
72+
// Check if data was actually written to the underlying file
73+
let mut buf = vec![0; 5];
74+
temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
75+
temp.as_file_mut().read_exact(&mut buf).unwrap();
76+
assert_eq!(&buf, b"world");
77+
78+
file.seek(std::io::SeekFrom::Start(0)).await.unwrap();
79+
let mut buf = vec![0; 5];
80+
file.read_exact(&mut buf).await.unwrap();
81+
assert_eq!(&buf, b"world");
82+
});
83+
}

0 commit comments

Comments
 (0)