Skip to content

Commit ec8b7a6

Browse files
committed
fix(driver): repost entry from other IOCP
1 parent be6b299 commit ec8b7a6

File tree

5 files changed

+23
-11
lines changed

5 files changed

+23
-11
lines changed

compio-driver/src/iocp/cp/global.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ fn iocp_start() -> io::Result<()> {
7474
let port = iocp_port()?;
7575
std::thread::spawn(move || {
7676
loop {
77-
for (driver, entry) in port.port.poll(None)? {
77+
for (driver, entry) in port.port.poll(None, None)? {
7878
port.push(driver, entry);
7979
}
8080
}

compio-driver/src/iocp/cp/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,12 @@ impl CompletionPort {
8686
Ok(())
8787
}
8888

89+
// If current_driver is specified, any entry that doesn't belong the driver will
90+
// be reposted. The driver id will be used as IOCP handle.
8991
pub fn poll(
9092
&self,
9193
timeout: Option<Duration>,
94+
current_driver: Option<usize>,
9295
) -> io::Result<impl Iterator<Item = (usize, Entry)>> {
9396
const DEFAULT_CAPACITY: usize = 1024;
9497

@@ -112,12 +115,26 @@ impl CompletionPort {
112115
trace!("recv_count: {recv_count}");
113116
unsafe { entries.set_len(recv_count as _) };
114117

115-
Ok(entries.into_iter().map(|entry| {
118+
Ok(entries.into_iter().map(move |entry| {
116119
let transferred = entry.dwNumberOfBytesTransferred;
117120
trace!("entry transferred: {transferred}");
118121
// Any thin pointer is OK because we don't use the type of opcode.
119122
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
120123
let overlapped = unsafe { &*overlapped_ptr };
124+
if let Some(current_driver) = current_driver {
125+
if overlapped.driver != current_driver {
126+
syscall!(
127+
BOOL,
128+
PostQueuedCompletionStatus(
129+
overlapped.driver as _,
130+
entry.dwNumberOfBytesTransferred,
131+
entry.lpCompletionKey,
132+
entry.lpOverlapped
133+
)
134+
)
135+
.ok();
136+
}
137+
}
121138
let res = if matches!(
122139
overlapped.base.Internal as NTSTATUS,
123140
STATUS_SUCCESS | STATUS_PENDING

compio-driver/src/iocp/cp/multi.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl Port {
2828

2929
pub fn poll(&self, timeout: Option<Duration>) -> io::Result<impl Iterator<Item = Entry> + '_> {
3030
let current_id = self.id();
31-
self.port.poll(timeout).map(move |it| {
31+
self.port.poll(timeout, Some(current_id)).map(move |it| {
3232
it.filter_map(
3333
move |(id, entry)| {
3434
if id == current_id { Some(entry) } else { None }

compio/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ libc = { workspace = true }
7373
default = ["runtime", "io-uring"]
7474
io-uring = ["compio-driver/io-uring"]
7575
polling = ["compio-driver/polling"]
76-
iocp-global = ["compio-driver/iocp-global"]
7776
io = ["dep:compio-io"]
7877
io-compat = ["io", "compio-io/compat"]
7978
runtime = ["dep:compio-runtime", "dep:compio-fs", "dep:compio-net", "io"]

compio/tests/runtime.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ use compio::{
77
fs::File,
88
io::{AsyncReadAt, AsyncReadExt, AsyncWriteAt, AsyncWriteExt},
99
net::{TcpListener, TcpStream},
10+
runtime::TryClone,
1011
};
11-
use compio_runtime::TryClone;
1212
use tempfile::NamedTempFile;
1313

1414
#[compio_macros::test]
15-
#[cfg(any(not(windows), feature = "iocp-global"))]
1615
async fn multi_threading() {
1716
const DATA: &str = "Hello world!";
1817

@@ -29,16 +28,13 @@ async fn multi_threading() {
2928
assert_eq!(n, buffer.len());
3029
assert_eq!(DATA, String::from_utf8(buffer).unwrap());
3130

32-
if let Err(e) = std::thread::spawn(move || {
31+
compio::runtime::spawn_blocking(move || {
3332
compio::runtime::Runtime::new().unwrap().block_on(async {
3433
let ((), buffer) = rx.read_exact(Vec::with_capacity(DATA.len())).await.unwrap();
3534
assert_eq!(DATA, String::from_utf8(buffer).unwrap());
3635
});
3736
})
38-
.join()
39-
{
40-
std::panic::resume_unwind(e)
41-
}
37+
.await
4238
}
4339

4440
#[compio_macros::test]

0 commit comments

Comments
 (0)