Skip to content

Commit 67895b4

Browse files
committed
basic support IOCP Operator
1 parent 1303607 commit 67895b4

File tree

15 files changed

+1067
-52
lines changed

15 files changed

+1067
-52
lines changed

.github/workflows/ci-preemptive.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
3434
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci
3535
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci --release
3636
fi
37+
38+
# test IOCP
39+
if [ "${OS}" = "windows-latest" ]; then
40+
cd "${PROJECT_DIR}"/core
41+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci
42+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci --release
43+
cd "${PROJECT_DIR}"/open-coroutine
44+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci
45+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci --release
46+
fi

.github/workflows/ci.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
3434
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci
3535
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci --release
3636
fi
37+
38+
# test IOCP
39+
if [ "${OS}" = "windows-latest" ]; then
40+
cd "${PROJECT_DIR}"/core
41+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci
42+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci --release
43+
cd "${PROJECT_DIR}"/open-coroutine
44+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci
45+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci --release
46+
fi

core/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ windows-sys = { workspace = true, features = [
5858
"Win32_Networking_WinSock",
5959
"Win32_System_SystemInformation",
6060
"Win32_System_Diagnostics_Debug",
61+
"Win32_System_WindowsProgramming",
6162
] }
6263
polling = { workspace = true, optional = true }
6364

@@ -95,5 +96,11 @@ net = ["korosensei", "polling", "mio", "crossbeam-utils", "core_affinity"]
9596
# Provide io_uring adaptation, this feature only works in linux.
9697
io_uring = ["net", "io-uring"]
9798

99+
# Provide IOCP adaptation, this feature only works in windows.
100+
iocp = ["net"]
101+
102+
# Provide completion IO adaptation
103+
completion_io = ["io_uring", "iocp"]
104+
98105
# Provide syscall implementation.
99106
syscall = ["net"]

core/src/net/event_loop.rs

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,35 @@ cfg_if::cfg_if! {
2424
}
2525
}
2626

27+
cfg_if::cfg_if! {
28+
if #[cfg(all(windows, feature = "iocp"))] {
29+
use dashmap::DashMap;
30+
use std::ffi::{c_longlong, c_uint};
31+
use windows_sys::core::{PCSTR, PSTR};
32+
use windows_sys::Win32::Networking::WinSock::{
33+
setsockopt, LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, SOL_SOCKET,
34+
SO_UPDATE_ACCEPT_CONTEXT, WSABUF,
35+
};
36+
use windows_sys::Win32::System::IO::OVERLAPPED;
37+
}
38+
}
39+
2740
#[repr(C)]
2841
#[derive(Debug)]
2942
pub(crate) struct EventLoop<'e> {
3043
stop: Arc<(Mutex<bool>, Condvar)>,
3144
shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
3245
cpu: usize,
33-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
46+
#[cfg(any(
47+
all(target_os = "linux", feature = "io_uring"),
48+
all(windows, feature = "iocp")
49+
))]
3450
operator: crate::net::operator::Operator<'e>,
3551
#[allow(clippy::type_complexity)]
36-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
52+
#[cfg(any(
53+
all(target_os = "linux", feature = "io_uring"),
54+
all(windows, feature = "iocp")
55+
))]
3756
syscall_wait_table: DashMap<usize, Arc<(Mutex<Option<c_longlong>>, Condvar)>>,
3857
selector: Poller,
3958
pool: CoroutinePool<'e>,
@@ -87,9 +106,15 @@ impl<'e> EventLoop<'e> {
87106
stop: Arc::new((Mutex::new(false), Condvar::new())),
88107
shared_stop,
89108
cpu,
90-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
109+
#[cfg(any(
110+
all(target_os = "linux", feature = "io_uring"),
111+
all(windows, feature = "iocp")
112+
))]
91113
operator: crate::net::operator::Operator::new(cpu)?,
92-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
114+
#[cfg(any(
115+
all(target_os = "linux", feature = "io_uring"),
116+
all(windows, feature = "iocp")
117+
))]
93118
syscall_wait_table: DashMap::new(),
94119
selector: Poller::new()?,
95120
pool: CoroutinePool::new(name, stack_size, min_size, max_size, keep_alive_time),
@@ -222,6 +247,8 @@ impl<'e> EventLoop<'e> {
222247
cfg_if::cfg_if! {
223248
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
224249
left_time = self.adapt_io_uring(left_time)?;
250+
} else if #[cfg(all(windows, feature = "iocp"))] {
251+
left_time = self.adapt_iocp(left_time)?;
225252
}
226253
}
227254

@@ -267,6 +294,52 @@ impl<'e> EventLoop<'e> {
267294
Ok(left_time)
268295
}
269296

297+
#[cfg(all(windows, feature = "iocp"))]
298+
fn adapt_iocp(&self, mut left_time: Option<Duration>) -> std::io::Result<Option<Duration>> {
299+
// use IOCP
300+
let (count, mut cq, left) = self.operator.select(left_time, 0)?;
301+
if count > 0 {
302+
for cqe in &mut cq {
303+
let token = cqe.token;
304+
let bytes_transferred = cqe.bytes_transferred;
305+
// resolve completed read/write tasks
306+
// todo refactor IOCP impl
307+
let result = match cqe.syscall_name {
308+
SyscallName::accept => unsafe {
309+
if setsockopt(
310+
cqe.socket,
311+
SOL_SOCKET,
312+
SO_UPDATE_ACCEPT_CONTEXT,
313+
std::ptr::from_ref(&cqe.from_fd).cast(),
314+
c_int::try_from(size_of::<SOCKET>()).expect("overflow"),
315+
) == 0
316+
{
317+
cqe.socket.try_into().expect("result overflow")
318+
} else {
319+
-c_longlong::from(windows_sys::Win32::Foundation::GetLastError())
320+
}
321+
},
322+
SyscallName::recv
323+
| SyscallName::WSARecv
324+
| SyscallName::send
325+
| SyscallName::WSASend => bytes_transferred.into(),
326+
_ => panic!("unsupported"),
327+
};
328+
if let Some((_, pair)) = self.syscall_wait_table.remove(&token) {
329+
let (lock, cvar) = &*pair;
330+
let mut pending = lock.lock().expect("lock failed");
331+
*pending = Some(result);
332+
cvar.notify_one();
333+
}
334+
unsafe { self.resume(token) };
335+
}
336+
}
337+
if left != left_time {
338+
left_time = Some(left.unwrap_or(Duration::ZERO));
339+
}
340+
Ok(left_time)
341+
}
342+
270343
unsafe fn resume(&self, token: usize) {
271344
if COROUTINE_TOKENS.remove(&token).is_none() {
272345
return;
@@ -446,6 +519,34 @@ impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c
446519
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
447520
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);
448521

522+
macro_rules! impl_iocp {
523+
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
524+
#[cfg(all(windows, feature = "iocp"))]
525+
impl EventLoop<'_> {
526+
#[allow(non_snake_case, clippy::too_many_arguments)]
527+
pub(super) fn $syscall(
528+
&self,
529+
$($arg: $arg_type),*
530+
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
531+
let token = EventLoop::token(SyscallName::$syscall);
532+
self.operator.$syscall(token, $($arg, )*)?;
533+
let arc = Arc::new((Mutex::new(None), Condvar::new()));
534+
assert!(
535+
self.syscall_wait_table.insert(token, arc.clone()).is_none(),
536+
"The previous token was not retrieved in a timely manner"
537+
);
538+
Ok(arc)
539+
}
540+
}
541+
}
542+
}
543+
544+
impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
545+
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
546+
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
547+
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
548+
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
549+
449550
#[cfg(all(test, not(all(unix, feature = "preemptive"))))]
450551
mod tests {
451552
use crate::net::event_loop::EventLoop;

core/src/net/mod.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,27 @@ cfg_if::cfg_if! {
1818
}
1919
}
2020

21+
cfg_if::cfg_if! {
22+
if #[cfg(all(windows, feature = "iocp"))] {
23+
use std::ffi::c_uint;
24+
use windows_sys::core::{PCSTR, PSTR};
25+
use windows_sys::Win32::Networking::WinSock::{
26+
LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
27+
};
28+
use windows_sys::Win32::System::IO::OVERLAPPED;
29+
}
30+
}
31+
2132
/// 做C兼容时会用到
2233
pub type UserFunc = extern "C" fn(usize) -> usize;
2334

2435
mod selector;
2536

2637
#[allow(clippy::too_many_arguments)]
27-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
38+
#[cfg(any(
39+
all(target_os = "linux", feature = "io_uring"),
40+
all(windows, feature = "iocp")
41+
))]
2842
mod operator;
2943

3044
#[allow(missing_docs)]
@@ -280,3 +294,24 @@ impl_io_uring!(fsync(fd: c_int) -> c_int);
280294
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
281295
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
282296
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);
297+
298+
macro_rules! impl_iocp {
299+
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
300+
#[allow(non_snake_case)]
301+
#[cfg(all(windows, feature = "iocp"))]
302+
impl EventLoops {
303+
#[allow(missing_docs)]
304+
pub fn $syscall(
305+
$($arg: $arg_type),*
306+
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
307+
Self::event_loop().$syscall($($arg, )*)
308+
}
309+
}
310+
}
311+
}
312+
313+
impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
314+
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
315+
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
316+
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
317+
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);

core/src/net/operator/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,9 @@
22
mod linux;
33
#[cfg(all(target_os = "linux", feature = "io_uring"))]
44
pub(crate) use linux::*;
5+
6+
#[allow(non_snake_case)]
7+
#[cfg(all(windows, feature = "iocp"))]
8+
mod windows;
9+
#[cfg(all(windows, feature = "iocp"))]
10+
pub(crate) use windows::*;

0 commit comments

Comments
 (0)