Skip to content

Commit e9b7e6f

Browse files
committed
feat(driver,iocp): use IOCP instead of crossbeam channel
1 parent 312348d commit e9b7e6f

File tree

5 files changed

+72
-140
lines changed

5 files changed

+72
-140
lines changed

compio-driver/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ socket2 = { workspace = true }
4242
[target.'cfg(windows)'.dependencies]
4343
compio-buf = { workspace = true, features = ["arrayvec"] }
4444
aligned-array = "1.0.1"
45-
crossbeam-skiplist = { workspace = true, optional = true }
4645
once_cell = { workspace = true }
4746
widestring = { workspace = true }
4847
windows-sys = { workspace = true, features = [
@@ -85,7 +84,7 @@ polling = ["dep:polling", "dep:os_pipe"]
8584
io-uring-sqe128 = []
8685
io-uring-cqe32 = []
8786

88-
iocp-global = ["dep:crossbeam-skiplist"]
87+
iocp-global = []
8988

9089
# Nightly features
9190
once_cell_try = []

compio-driver/src/iocp/cp/global.rs

Lines changed: 27 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,27 @@ use std::sync::OnceLock;
33
use std::{
44
io,
55
os::windows::io::{AsRawHandle, RawHandle},
6-
sync::atomic::{AtomicUsize, Ordering},
76
time::Duration,
87
};
98

10-
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender, TryRecvError};
11-
use crossbeam_skiplist::SkipMap;
129
#[cfg(not(feature = "once_cell_try"))]
1310
use once_cell::sync::OnceCell as OnceLock;
14-
use windows_sys::Win32::{
15-
Foundation::ERROR_TIMEOUT,
16-
System::IO::{PostQueuedCompletionStatus, OVERLAPPED},
17-
};
11+
use windows_sys::Win32::System::IO::PostQueuedCompletionStatus;
1812

1913
use super::CompletionPort;
2014
use crate::{syscall, Entry, Overlapped, RawFd};
2115

2216
struct GlobalPort {
2317
port: CompletionPort,
24-
drivers: SkipMap<usize, Sender<Entry>>,
2518
}
2619

2720
impl GlobalPort {
2821
pub fn new() -> io::Result<Self> {
2922
Ok(Self {
3023
port: CompletionPort::new()?,
31-
drivers: SkipMap::new(),
3224
})
3325
}
3426

35-
pub fn register(&self, driver: usize) -> Receiver<Entry> {
36-
let (sender, receiver) = unbounded();
37-
self.drivers.insert(driver, sender);
38-
receiver
39-
}
40-
41-
pub fn deregister(&self, driver: usize) {
42-
self.drivers.remove(&driver);
43-
}
44-
4527
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
4628
self.port.attach(fd)
4729
}
@@ -53,12 +35,6 @@ impl GlobalPort {
5335
) -> io::Result<()> {
5436
self.port.post(res, optr)
5537
}
56-
57-
pub fn push(&self, driver: usize, entry: Entry) {
58-
if let Some(e) = self.drivers.get(&driver) {
59-
e.value().send(entry).ok(); // It's OK if the driver has been dropped.
60-
}
61-
}
6238
}
6339

6440
impl AsRawHandle for GlobalPort {
@@ -78,8 +54,20 @@ fn iocp_start() -> io::Result<()> {
7854
let port = iocp_port()?;
7955
std::thread::spawn(move || {
8056
loop {
81-
for (driver, entry) in port.port.poll(None, None)? {
82-
port.push(driver.0, entry);
57+
for entry in port.port.poll_raw(None)? {
58+
// Any thin pointer is OK because we don't use the type of opcode.
59+
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
60+
let overlapped = unsafe { &*overlapped_ptr };
61+
syscall!(
62+
BOOL,
63+
PostQueuedCompletionStatus(
64+
overlapped.driver,
65+
entry.dwNumberOfBytesTransferred,
66+
entry.lpCompletionKey,
67+
entry.lpOverlapped,
68+
)
69+
)
70+
.ok();
8371
}
8472
}
8573
#[allow(unreachable_code)]
@@ -88,68 +76,39 @@ fn iocp_start() -> io::Result<()> {
8876
Ok(())
8977
}
9078

91-
static DRIVER_COUNTER: AtomicUsize = AtomicUsize::new(0);
9279
static IOCP_INIT_ONCE: OnceLock<()> = OnceLock::new();
9380

9481
pub struct Port {
95-
id: usize,
96-
port: &'static GlobalPort,
97-
receiver: Receiver<Entry>,
82+
port: CompletionPort,
83+
global_port: &'static GlobalPort,
9884
}
9985

10086
impl Port {
10187
pub fn new() -> io::Result<Self> {
10288
IOCP_INIT_ONCE.get_or_try_init(iocp_start)?;
10389

104-
let id = DRIVER_COUNTER.fetch_add(1, Ordering::AcqRel);
105-
let port = iocp_port()?;
106-
let receiver = port.register(id);
107-
Ok(Self { id, port, receiver })
108-
}
109-
110-
pub fn id(&self) -> PortId {
111-
PortId(self.id)
90+
Ok(Self {
91+
port: CompletionPort::new()?,
92+
global_port: iocp_port()?,
93+
})
11294
}
11395

11496
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
115-
self.port.attach(fd)
97+
self.global_port.attach(fd)
11698
}
11799

118100
pub fn handle(&self) -> PortHandle {
119-
PortHandle::new(self.port)
101+
PortHandle::new(self.global_port)
120102
}
121103

122104
pub fn poll(&self, timeout: Option<Duration>) -> io::Result<impl Iterator<Item = Entry> + '_> {
123-
let e = if let Some(timeout) = timeout {
124-
match self.receiver.recv_timeout(timeout) {
125-
Ok(e) => e,
126-
Err(e) => match e {
127-
RecvTimeoutError::Timeout => {
128-
return Err(io::Error::from_raw_os_error(ERROR_TIMEOUT as _));
129-
}
130-
RecvTimeoutError::Disconnected => {
131-
unreachable!("IOCP thread should not exit")
132-
}
133-
},
134-
}
135-
} else {
136-
self.receiver.recv().expect("IOCP thread should not exit")
137-
};
138-
Ok(Some(e)
139-
.into_iter()
140-
.chain(std::iter::from_fn(|| match self.receiver.try_recv() {
141-
Ok(e) => Some(e),
142-
Err(e) => match e {
143-
TryRecvError::Empty => None,
144-
TryRecvError::Disconnected => unreachable!("IOCP thread should not exit"),
145-
},
146-
})))
105+
self.port.poll(timeout, None)
147106
}
148107
}
149108

150-
impl Drop for Port {
151-
fn drop(&mut self) {
152-
self.port.deregister(self.id);
109+
impl AsRawHandle for Port {
110+
fn as_raw_handle(&self) -> RawHandle {
111+
self.port.as_raw_handle()
153112
}
154113
}
155114

@@ -170,18 +129,3 @@ impl PortHandle {
170129
self.port.post(res, optr)
171130
}
172131
}
173-
174-
/// The unique ID of IOCP driver.
175-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
176-
pub struct PortId(usize);
177-
178-
impl PortId {
179-
/// Post raw entry to IOCP.
180-
pub fn post_raw(&self, transferred: u32, key: usize, optr: *mut OVERLAPPED) -> io::Result<()> {
181-
syscall!(
182-
BOOL,
183-
PostQueuedCompletionStatus(iocp_port()?.as_raw_handle() as _, transferred, key, optr)
184-
)?;
185-
Ok(())
186-
}
187-
}

compio-driver/src/iocp/cp/mod.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use compio_log::*;
99
use windows_sys::Win32::{
1010
Foundation::{
1111
RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
12-
ERROR_NO_DATA, FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING,
12+
ERROR_NO_DATA, FACILITY_NTWIN32, HANDLE, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING,
1313
STATUS_SUCCESS,
1414
},
1515
Storage::FileSystem::SetFileCompletionNotificationModes,
@@ -86,13 +86,10 @@ impl CompletionPort {
8686
Ok(())
8787
}
8888

89-
// If current_driver is specified, any entry that doesn't belong the driver will
90-
// be reposted. The driver id will be used as IOCP handle.
91-
pub fn poll(
89+
pub fn poll_raw(
9290
&self,
9391
timeout: Option<Duration>,
94-
current_driver: Option<PortId>,
95-
) -> io::Result<impl Iterator<Item = (PortId, Entry)>> {
92+
) -> io::Result<impl Iterator<Item = OVERLAPPED_ENTRY>> {
9693
const DEFAULT_CAPACITY: usize = 1024;
9794

9895
let mut entries = ArrayVec::<OVERLAPPED_ENTRY, { DEFAULT_CAPACITY }>::new();
@@ -115,22 +112,33 @@ impl CompletionPort {
115112
trace!("recv_count: {recv_count}");
116113
unsafe { entries.set_len(recv_count as _) };
117114

118-
Ok(entries.into_iter().map(move |entry| {
119-
let transferred = entry.dwNumberOfBytesTransferred;
120-
trace!("entry transferred: {transferred}");
115+
Ok(entries.into_iter())
116+
}
117+
118+
// If current_driver is specified, any entry that doesn't belong the driver will
119+
// be reposted. The driver id will be used as IOCP handle.
120+
pub fn poll(
121+
&self,
122+
timeout: Option<Duration>,
123+
current_driver: Option<HANDLE>,
124+
) -> io::Result<impl Iterator<Item = Entry>> {
125+
Ok(self.poll_raw(timeout)?.map(move |entry| {
121126
// Any thin pointer is OK because we don't use the type of opcode.
122127
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
123128
let overlapped = unsafe { &*overlapped_ptr };
124129
if let Some(current_driver) = current_driver {
125130
if overlapped.driver != current_driver {
126-
overlapped
127-
.driver
128-
.post_raw(
131+
// Repose the entry to correct port.
132+
syscall!(
133+
BOOL,
134+
PostQueuedCompletionStatus(
135+
overlapped.driver,
129136
entry.dwNumberOfBytesTransferred,
130137
entry.lpCompletionKey,
131138
entry.lpOverlapped,
132139
)
133-
.ok();
140+
)
141+
.ok();
134142
}
135143
}
136144
let res = if matches!(
@@ -140,7 +148,7 @@ impl CompletionPort {
140148
if entry.lpCompletionKey != 0 {
141149
Ok(entry.lpCompletionKey)
142150
} else {
143-
Ok(transferred as _)
151+
Ok(entry.dwNumberOfBytesTransferred as _)
144152
}
145153
} else {
146154
let error = unsafe { RtlNtStatusToDosError(overlapped.base.Internal as _) };
@@ -149,7 +157,7 @@ impl CompletionPort {
149157
_ => Err(io::Error::from_raw_os_error(error as _)),
150158
}
151159
};
152-
(overlapped.driver, Entry::new(overlapped.user_data, res))
160+
Entry::new(overlapped.user_data, res)
153161
}))
154162
}
155163
}

compio-driver/src/iocp/cp/multi.rs

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use std::{io, os::windows::io::AsRawHandle, sync::Arc, time::Duration};
2-
3-
use windows_sys::Win32::{
4-
Foundation::HANDLE,
5-
System::IO::{PostQueuedCompletionStatus, OVERLAPPED},
1+
use std::{
2+
io,
3+
os::windows::io::{AsRawHandle, RawHandle},
4+
sync::Arc,
5+
time::Duration,
66
};
77

88
use super::CompletionPort;
9-
use crate::{syscall, Entry, Overlapped, RawFd};
9+
use crate::{Entry, Overlapped, RawFd};
1010

1111
pub struct Port {
1212
port: Arc<CompletionPort>,
@@ -19,10 +19,6 @@ impl Port {
1919
})
2020
}
2121

22-
pub fn id(&self) -> PortId {
23-
PortId(self.port.as_raw_handle() as _)
24-
}
25-
2622
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
2723
self.port.attach(fd)
2824
}
@@ -32,14 +28,14 @@ impl Port {
3228
}
3329

3430
pub fn poll(&self, timeout: Option<Duration>) -> io::Result<impl Iterator<Item = Entry> + '_> {
35-
let current_id = self.id();
36-
self.port.poll(timeout, Some(current_id)).map(move |it| {
37-
it.filter_map(
38-
move |(id, entry)| {
39-
if id == current_id { Some(entry) } else { None }
40-
},
41-
)
42-
})
31+
let current_id = self.as_raw_handle() as _;
32+
self.port.poll(timeout, Some(current_id))
33+
}
34+
}
35+
36+
impl AsRawHandle for Port {
37+
fn as_raw_handle(&self) -> RawHandle {
38+
self.port.as_raw_handle()
4339
}
4440
}
4541

@@ -60,18 +56,3 @@ impl PortHandle {
6056
self.port.post(res, optr)
6157
}
6258
}
63-
64-
/// The unique ID of IOCP driver.
65-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66-
pub struct PortId(HANDLE);
67-
68-
impl PortId {
69-
/// Post raw entry to IOCP.
70-
pub fn post_raw(&self, transferred: u32, key: usize, optr: *mut OVERLAPPED) -> io::Result<()> {
71-
syscall!(
72-
BOOL,
73-
PostQueuedCompletionStatus(self.0, transferred, key, optr)
74-
)?;
75-
Ok(())
76-
}
77-
}

0 commit comments

Comments
 (0)