Skip to content

Commit b63fc5c

Browse files
authored
Merge pull request #200 from Berrysoft/refactor/simplify-attacher
refactor: simplify attach logic
2 parents e1bcab5 + a9af894 commit b63fc5c

File tree

28 files changed

+708
-672
lines changed

28 files changed

+708
-672
lines changed

.github/workflows/ci_test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ jobs:
4343
target: "x86_64-pc-windows-gnu"
4444
- os: "windows-latest"
4545
target: "i686-pc-windows-msvc"
46+
- os: "windows-latest"
47+
target: "x86_64-pc-windows-msvc"
48+
features: "iocp-global"
4649
- os: "macos-12"
4750
- os: "macos-13"
4851
- os: "macos-14"

compio-dispatcher/tests/listener.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::{num::NonZeroUsize, panic::resume_unwind};
22

3-
use compio_buf::{arrayvec::ArrayVec, IntoInner};
3+
use compio_buf::arrayvec::ArrayVec;
44
use compio_dispatcher::Dispatcher;
55
use compio_io::{AsyncReadExt, AsyncWriteExt};
66
use compio_net::{TcpListener, TcpStream};
7-
use compio_runtime::{spawn, Unattached};
7+
use compio_runtime::spawn;
88
use futures_util::{stream::FuturesUnordered, StreamExt};
99

1010
#[compio_macros::test]
@@ -27,15 +27,11 @@ async fn listener_dispatch() {
2727
});
2828
let mut handles = FuturesUnordered::new();
2929
for _i in 0..CLIENT_NUM {
30-
let (srv, _) = listener.accept().await.unwrap();
31-
let srv = Unattached::new(srv).unwrap();
30+
let (mut srv, _) = listener.accept().await.unwrap();
3231
let handle = dispatcher
33-
.dispatch(move || {
34-
let mut srv = srv.into_inner();
35-
async move {
36-
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
37-
assert_eq!(buf.as_slice(), b"Hello world!");
38-
}
32+
.dispatch(move || async move {
33+
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
34+
assert_eq!(buf.as_slice(), b"Hello world!");
3935
})
4036
.unwrap();
4137
handles.push(handle.join());

compio-driver/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ polling = "3.3.0"
7070
os_pipe = { workspace = true }
7171

7272
[target.'cfg(unix)'.dependencies]
73+
crossbeam-channel = { workspace = true }
7374
crossbeam-queue = { workspace = true }
7475
libc = { workspace = true }
7576

@@ -83,6 +84,8 @@ polling = ["dep:polling", "dep:os_pipe"]
8384
io-uring-sqe128 = []
8485
io-uring-cqe32 = []
8586

87+
iocp-global = []
88+
8689
# Nightly features
8790
once_cell_try = []
8891
nightly = ["once_cell_try"]

compio-driver/src/fusion/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,13 @@ impl Driver {
132132
}
133133
}
134134

135+
pub fn create_op<T: OpCode + 'static>(&self, user_data: usize, op: T) -> RawOp {
136+
match &self.fuse {
137+
FuseDriver::Poll(driver) => driver.create_op(user_data, op),
138+
FuseDriver::IoUring(driver) => driver.create_op(user_data, op),
139+
}
140+
}
141+
135142
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
136143
match &mut self.fuse {
137144
FuseDriver::Poll(driver) => driver.attach(fd),
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
#[cfg(feature = "once_cell_try")]
2+
use std::sync::OnceLock;
3+
use std::{
4+
io,
5+
os::windows::io::{AsRawHandle, RawHandle},
6+
time::Duration,
7+
};
8+
9+
use compio_log::*;
10+
#[cfg(not(feature = "once_cell_try"))]
11+
use once_cell::sync::OnceCell as OnceLock;
12+
use windows_sys::Win32::System::IO::PostQueuedCompletionStatus;
13+
14+
use super::CompletionPort;
15+
use crate::{syscall, Entry, Overlapped, RawFd};
16+
17+
struct GlobalPort {
18+
port: CompletionPort,
19+
}
20+
21+
impl GlobalPort {
22+
pub fn new() -> io::Result<Self> {
23+
Ok(Self {
24+
port: CompletionPort::new()?,
25+
})
26+
}
27+
28+
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
29+
self.port.attach(fd)
30+
}
31+
32+
pub fn post<T: ?Sized>(
33+
&self,
34+
res: io::Result<usize>,
35+
optr: *mut Overlapped<T>,
36+
) -> io::Result<()> {
37+
self.port.post(res, optr)
38+
}
39+
40+
pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
41+
self.port.post_raw(optr)
42+
}
43+
}
44+
45+
impl AsRawHandle for GlobalPort {
46+
fn as_raw_handle(&self) -> RawHandle {
47+
self.port.as_raw_handle()
48+
}
49+
}
50+
51+
static IOCP_PORT: OnceLock<GlobalPort> = OnceLock::new();
52+
53+
#[inline]
54+
fn iocp_port() -> io::Result<&'static GlobalPort> {
55+
IOCP_PORT.get_or_try_init(GlobalPort::new)
56+
}
57+
58+
fn iocp_start() -> io::Result<()> {
59+
let port = iocp_port()?;
60+
std::thread::spawn(move || {
61+
instrument!(compio_log::Level::TRACE, "iocp_start");
62+
loop {
63+
for entry in port.port.poll_raw(None)? {
64+
// Any thin pointer is OK because we don't use the type of opcode.
65+
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
66+
let overlapped = unsafe { &*overlapped_ptr };
67+
if let Err(_e) = syscall!(
68+
BOOL,
69+
PostQueuedCompletionStatus(
70+
overlapped.driver as _,
71+
entry.dwNumberOfBytesTransferred,
72+
entry.lpCompletionKey,
73+
entry.lpOverlapped,
74+
)
75+
) {
76+
error!(
77+
"fail to dispatch entry ({}, {}, {:p}) to driver {:p}: {:?}",
78+
entry.dwNumberOfBytesTransferred,
79+
entry.lpCompletionKey,
80+
entry.lpOverlapped,
81+
overlapped.driver,
82+
_e
83+
);
84+
}
85+
}
86+
}
87+
#[allow(unreachable_code)]
88+
io::Result::Ok(())
89+
});
90+
Ok(())
91+
}
92+
93+
static IOCP_INIT_ONCE: OnceLock<()> = OnceLock::new();
94+
95+
pub struct Port {
96+
port: CompletionPort,
97+
global_port: &'static GlobalPort,
98+
}
99+
100+
impl Port {
101+
pub fn new() -> io::Result<Self> {
102+
IOCP_INIT_ONCE.get_or_try_init(iocp_start)?;
103+
104+
Ok(Self {
105+
port: CompletionPort::new()?,
106+
global_port: iocp_port()?,
107+
})
108+
}
109+
110+
pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
111+
self.global_port.attach(fd)
112+
}
113+
114+
pub fn handle(&self) -> PortHandle {
115+
PortHandle::new(self.global_port)
116+
}
117+
118+
pub fn poll(&self, timeout: Option<Duration>) -> io::Result<impl Iterator<Item = Entry> + '_> {
119+
self.port.poll(timeout, None)
120+
}
121+
}
122+
123+
impl AsRawHandle for Port {
124+
fn as_raw_handle(&self) -> RawHandle {
125+
self.port.as_raw_handle()
126+
}
127+
}
128+
129+
pub struct PortHandle {
130+
port: &'static GlobalPort,
131+
}
132+
133+
impl PortHandle {
134+
fn new(port: &'static GlobalPort) -> Self {
135+
Self { port }
136+
}
137+
138+
pub fn post<T: ?Sized>(
139+
&self,
140+
res: io::Result<usize>,
141+
optr: *mut Overlapped<T>,
142+
) -> io::Result<()> {
143+
self.port.post(res, optr)
144+
}
145+
146+
pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
147+
self.port.post_raw(optr)
148+
}
149+
}

0 commit comments

Comments
 (0)