Skip to content

Commit 9072d51

Browse files
committed
support IOCP accept
1 parent 540b816 commit 9072d51

File tree

23 files changed

+1307
-768
lines changed

23 files changed

+1307
-768
lines changed

.github/workflows/ci-preemptive.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
3434
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive
3535
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive --release
3636
fi
37+
38+
# test IOCP
39+
if [ "${OS}" = "windows-latest" ]; then
40+
cd "${PROJECT_DIR}"/core
41+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive
42+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive --release
43+
cd "${PROJECT_DIR}"/open-coroutine
44+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive
45+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive --release
46+
fi

.github/workflows/ci.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
3434
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring
3535
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring --release
3636
fi
37+
38+
# test IOCP
39+
if [ "${OS}" = "windows-latest" ]; then
40+
cd "${PROJECT_DIR}"/core
41+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp
42+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp --release
43+
cd "${PROJECT_DIR}"/open-coroutine
44+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp
45+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp --release
46+
fi

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mio = { version = "1.0", default-features = false }
2828

2929
cfg-if = "1.0.0"
3030
polling = "2.8.0"
31+
educe = "0.6.0"
3132

3233
libc = "0.2"
3334
rand = "0.8"
@@ -50,7 +51,6 @@ once_cell = "1"
5051
dashmap = "6"
5152
num_cpus = "1"
5253
uuid = "1"
53-
derivative = "2"
5454
tempfile = "3"
5555
cc = "1"
5656
syn = "2"

core/Cargo.toml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ uuid = { workspace = true, features = [
2929
"v4",
3030
"fast-rng",
3131
], optional = true }
32-
derivative = { workspace = true, optional = true }
32+
educe = { workspace = true, optional = true }
3333
core_affinity = { workspace = true, optional = true }
3434
crossbeam-utils = { workspace = true, optional = true }
3535
psm.workspace = true
@@ -77,7 +77,7 @@ backtrace.workspace = true
7777
log = ["tracing", "tracing-subscriber", "time"]
7878

7979
# low-level raw coroutine
80-
korosensei = ["corosensei", "uuid", "nix/pthread", "derivative"]
80+
korosensei = ["corosensei", "uuid", "nix/pthread", "educe"]
8181

8282
# Provide preemptive scheduling implementation.
8383
# Enable for default.
@@ -89,6 +89,12 @@ net = ["korosensei", "polling", "mio", "crossbeam-utils", "core_affinity"]
8989
# Provide io_uring adaptation, this feature only works in linux.
9090
io_uring = ["net", "io-uring"]
9191

92+
# Provide IOCP adaptation, this feature only works in windows.
93+
iocp = ["net"]
94+
95+
# Provide completion IOCP adaptation
96+
completion_io = ["io_uring", "iocp"]
97+
9298
# Provide syscall implementation.
9399
syscall = ["net"]
94100

core/src/co_pool/task.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
use crate::catch;
2-
use derivative::Derivative;
32

43
/// 做C兼容时会用到
54
pub type UserTaskFunc = extern "C" fn(usize) -> usize;
65

76
/// The task impls.
87
#[repr(C)]
9-
#[derive(Derivative)]
10-
#[derivative(Debug)]
8+
#[derive(educe::Educe)]
9+
#[educe(Debug)]
1110
pub struct Task<'t> {
1211
name: String,
13-
#[derivative(Debug = "ignore")]
12+
#[educe(Debug(ignore))]
1413
func: Box<dyn FnOnce(Option<usize>) -> Option<usize> + 't>,
1514
param: Option<usize>,
1615
}

core/src/common/timer.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::impl_display_by_debug;
2-
use derivative::Derivative;
32
use std::collections::{BTreeMap, VecDeque};
43
use std::ops::{Deref, DerefMut};
54
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -62,11 +61,11 @@ impl_display_by_debug!(TimerEntry<T>);
6261

6362
/// A queue for managing multiple `TimerEntry`.
6463
#[repr(C)]
65-
#[derive(Derivative)]
66-
#[derivative(Debug, Eq, PartialEq)]
64+
#[derive(educe::Educe)]
65+
#[educe(Debug, Eq, PartialEq)]
6766
pub struct TimerList<T> {
6867
inner: BTreeMap<u64, TimerEntry<T>>,
69-
#[derivative(PartialEq = "ignore")]
68+
#[educe(PartialEq(ignore))]
7069
total: AtomicUsize,
7170
}
7271

core/src/coroutine/suspender.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::collections::VecDeque;
55
use std::time::Duration;
66

77
thread_local! {
8+
#[allow(clippy::missing_const_for_thread_local)]
89
static TIMESTAMP: RefCell<VecDeque<u64>> = const { RefCell::new(VecDeque::new()) };
910
}
1011

@@ -52,14 +53,13 @@ pub use korosensei::Suspender;
5253
#[cfg(feature = "korosensei")]
5354
mod korosensei {
5455
use corosensei::Yielder;
55-
use derivative::Derivative;
5656

5757
/// Ths suspender implemented for coroutine.
5858
#[repr(C)]
59-
#[derive(Derivative)]
60-
#[derivative(Debug)]
59+
#[derive(educe::Educe)]
60+
#[educe(Debug)]
6161
pub struct Suspender<'s, Param, Yield> {
62-
#[derivative(Debug = "ignore")]
62+
#[educe(Debug(ignore))]
6363
inner: &'s Yielder<Param, Yield>,
6464
}
6565

core/src/net/event_loop.rs

Lines changed: 110 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use crate::scheduler::SchedulableCoroutine;
66
use crate::{error, impl_current_for, impl_display_by_debug, info};
77
use crossbeam_utils::atomic::AtomicCell;
88
use dashmap::DashSet;
9+
#[cfg(all(target_os = "linux", feature = "io_uring"))]
10+
use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
911
use once_cell::sync::Lazy;
1012
use rand::Rng;
1113
use std::ffi::{c_char, c_int, c_void, CStr, CString};
@@ -16,11 +18,15 @@ use std::sync::atomic::{AtomicUsize, Ordering};
1618
use std::sync::{Arc, Condvar, Mutex};
1719
use std::thread::JoinHandle;
1820
use std::time::Duration;
21+
#[cfg(all(windows, feature = "iocp"))]
22+
use windows_sys::Win32::Networking::WinSock::{
23+
setsockopt, SOCKADDR, SOCKET, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
24+
};
1925

2026
cfg_if::cfg_if! {
21-
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
22-
use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
27+
if #[cfg(any(all(target_os = "linux", feature = "io_uring"), all(windows, feature = "iocp")))] {
2328
use dashmap::DashMap;
29+
use std::ffi::c_longlong;
2430
}
2531
}
2632

@@ -32,11 +38,17 @@ pub(crate) struct EventLoop<'e> {
3238
stop: Arc<(Mutex<bool>, Condvar)>,
3339
shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
3440
cpu: usize,
35-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
41+
#[cfg(any(
42+
all(target_os = "linux", feature = "io_uring"),
43+
all(windows, feature = "iocp")
44+
))]
3645
operator: crate::net::operator::Operator<'e>,
3746
#[allow(clippy::type_complexity)]
38-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
39-
syscall_wait_table: DashMap<usize, Arc<(Mutex<Option<ssize_t>>, Condvar)>>,
47+
#[cfg(any(
48+
all(target_os = "linux", feature = "io_uring"),
49+
all(windows, feature = "iocp")
50+
))]
51+
syscall_wait_table: DashMap<usize, Arc<(Mutex<Option<c_longlong>>, Condvar)>>,
4052
selector: Poller,
4153
pool: CoroutinePool<'e>,
4254
phantom_data: PhantomData<&'e EventLoop<'e>>,
@@ -90,9 +102,15 @@ impl<'e> EventLoop<'e> {
90102
stop: Arc::new((Mutex::new(false), Condvar::new())),
91103
shared_stop,
92104
cpu,
93-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
105+
#[cfg(any(
106+
all(target_os = "linux", feature = "io_uring"),
107+
all(windows, feature = "iocp")
108+
))]
94109
operator: crate::net::operator::Operator::new(cpu)?,
95-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
110+
#[cfg(any(
111+
all(target_os = "linux", feature = "io_uring"),
112+
all(windows, feature = "iocp")
113+
))]
96114
syscall_wait_table: DashMap::new(),
97115
selector: Poller::new()?,
98116
pool: CoroutinePool::new(name, stack_size, min_size, max_size, keep_alive_time),
@@ -223,7 +241,29 @@ impl<'e> EventLoop<'e> {
223241
}
224242
}
225243

226-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
244+
cfg_if::cfg_if! {
245+
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
246+
left_time = self.adapt_io_uring(left_time)?;
247+
} else if #[cfg(all(windows, feature = "iocp"))] {
248+
left_time = self.adapt_iocp(left_time)?;
249+
}
250+
}
251+
252+
// use epoll/kevent/iocp
253+
let mut events = Events::with_capacity(1024);
254+
self.selector.select(&mut events, left_time)?;
255+
#[allow(clippy::explicit_iter_loop)]
256+
for event in events.iter() {
257+
let token = event.get_token();
258+
if event.readable() || event.writable() {
259+
unsafe { self.resume(token) };
260+
}
261+
}
262+
Ok(())
263+
}
264+
265+
#[cfg(all(target_os = "linux", feature = "io_uring"))]
266+
fn adapt_io_uring(&self, mut left_time: Option<Duration>) -> std::io::Result<Option<Duration>> {
227267
if crate::net::operator::support_io_uring() {
228268
// use io_uring
229269
let (count, mut cq, left) = self.operator.select(left_time, 0)?;
@@ -239,7 +279,7 @@ impl<'e> EventLoop<'e> {
239279
if let Some((_, pair)) = self.syscall_wait_table.remove(&token) {
240280
let (lock, cvar) = &*pair;
241281
let mut pending = lock.lock().expect("lock failed");
242-
*pending = Some(result);
282+
*pending = Some(result.try_into().expect("result overflow"));
243283
cvar.notify_one();
244284
}
245285
unsafe { self.resume(token) };
@@ -249,18 +289,47 @@ impl<'e> EventLoop<'e> {
249289
left_time = Some(left.unwrap_or(Duration::ZERO));
250290
}
251291
}
292+
Ok(left_time)
293+
}
252294

253-
// use epoll/kevent/iocp
254-
let mut events = Events::with_capacity(1024);
255-
self.selector.select(&mut events, left_time)?;
256-
#[allow(clippy::explicit_iter_loop)]
257-
for event in events.iter() {
258-
let token = event.get_token();
259-
if event.readable() || event.writable() {
295+
#[cfg(all(windows, feature = "iocp"))]
296+
fn adapt_iocp(&self, mut left_time: Option<Duration>) -> std::io::Result<Option<Duration>> {
297+
// use IOCP
298+
let (count, mut cq, left) = self.operator.select(left_time, 0)?;
299+
if count > 0 {
300+
for cqe in &mut cq {
301+
let token = cqe.token;
302+
// resolve completed read/write tasks
303+
// todo refactor IOCP impl
304+
let result = match cqe.syscall {
305+
Syscall::accept => {
306+
unsafe {
307+
_ = setsockopt(
308+
cqe.socket,
309+
SOL_SOCKET,
310+
SO_UPDATE_ACCEPT_CONTEXT,
311+
(&cqe.from_fd as *const SOCKET).cast(),
312+
size_of::<SOCKET>() as c_int,
313+
)
314+
};
315+
cqe.socket.try_into().expect("result overflow")
316+
}
317+
_ => panic!("unsupported"),
318+
};
319+
eprintln!("IOCP finish {token} {result}");
320+
if let Some((_, pair)) = self.syscall_wait_table.remove(&token) {
321+
let (lock, cvar) = &*pair;
322+
let mut pending = lock.lock().expect("lock failed");
323+
*pending = Some(result);
324+
cvar.notify_one();
325+
}
260326
unsafe { self.resume(token) };
261327
}
262328
}
263-
Ok(())
329+
if left != left_time {
330+
left_time = Some(left.unwrap_or(Duration::ZERO));
331+
}
332+
Ok(left_time)
264333
}
265334

266335
#[allow(clippy::unused_self)]
@@ -404,7 +473,7 @@ macro_rules! impl_io_uring {
404473
pub(super) fn $syscall(
405474
&self,
406475
$($arg: $arg_type),*
407-
) -> std::io::Result<Arc<(Mutex<Option<ssize_t>>, Condvar)>> {
476+
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
408477
let token = EventLoop::token(Syscall::$syscall);
409478
self.operator.$syscall(token, $($arg, )*)?;
410479
let arc = Arc::new((Mutex::new(None), Condvar::new()));
@@ -439,6 +508,29 @@ impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
439508
impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
440509
impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t);
441510

511+
macro_rules! impl_iocp {
512+
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
513+
#[cfg(all(windows, feature = "iocp"))]
514+
impl EventLoop<'_> {
515+
pub(super) fn $syscall(
516+
&self,
517+
$($arg: $arg_type),*
518+
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
519+
let token = EventLoop::token(Syscall::$syscall);
520+
self.operator.$syscall(token, $($arg, )*)?;
521+
let arc = Arc::new((Mutex::new(None), Condvar::new()));
522+
assert!(
523+
self.syscall_wait_table.insert(token, arc.clone()).is_none(),
524+
"The previous token was not retrieved in a timely manner"
525+
);
526+
Ok(arc)
527+
}
528+
}
529+
}
530+
}
531+
532+
impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
533+
442534
#[cfg(all(test, not(all(unix, feature = "preemptive"))))]
443535
mod tests {
444536
use crate::net::event_loop::EventLoop;

0 commit comments

Comments
 (0)