Skip to content

Commit 69d5df8

Browse files
committed
feat: add SQPOLL support for io_uring
This adds `uring_setup_sqpoll` to the runtime `Builder`, allowing the io_uring driver to be configured with SQPOLL enabled. SQPOLL offloads submission queue polling to a kernel thread, which can significantly reduce syscall overhead.

- Added `uring_setup_sqpoll` to `tokio::runtime::Builder`.
- Updated `UringContext::submit` to sync the submission queue and check `need_wakeup` before submitting when SQPOLL is configured.
- Added tests to verify SQPOLL in both single-thread and multi-thread runtimes.
1 parent e67cd87 commit 69d5df8

File tree

4 files changed

+151
-7
lines changed

4 files changed

+151
-7
lines changed

tokio/src/runtime/builder.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ pub struct Builder {
5858
enable_io: bool,
5959
nevents: usize,
6060

61+
#[cfg(all(tokio_unstable, feature = "io-uring"))]
62+
uring_setup_sqpoll: Option<u32>,
63+
6164
/// Whether or not to enable the time driver
6265
enable_time: bool,
6366

@@ -275,6 +278,9 @@ impl Builder {
275278
enable_io: false,
276279
nevents: 1024,
277280

281+
#[cfg(all(tokio_unstable, feature = "io-uring"))]
282+
uring_setup_sqpoll: None,
283+
278284
// Time defaults to "off"
279285
enable_time: false,
280286

@@ -1598,6 +1604,13 @@ impl Builder {
15981604
cfg.timer_flavor = TimerFlavor::Traditional;
15991605
let (driver, driver_handle) = driver::Driver::new(cfg)?;
16001606

1607+
#[cfg(all(tokio_unstable, feature = "io-uring", target_os = "linux"))]
1608+
if let Some(idle) = self.uring_setup_sqpoll {
1609+
if let Some(io) = driver_handle.io.as_ref() {
1610+
io.setup_uring_sqpoll(idle);
1611+
}
1612+
}
1613+
16011614
// Blocking pool
16021615
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
16031616
let blocking_spawner = blocking_pool.spawner().clone();
@@ -1742,6 +1755,28 @@ cfg_io_uring! {
17421755
self.enable_io = true;
17431756
self
17441757
}
1758+
1759+
/// Enables SQPOLL for the io_uring driver and sets the idle timeout.
1760+
///
1761+
/// When SQPOLL is enabled, a kernel thread is created to poll the
1762+
/// submission queue. This can reduce syscall overhead.
///
/// `idle` is forwarded unchanged to the io_uring SQPOLL setup as the
/// kernel thread's idle timeout (milliseconds, per the io_uring
/// documentation).
///
/// The runtime only applies this setting when it is built with an I/O
/// driver handle, so combine it with `enable_io_uring`; otherwise the
/// value is stored on the builder but never used.
1763+
///
1764+
/// # Examples
1765+
///
1766+
/// ```
1767+
/// use tokio::runtime;
1768+
///
1769+
/// let rt = runtime::Builder::new_multi_thread()
1770+
/// .enable_io_uring()
1771+
/// .uring_setup_sqpoll(2000)
1772+
/// .build()
1773+
/// .unwrap();
1774+
/// ```
1775+
#[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1776+
pub fn uring_setup_sqpoll(&mut self, idle: u32) -> &mut Self {
1777+
// Only recorded here; the build path hands it to the I/O driver handle
// via `setup_uring_sqpoll` once the driver has been created.
self.uring_setup_sqpoll = Some(idle);
1778+
self
1779+
}
17451780
}
17461781
}
17471782

@@ -1781,6 +1816,13 @@ cfg_rt_multi_thread! {
17811816

17821817
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
17831818

1819+
#[cfg(all(tokio_unstable, feature = "io-uring", target_os = "linux"))]
1820+
if let Some(idle) = self.uring_setup_sqpoll {
1821+
if let Some(io) = driver_handle.io.as_ref() {
1822+
io.setup_uring_sqpoll(idle);
1823+
}
1824+
}
1825+
17841826
// Create the blocking pool
17851827
let blocking_pool =
17861828
blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);

tokio/src/runtime/driver.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub(crate) struct Cfg {
4545

4646
impl Driver {
4747
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
48-
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
48+
let (io_stack, io_handle, signal_handle) = create_io_stack(&cfg)?;
4949

5050
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
5151

@@ -146,12 +146,12 @@ cfg_io_driver! {
146146
Disabled(UnparkThread),
147147
}
148148

149-
fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
149+
fn create_io_stack(cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
150150
#[cfg(loom)]
151-
assert!(!enabled);
151+
assert!(!cfg.enable_io);
152152

153-
let ret = if enabled {
154-
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
153+
let ret = if cfg.enable_io {
154+
let (io_driver, io_handle) = crate::runtime::io::Driver::new(cfg.nevents)?;
155155

156156
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
157157
let process_driver = create_process_driver(signal_driver);
@@ -212,7 +212,7 @@ cfg_not_io_driver! {
212212
#[derive(Debug)]
213213
pub(crate) struct IoStack(ParkThread);
214214

215-
fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
215+
fn create_io_stack(_cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
216216
let park_thread = ParkThread::new();
217217
let unpark_thread = park_thread.unpark();
218218
Ok((IoStack(park_thread), unpark_thread, Default::default()))

tokio/src/runtime/io/driver/uring.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ const DEFAULT_RING_SIZE: u32 = 256;
1515
// State for the io_uring driver: the ring, the slab of tracked operations,
// and the optional SQPOLL configuration.
pub(crate) struct UringContext {
1616
// The ring itself; starts as `None` (see `new`) and is filled in when the
// ring is created.
pub(crate) uring: Option<io_uring::IoUring>,
1717
// Slab of `Lifecycle` entries.
// NOTE(review): presumably one entry per in-flight operation — confirm.
pub(crate) ops: slab::Slab<Lifecycle>,
18+
// SQPOLL idle timeout. When `Some`, the ring is built through
// `IoUring::builder().setup_sqpoll(idle)` and `submit` checks
// `need_wakeup` before entering the kernel; when `None`, the ring is
// created with `IoUring::new`.
pub(crate) sqpoll_idle: Option<u32>,
1819
}
1920

2021
impl UringContext {
2122
/// Creates an empty context: no operations tracked, no ring created yet,
/// and SQPOLL not configured.
pub(crate) fn new() -> Self {
2223
Self {
2324
ops: Slab::new(),
2425
uring: None,
26+
// SQPOLL is opt-in; `Handle::setup_uring_sqpoll` sets this later.
sqpoll_idle: None,
2527
}
2628
}
2729

@@ -44,7 +46,11 @@ impl UringContext {
4446
return Ok(false);
4547
}
4648

47-
let uring = IoUring::new(DEFAULT_RING_SIZE)?;
49+
let uring = if let Some(idle) = self.sqpoll_idle {
50+
IoUring::builder().setup_sqpoll(idle).build(DEFAULT_RING_SIZE)?
51+
} else {
52+
IoUring::new(DEFAULT_RING_SIZE)?
53+
};
4854

4955
match uring.submitter().register_probe(probe) {
5056
Ok(_) => {}
@@ -97,6 +103,14 @@ impl UringContext {
97103
}
98104

99105
pub(crate) fn submit(&mut self) -> io::Result<()> {
106+
if self.sqpoll_idle.is_some() {
107+
let mut sq = self.ring_mut().submission();
108+
sq.sync();
109+
if !sq.need_wakeup() {
110+
return Ok(());
111+
}
112+
}
113+
100114
loop {
101115
// Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
102116
match self.ring().submit() {
@@ -164,6 +178,11 @@ impl Handle {
164178
&self.uring_context
165179
}
166180

181+
/// Records the SQPOLL idle timeout on the shared io_uring context.
///
/// The stored value is read when the ring is created, where it selects the
/// `setup_sqpoll` builder path, and by `submit`, which consults
/// `need_wakeup` first whenever the value is set.
// NOTE(review): this looks intended to run before the ring is initialized;
// setting it afterwards would change what `submit` does without
// reconfiguring an already-created ring — confirm callers only use it
// during runtime construction.
pub(crate) fn setup_uring_sqpoll(&self, idle: u32) {
182+
// Take the context lock for the duration of the write.
let mut guard = self.get_uring().lock();
183+
guard.sqpoll_idle = Some(idle);
184+
}
185+
167186
/// Check if the io_uring context is initialized. If not, it will try to initialize it.
168187
/// Then, check if the provided opcode is supported.
169188
///

tokio/tests/fs_uring_sqpoll.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#![cfg(all(
2+
tokio_unstable,
3+
feature = "io-uring",
4+
feature = "rt",
5+
feature = "fs",
6+
target_os = "linux"
7+
))]
8+
9+
use std::io::{Read, Seek, SeekFrom};
10+
use tempfile::NamedTempFile;
11+
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
12+
use tokio::runtime::Builder;
13+
14+
// Round-trips a small write and read through `tokio::fs` on a
// current-thread runtime built with `uring_setup_sqpoll(1000)`, and
// cross-checks the written bytes through the synchronous std file handle.
#[test]
15+
fn test_sqpoll_current_thread() {
16+
let rt = Builder::new_current_thread()
17+
.enable_all()
18+
.uring_setup_sqpoll(1000)
19+
.build()
20+
.unwrap();
21+
22+
rt.block_on(async {
23+
let mut temp = NamedTempFile::new().unwrap();
24+
let path = temp.path().to_path_buf();
25+
26+
let mut file = tokio::fs::OpenOptions::new()
27+
.read(true)
28+
.write(true)
29+
.open(&path)
30+
.await
31+
.unwrap();
32+
33+
file.write_all(b"hello").await.unwrap();
34+
file.flush().await.unwrap();
35+
36+
// Check if data was actually written to the underlying file
37+
let mut buf = vec![0; 5];
38+
temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
39+
temp.as_file_mut().read_exact(&mut buf).unwrap();
40+
assert_eq!(&buf, b"hello");
41+
42+
// Rewind the async handle and read the same bytes back through tokio.
file.seek(SeekFrom::Start(0)).await.unwrap();
43+
let mut buf = vec![0; 5];
44+
file.read_exact(&mut buf).await.unwrap();
45+
assert_eq!(&buf, b"hello");
46+
});
47+
}
48+
49+
// Same round-trip as the current-thread test, but on a multi-thread
// runtime with two workers built with `uring_setup_sqpoll(1000)`.
#[test]
50+
fn test_sqpoll_multi_thread() {
51+
let rt = Builder::new_multi_thread()
52+
.worker_threads(2)
53+
.enable_all()
54+
.uring_setup_sqpoll(1000)
55+
.build()
56+
.unwrap();
57+
58+
rt.block_on(async {
59+
let mut temp = NamedTempFile::new().unwrap();
60+
let path = temp.path().to_path_buf();
61+
62+
let mut file = tokio::fs::OpenOptions::new()
63+
.read(true)
64+
.write(true)
65+
.open(&path)
66+
.await
67+
.unwrap();
68+
69+
file.write_all(b"world").await.unwrap();
70+
file.flush().await.unwrap();
71+
72+
// Check if data was actually written to the underlying file
73+
let mut buf = vec![0; 5];
74+
temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
75+
temp.as_file_mut().read_exact(&mut buf).unwrap();
76+
assert_eq!(&buf, b"world");
77+
78+
// Rewind the async handle and read the same bytes back through tokio.
file.seek(SeekFrom::Start(0)).await.unwrap();
79+
let mut buf = vec![0; 5];
80+
file.read_exact(&mut buf).await.unwrap();
81+
assert_eq!(&buf, b"world");
82+
});
83+
}

0 commit comments

Comments
 (0)