Skip to content

Commit def1dda

Browse files
authored
Handle overflown cqes when IORING_FEAT_NODROP supported (#349)
* fix: handle overflown cqes when IORING_FEAT_NODROP supported
* test: add test for cq overflow
* fix: remove duplicated deps
* fix: create buffer per read
1 parent a934f54 commit def1dda

File tree

5 files changed

+184
-4
lines changed

5 files changed

+184
-4
lines changed

io-uring-test/src/main.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ fn main() -> anyhow::Result<()> {
1515
let entries = 8;
1616

1717
test::<squeue::Entry, cqueue::Entry>(IoUring::new(entries)?)?;
18+
test_sqpoll::<squeue::Entry, cqueue::Entry>(
19+
IoUring::builder()
20+
.setup_sqpoll(1000) // Enable SQPOLL with 1000ms idle time
21+
.build(entries)?,
22+
)?;
1823

1924
#[cfg(not(feature = "ci"))]
2025
{
@@ -181,3 +186,45 @@ fn test<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
181186

182187
Ok(())
183188
}
189+
190+
fn test_sqpoll<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
191+
mut ring: IoUring<S, C>,
192+
) -> anyhow::Result<()> {
193+
if !ring.params().is_setup_sqpoll() {
194+
return Err(anyhow::anyhow!(
195+
"IORING_SETUP_SQPOLL must be enabled to run this test"
196+
));
197+
}
198+
199+
let mut probe = Probe::new();
200+
201+
if ring.submitter().register_probe(&mut probe).is_err() {
202+
eprintln!("No probe supported");
203+
}
204+
205+
println!();
206+
println!(
207+
"ring type: IoUring<{}, {}>",
208+
std::any::type_name::<S>()
209+
.strip_prefix("io_uring::")
210+
.unwrap(),
211+
std::any::type_name::<C>()
212+
.strip_prefix("io_uring::")
213+
.unwrap(),
214+
);
215+
println!("params: {:#?}", ring.params());
216+
println!("probe: {:?}", probe);
217+
println!();
218+
219+
let test = Test {
220+
probe,
221+
target: std::env::args().nth(1),
222+
count: Cell::new(0),
223+
};
224+
225+
tests::sqpoll::test_sqpoll_cq_overflow(&mut ring, &test)?;
226+
227+
println!("Test count: {}", test.count.get());
228+
229+
Ok(())
230+
}

io-uring-test/src/tests/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ pub mod register_buffers;
1212
pub mod register_sync_cancel;
1313
pub mod regression;
1414
pub mod timeout;
15+
16+
pub mod sqpoll;

io-uring-test/src/tests/sqpoll.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use io_uring::{cqueue, opcode, squeue, types, IoUring};
2+
use std::fs::File;
3+
use std::io::Write;
4+
use std::os::unix::io::AsRawFd;
5+
use std::path::Path;
6+
use tempfile::tempdir;
7+
8+
use crate::Test;
9+
10+
/// Test to reproduce SQPOLL CQ overflow issue
11+
///
12+
/// This test demonstrates the issue when:
13+
/// 1) SQPOLL is enabled
14+
/// 2) IORING_FEAT_NODROP feature is supported
15+
/// 3) We submit more concurrent I/O requests than the CQ can handle
16+
pub fn test_sqpoll_cq_overflow<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
17+
ring: &mut IoUring<S, C>,
18+
test: &Test,
19+
) -> anyhow::Result<()> {
20+
if !ring.params().is_feature_nodrop() {
21+
println!("IORING_FEAT_NODROP is not supported by the kernel, skip");
22+
return Ok(());
23+
}
24+
25+
require! {
26+
test;
27+
}
28+
29+
println!("test sqpoll_cq_overflow");
30+
31+
let idle = ring.params().sq_thread_idle();
32+
let entries = ring.params().sq_entries() as usize;
33+
let num_requests = entries * 4;
34+
35+
// Create a temporary directory for test files
36+
let temp_dir = tempdir().expect("Failed to create temp directory");
37+
let test_dir = temp_dir.path();
38+
39+
// Create test files for I/O operations
40+
let test_files = create_test_files(test_dir, num_requests);
41+
let mut bufs = (0..num_requests)
42+
.map(|_| vec![0u8; 1024])
43+
.collect::<Vec<_>>();
44+
45+
let start = std::time::Instant::now();
46+
47+
test_files
48+
.iter()
49+
.zip(bufs.iter_mut())
50+
.for_each(|(file, buf)| {
51+
let fd = file.as_raw_fd();
52+
let entry = opcode::Read::new(types::Fd(fd), buf.as_mut_ptr(), buf.len() as _)
53+
.build()
54+
.into();
55+
while unsafe { ring.submission().push(&entry).is_err() } {
56+
ring.submit().expect("Failed to submit");
57+
}
58+
});
59+
60+
let mut completed_count = 0;
61+
// Try to collect all completions
62+
while completed_count < num_requests {
63+
while ring.completion().next().is_some() {
64+
completed_count += 1;
65+
}
66+
67+
if ring.submission().cq_overflow() {
68+
// Call `io_uring_enter` to make the kernel flush overflowed completions
69+
println!("CQ overflow, syscall to flush");
70+
ring.submit().expect("Failed to submit");
71+
}
72+
}
73+
74+
let end = start.elapsed();
75+
76+
assert_eq!(completed_count, num_requests);
77+
// After idle time, `IORING_SQ_NEED_WAKEUP` flag will be set, and `io_uring_enter` will be called.
78+
// That call will flush the overflowed completions eventually.
79+
// To distinguish this case from the case where the fix of PR #349 is applied,
80+
// we need to check the elapsed time.
81+
assert!(
82+
end.as_millis() < idle as _,
83+
"Overflown completions should be flushed within the idle time"
84+
);
85+
86+
Ok(())
87+
}
88+
89+
/// Populates `dir` with `count` small text files and returns read handles to them.
///
/// File number `i` is named `test_file_{i}.txt` and contains the text
/// `Test content for file {i}`. The returned handles are in index order.
///
/// Panics if a file cannot be created, written, flushed or reopened.
fn create_test_files(dir: &Path, count: usize) -> Vec<File> {
    (0..count)
        .map(|index| {
            let path = dir.join(format!("test_file_{}.txt", index));

            // Write the payload through a separate, writable handle.
            let mut writer = File::create(&path).expect("Failed to create test file");
            let payload = format!("Test content for file {}", index);
            writer
                .write_all(payload.as_bytes())
                .expect("Failed to write to test file");
            writer.flush().expect("Failed to flush file");

            // Hand back a fresh handle opened for reading.
            File::open(&path).expect("Failed to open test file for reading")
        })
        .collect()
}

src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,11 @@ impl Parameters {
602602
self.0.sq_entries
603603
}
604604

605+
/// The idle time of the SQ poll thread in milliseconds.
606+
pub fn sq_thread_idle(&self) -> u32 {
607+
self.0.sq_thread_idle
608+
}
609+
605610
/// The number of completion queue entries allocated.
606611
pub fn cq_entries(&self) -> u32 {
607612
self.0.cq_entries

src/submit.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,18 @@ impl<'a> Submitter<'a> {
123123
// the IORING_ENTER_SQ_WAKEUP bit is required in all paths where sqpoll
124124
// is setup when consolidating the reads.
125125

126-
if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
126+
let sq_cq_overflow = self.sq_cq_overflow();
127+
128+
// When IORING_FEAT_NODROP is enabled and CQ overflows, the kernel buffers
129+
// completion events internally but doesn't automatically flush them when
130+
// CQ space becomes available. We must explicitly call io_uring_enter()
131+
// to flush these buffered events, even with SQPOLL enabled.
132+
//
133+
// Without this, completions remain stuck in kernel's internal buffer
134+
// after draining CQ, causing missing completion notifications.
135+
let need_syscall_for_overflow = sq_cq_overflow && self.params.is_feature_nodrop();
136+
137+
if want > 0 || self.params.is_setup_iopoll() || sq_cq_overflow {
127138
flags |= sys::IORING_ENTER_GETEVENTS;
128139
}
129140

@@ -132,9 +143,12 @@ impl<'a> Submitter<'a> {
132143
atomic::fence(atomic::Ordering::SeqCst);
133144
if self.sq_need_wakeup() {
134145
flags |= sys::IORING_ENTER_SQ_WAKEUP;
135-
} else if want == 0 {
146+
} else if want == 0 && !need_syscall_for_overflow {
136147
// The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
137148
// it to process events or wake it up
149+
150+
// However, if the CQ ring is overflown, we need to tell the kernel to process events
151+
// by calling io_uring_enter with the IORING_ENTER_GETEVENTS flag.
138152
return Ok(len);
139153
}
140154
}
@@ -155,7 +169,10 @@ impl<'a> Submitter<'a> {
155169
let len = self.sq_len();
156170
let mut flags = sys::IORING_ENTER_EXT_ARG;
157171

158-
if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
172+
let sq_cq_overflow = self.sq_cq_overflow();
173+
let need_syscall = sq_cq_overflow & self.params.is_feature_nodrop();
174+
175+
if want > 0 || self.params.is_setup_iopoll() || sq_cq_overflow {
159176
flags |= sys::IORING_ENTER_GETEVENTS;
160177
}
161178

@@ -164,7 +181,7 @@ impl<'a> Submitter<'a> {
164181
atomic::fence(atomic::Ordering::SeqCst);
165182
if self.sq_need_wakeup() {
166183
flags |= sys::IORING_ENTER_SQ_WAKEUP;
167-
} else if want == 0 {
184+
} else if want == 0 && !need_syscall {
168185
// The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
169186
// it to process events or wake it up
170187
return Ok(len);

0 commit comments

Comments
 (0)