Skip to content

Commit be7cedc

Browse files
committed
feat(driver,iocp): add iocp-global feature
1 parent acc88f8 commit be7cedc

File tree

5 files changed

+407
-220
lines changed

5 files changed

+407
-220
lines changed

compio-driver/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ socket2 = { workspace = true }
4242
[target.'cfg(windows)'.dependencies]
4343
compio-buf = { workspace = true, features = ["arrayvec"] }
4444
aligned-array = "1.0.1"
45-
crossbeam-skiplist = { workspace = true }
45+
crossbeam-skiplist = { workspace = true, optional = true }
4646
once_cell = { workspace = true }
4747
widestring = { workspace = true }
4848
windows-sys = { workspace = true, features = [
@@ -71,6 +71,7 @@ polling = "3.3.0"
7171
os_pipe = { workspace = true }
7272

7373
[target.'cfg(unix)'.dependencies]
74+
crossbeam-channel = { workspace = true }
7475
crossbeam-queue = { workspace = true }
7576
libc = { workspace = true }
7677

@@ -84,6 +85,8 @@ polling = ["dep:polling", "dep:os_pipe"]
8485
io-uring-sqe128 = []
8586
io-uring-cqe32 = []
8687

88+
iocp-global = ["dep:crossbeam-skiplist"]
89+
8790
# Nightly features
8891
once_cell_try = []
8992
nightly = ["once_cell_try"]
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
#[cfg(feature = "once_cell_try")]
2+
use std::sync::OnceLock;
3+
use std::{
4+
io,
5+
os::windows::io::{AsRawHandle, RawHandle},
6+
sync::atomic::{AtomicUsize, Ordering},
7+
time::Duration,
8+
};
9+
10+
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender, TryRecvError};
11+
use crossbeam_skiplist::SkipMap;
12+
#[cfg(not(feature = "once_cell_try"))]
13+
use once_cell::sync::OnceCell as OnceLock;
14+
use windows_sys::Win32::Foundation::ERROR_TIMEOUT;
15+
16+
use super::CompletionPort;
17+
use crate::{Entry, Overlapped, RawFd};
18+
19+
struct GlobalPort {
20+
port: CompletionPort,
21+
drivers: SkipMap<usize, Sender<Entry>>,
22+
}
23+
24+
impl GlobalPort {
25+
pub fn new() -> io::Result<Self> {
26+
Ok(Self {
27+
port: CompletionPort::new()?,
28+
drivers: SkipMap::new(),
29+
})
30+
}
31+
32+
pub fn register(&self, driver: usize) -> Receiver<Entry> {
33+
let (sender, receiver) = unbounded();
34+
self.drivers.insert(driver, sender);
35+
receiver
36+
}
37+
38+
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
39+
self.port.attach(fd)
40+
}
41+
42+
pub fn post<T: ?Sized>(
43+
&self,
44+
res: io::Result<usize>,
45+
optr: *mut Overlapped<T>,
46+
) -> io::Result<()> {
47+
self.port.post(res, optr)
48+
}
49+
50+
pub fn push(&self, driver: usize, entry: Entry) {
51+
self.drivers
52+
.get(&driver)
53+
.expect("driver should register first")
54+
.value()
55+
.send(entry)
56+
.ok(); // It's OK if the driver has been dropped.
57+
}
58+
}
59+
60+
impl AsRawHandle for GlobalPort {
61+
fn as_raw_handle(&self) -> RawHandle {
62+
self.port.as_raw_handle()
63+
}
64+
}
65+
66+
static IOCP_PORT: OnceLock<GlobalPort> = OnceLock::new();
67+
68+
#[inline]
69+
fn iocp_port() -> io::Result<&'static GlobalPort> {
70+
IOCP_PORT.get_or_try_init(GlobalPort::new)
71+
}
72+
73+
fn iocp_start() -> io::Result<()> {
74+
let port = iocp_port()?;
75+
std::thread::spawn(move || {
76+
loop {
77+
for (driver, entry) in port.port.poll(None)? {
78+
port.push(driver, entry);
79+
}
80+
}
81+
#[allow(unreachable_code)]
82+
io::Result::Ok(())
83+
});
84+
Ok(())
85+
}
86+
87+
static DRIVER_COUNTER: AtomicUsize = AtomicUsize::new(0);
88+
static IOCP_INIT_ONCE: OnceLock<()> = OnceLock::new();
89+
90+
pub struct Port {
91+
id: usize,
92+
port: &'static GlobalPort,
93+
receiver: Receiver<Entry>,
94+
}
95+
96+
impl Port {
97+
pub fn new() -> io::Result<Self> {
98+
IOCP_INIT_ONCE.get_or_try_init(iocp_start)?;
99+
100+
let id = DRIVER_COUNTER.fetch_add(1, Ordering::AcqRel);
101+
let port = iocp_port()?;
102+
let receiver = port.register(id);
103+
Ok(Self { id, port, receiver })
104+
}
105+
106+
pub fn id(&self) -> usize {
107+
self.id
108+
}
109+
110+
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
111+
self.port.attach(fd)
112+
}
113+
114+
pub fn handle(&self) -> PortHandle {
115+
PortHandle::new(self.port)
116+
}
117+
118+
pub fn poll(&self, timeout: Option<Duration>) -> io::Result<impl Iterator<Item = Entry> + '_> {
119+
let e = if let Some(timeout) = timeout {
120+
match self.receiver.recv_timeout(timeout) {
121+
Ok(e) => e,
122+
Err(e) => match e {
123+
RecvTimeoutError::Timeout => {
124+
return Err(io::Error::from_raw_os_error(ERROR_TIMEOUT as _));
125+
}
126+
RecvTimeoutError::Disconnected => {
127+
unreachable!("IOCP thread should not exit")
128+
}
129+
},
130+
}
131+
} else {
132+
self.receiver.recv().expect("IOCP thread should not exit")
133+
};
134+
Ok(Some(e)
135+
.into_iter()
136+
.chain(std::iter::from_fn(|| match self.receiver.try_recv() {
137+
Ok(e) => Some(e),
138+
Err(e) => match e {
139+
TryRecvError::Empty => None,
140+
TryRecvError::Disconnected => unreachable!("IOCP thread should not exit"),
141+
},
142+
})))
143+
}
144+
}
145+
146+
pub struct PortHandle {
147+
port: &'static GlobalPort,
148+
}
149+
150+
impl PortHandle {
151+
fn new(port: &'static GlobalPort) -> Self {
152+
Self { port }
153+
}
154+
155+
pub fn post<T: ?Sized>(
156+
&self,
157+
res: io::Result<usize>,
158+
optr: *mut Overlapped<T>,
159+
) -> io::Result<()> {
160+
self.port.post(res, optr)
161+
}
162+
}

compio-driver/src/iocp/cp/mod.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
use std::{
2+
io,
3+
os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle},
4+
time::Duration,
5+
};
6+
7+
use compio_buf::arrayvec::ArrayVec;
8+
use compio_log::*;
9+
use windows_sys::Win32::{
10+
Foundation::{
11+
RtlNtStatusToDosError, ERROR_BAD_COMMAND, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE,
12+
ERROR_NO_DATA, FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING,
13+
STATUS_SUCCESS,
14+
},
15+
Storage::FileSystem::SetFileCompletionNotificationModes,
16+
System::{
17+
SystemServices::ERROR_SEVERITY_ERROR,
18+
Threading::INFINITE,
19+
WindowsProgramming::{FILE_SKIP_COMPLETION_PORT_ON_SUCCESS, FILE_SKIP_SET_EVENT_ON_HANDLE},
20+
IO::{
21+
CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus,
22+
OVERLAPPED_ENTRY,
23+
},
24+
},
25+
};
26+
27+
use crate::{syscall, Entry, Overlapped, RawFd};
28+
29+
cfg_if::cfg_if! {
30+
if #[cfg(feature = "iocp-global")] {
31+
mod global;
32+
pub use global::*;
33+
} else {
34+
mod multi;
35+
pub use multi::*;
36+
}
37+
}
38+
39+
struct CompletionPort {
40+
port: OwnedHandle,
41+
}
42+
43+
impl CompletionPort {
44+
pub fn new() -> io::Result<Self> {
45+
let port = syscall!(BOOL, CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1))?;
46+
trace!("new iocp handle: {port}");
47+
let port = unsafe { OwnedHandle::from_raw_handle(port as _) };
48+
Ok(Self { port })
49+
}
50+
51+
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
52+
syscall!(
53+
BOOL,
54+
CreateIoCompletionPort(fd as _, self.port.as_raw_handle() as _, 0, 0)
55+
)?;
56+
syscall!(
57+
BOOL,
58+
SetFileCompletionNotificationModes(
59+
fd as _,
60+
(FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | FILE_SKIP_SET_EVENT_ON_HANDLE) as _
61+
)
62+
)?;
63+
Ok(())
64+
}
65+
66+
pub fn post<T: ?Sized>(
67+
&self,
68+
res: io::Result<usize>,
69+
optr: *mut Overlapped<T>,
70+
) -> io::Result<()> {
71+
if let Err(e) = &res {
72+
let code = e.raw_os_error().unwrap_or(ERROR_BAD_COMMAND as _);
73+
unsafe { &mut *optr }.base.Internal = ntstatus_from_win32(code) as _;
74+
}
75+
// We have to use CompletionKey to transfer the result because it is large
76+
// enough. It is OK because we set it to zero when attaching handles to IOCP.
77+
syscall!(
78+
BOOL,
79+
PostQueuedCompletionStatus(
80+
self.port.as_raw_handle() as _,
81+
0,
82+
res.unwrap_or_default(),
83+
optr.cast()
84+
)
85+
)?;
86+
Ok(())
87+
}
88+
89+
pub fn poll(
90+
&self,
91+
timeout: Option<Duration>,
92+
) -> io::Result<impl Iterator<Item = (usize, Entry)>> {
93+
const DEFAULT_CAPACITY: usize = 1024;
94+
95+
let mut entries = ArrayVec::<OVERLAPPED_ENTRY, { DEFAULT_CAPACITY }>::new();
96+
let mut recv_count = 0;
97+
let timeout = match timeout {
98+
Some(timeout) => timeout.as_millis() as u32,
99+
None => INFINITE,
100+
};
101+
syscall!(
102+
BOOL,
103+
GetQueuedCompletionStatusEx(
104+
self.port.as_raw_handle() as _,
105+
entries.as_mut_ptr(),
106+
DEFAULT_CAPACITY as _,
107+
&mut recv_count,
108+
timeout,
109+
0
110+
)
111+
)?;
112+
trace!("recv_count: {recv_count}");
113+
unsafe { entries.set_len(recv_count as _) };
114+
115+
Ok(entries.into_iter().map(|entry| {
116+
let transferred = entry.dwNumberOfBytesTransferred;
117+
trace!("entry transferred: {transferred}");
118+
// Any thin pointer is OK because we don't use the type of opcode.
119+
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
120+
let overlapped = unsafe { &*overlapped_ptr };
121+
let res = if matches!(
122+
overlapped.base.Internal as NTSTATUS,
123+
STATUS_SUCCESS | STATUS_PENDING
124+
) {
125+
if entry.lpCompletionKey != 0 {
126+
Ok(entry.lpCompletionKey)
127+
} else {
128+
Ok(transferred as _)
129+
}
130+
} else {
131+
let error = unsafe { RtlNtStatusToDosError(overlapped.base.Internal as _) };
132+
match error {
133+
ERROR_IO_INCOMPLETE | ERROR_HANDLE_EOF | ERROR_NO_DATA => Ok(0),
134+
_ => Err(io::Error::from_raw_os_error(error as _)),
135+
}
136+
};
137+
(overlapped.driver, Entry::new(overlapped.user_data, res))
138+
}))
139+
}
140+
}
141+
142+
impl AsRawHandle for CompletionPort {
143+
fn as_raw_handle(&self) -> RawHandle {
144+
self.port.as_raw_handle()
145+
}
146+
}
147+
148+
#[inline]
149+
fn ntstatus_from_win32(x: i32) -> NTSTATUS {
150+
if x <= 0 {
151+
x
152+
} else {
153+
((x) & 0x0000FFFF) | (FACILITY_NTWIN32 << 16) as NTSTATUS | ERROR_SEVERITY_ERROR as NTSTATUS
154+
}
155+
}

0 commit comments

Comments
 (0)