diff --git a/.github/workflows/ci-preemptive.sh b/.github/workflows/ci-preemptive.sh index 1a244eaf..d8172791 100644 --- a/.github/workflows/ci-preemptive.sh +++ b/.github/workflows/ci-preemptive.sh @@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive --release fi + +# test IOCP +if [ "${OS}" = "windows-latest" ]; then + cd "${PROJECT_DIR}"/core + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive --release + cd "${PROJECT_DIR}"/open-coroutine + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive --release +fi diff --git a/.github/workflows/ci.sh b/.github/workflows/ci.sh index 47b5a23c..e97641eb 100644 --- a/.github/workflows/ci.sh +++ b/.github/workflows/ci.sh @@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring --release fi + +# test IOCP +if [ "${OS}" = "windows-latest" ]; then + cd "${PROJECT_DIR}"/core + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp --release + cd "${PROJECT_DIR}"/open-coroutine + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp + "${CARGO}" test --target "${TARGET}" --no-default-features --features iocp --release +fi diff --git a/Cargo.toml b/Cargo.toml index d4efe6ff..3825573e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ mio = { version = "1.0", default-features = false } cfg-if = "1.0.0" polling = "2.8.0" +educe = "0.6.0" libc = "0.2" rand = "0.8" @@ -50,7 +51,6 @@ once_cell = "1" dashmap = "6" num_cpus = "1" uuid = "1" -derivative = "2" tempfile = "3" cc = "1" syn = "2" diff --git a/core/Cargo.toml b/core/Cargo.toml index 6f8b255b..ff6c27a0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -29,7 +29,7 @@ uuid = { workspace = true, features = [ "v4", "fast-rng", ], optional = true } -derivative = { workspace = true, optional = true } +educe = { workspace = true, optional = true } core_affinity = { workspace = true, optional = true } crossbeam-utils = { workspace = true, optional = true } psm.workspace = true @@ -77,7 +77,7 @@ backtrace.workspace = true log = ["tracing", "tracing-subscriber", "time"] # low-level raw coroutine -korosensei = ["corosensei", "uuid", "nix/pthread", "derivative"] +korosensei = ["corosensei", "uuid", "nix/pthread", "educe"] # Provide preemptive scheduling implementation. # Enable for default. @@ -89,6 +89,12 @@ net = ["korosensei", "polling", "mio", "crossbeam-utils", "core_affinity"] # Provide io_uring adaptation, this feature only works in linux. io_uring = ["net", "io-uring"] +# Provide IOCP adaptation, this feature only works in windows. +iocp = ["net"] + +# Provide completion IOCP adaptation +completion_io = ["io_uring", "iocp"] + # Provide syscall implementation. syscall = ["net"] diff --git a/core/src/co_pool/task.rs b/core/src/co_pool/task.rs index 0553c749..ef374143 100644 --- a/core/src/co_pool/task.rs +++ b/core/src/co_pool/task.rs @@ -1,16 +1,15 @@ use crate::catch; -use derivative::Derivative; /// 做C兼容时会用到 pub type UserTaskFunc = extern "C" fn(usize) -> usize; /// The task impls. #[repr(C)] -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(educe::Educe)] +#[educe(Debug)] pub struct Task<'t> { name: String, - #[derivative(Debug = "ignore")] + #[educe(Debug(ignore))] func: Box) -> Option + 't>, param: Option, } diff --git a/core/src/common/timer.rs b/core/src/common/timer.rs index e912b830..d3cbec88 100644 --- a/core/src/common/timer.rs +++ b/core/src/common/timer.rs @@ -1,5 +1,4 @@ use crate::impl_display_by_debug; -use derivative::Derivative; use std::collections::{BTreeMap, VecDeque}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -62,11 +61,11 @@ impl_display_by_debug!(TimerEntry); /// A queue for managing multiple `TimerEntry`. #[repr(C)] -#[derive(Derivative)] -#[derivative(Debug, Eq, PartialEq)] +#[derive(educe::Educe)] +#[educe(Debug, Eq, PartialEq)] pub struct TimerList { inner: BTreeMap>, - #[derivative(PartialEq = "ignore")] + #[educe(PartialEq(ignore))] total: AtomicUsize, } diff --git a/core/src/coroutine/suspender.rs b/core/src/coroutine/suspender.rs index 252ba982..4698db31 100644 --- a/core/src/coroutine/suspender.rs +++ b/core/src/coroutine/suspender.rs @@ -5,6 +5,7 @@ use std::collections::VecDeque; use std::time::Duration; thread_local! { + #[allow(clippy::missing_const_for_thread_local)] static TIMESTAMP: RefCell> = const { RefCell::new(VecDeque::new()) }; } @@ -52,14 +53,13 @@ pub use korosensei::Suspender; #[cfg(feature = "korosensei")] mod korosensei { use corosensei::Yielder; - use derivative::Derivative; /// Ths suspender implemented for coroutine. #[repr(C)] - #[derive(Derivative)] - #[derivative(Debug)] + #[derive(educe::Educe)] + #[educe(Debug)] pub struct Suspender<'s, Param, Yield> { - #[derivative(Debug = "ignore")] + #[educe(Debug(ignore))] inner: &'s Yielder, } diff --git a/core/src/net/event_loop.rs b/core/src/net/event_loop.rs index 98db60e7..3638862a 100644 --- a/core/src/net/event_loop.rs +++ b/core/src/net/event_loop.rs @@ -6,6 +6,8 @@ use crate::scheduler::SchedulableCoroutine; use crate::{error, impl_current_for, impl_display_by_debug, info}; use crossbeam_utils::atomic::AtomicCell; use dashmap::DashSet; +#[cfg(all(target_os = "linux", feature = "io_uring"))] +use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; use once_cell::sync::Lazy; use rand::Rng; use std::ffi::{c_char, c_int, c_void, CStr, CString}; @@ -16,11 +18,15 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::thread::JoinHandle; use std::time::Duration; +#[cfg(all(windows, feature = "iocp"))] +use windows_sys::Win32::Networking::WinSock::{ + setsockopt, SOCKADDR, SOCKET, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, +}; cfg_if::cfg_if! { - if #[cfg(all(target_os = "linux", feature = "io_uring"))] { - use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; + if #[cfg(any(all(target_os = "linux", feature = "io_uring"), all(windows, feature = "iocp")))] { use dashmap::DashMap; + use std::ffi::c_longlong; } } @@ -32,11 +38,17 @@ pub(crate) struct EventLoop<'e> { stop: Arc<(Mutex, Condvar)>, shared_stop: Arc<(Mutex, Condvar)>, cpu: usize, - #[cfg(all(target_os = "linux", feature = "io_uring"))] + #[cfg(any( + all(target_os = "linux", feature = "io_uring"), + all(windows, feature = "iocp") + ))] operator: crate::net::operator::Operator<'e>, #[allow(clippy::type_complexity)] - #[cfg(all(target_os = "linux", feature = "io_uring"))] - syscall_wait_table: DashMap>, Condvar)>>, + #[cfg(any( + all(target_os = "linux", feature = "io_uring"), + all(windows, feature = "iocp") + ))] + syscall_wait_table: DashMap>, Condvar)>>, selector: Poller, pool: CoroutinePool<'e>, phantom_data: PhantomData<&'e EventLoop<'e>>, @@ -90,9 +102,15 @@ impl<'e> EventLoop<'e> { stop: Arc::new((Mutex::new(false), Condvar::new())), shared_stop, cpu, - #[cfg(all(target_os = "linux", feature = "io_uring"))] + #[cfg(any( + all(target_os = "linux", feature = "io_uring"), + all(windows, feature = "iocp") + ))] operator: crate::net::operator::Operator::new(cpu)?, - #[cfg(all(target_os = "linux", feature = "io_uring"))] + #[cfg(any( + all(target_os = "linux", feature = "io_uring"), + all(windows, feature = "iocp") + ))] syscall_wait_table: DashMap::new(), selector: Poller::new()?, pool: CoroutinePool::new(name, stack_size, min_size, max_size, keep_alive_time), @@ -223,7 +241,29 @@ impl<'e> EventLoop<'e> { } } - #[cfg(all(target_os = "linux", feature = "io_uring"))] + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + left_time = self.adapt_io_uring(left_time)?; + } else if #[cfg(all(windows, feature = "iocp"))] { + left_time = self.adapt_iocp(left_time)?; + } + } + + // use epoll/kevent/iocp + let mut events = Events::with_capacity(1024); + self.selector.select(&mut events, left_time)?; + #[allow(clippy::explicit_iter_loop)] + for event in events.iter() { + let token = event.get_token(); + if event.readable() || event.writable() { + unsafe { self.resume(token) }; + } + } + Ok(()) + } + + #[cfg(all(target_os = "linux", feature = "io_uring"))] + fn adapt_io_uring(&self, mut left_time: Option) -> std::io::Result> { if crate::net::operator::support_io_uring() { // use io_uring let (count, mut cq, left) = self.operator.select(left_time, 0)?; @@ -239,7 +279,7 @@ impl<'e> EventLoop<'e> { if let Some((_, pair)) = self.syscall_wait_table.remove(&token) { let (lock, cvar) = &*pair; let mut pending = lock.lock().expect("lock failed"); - *pending = Some(result); + *pending = Some(result.try_into().expect("result overflow")); cvar.notify_one(); } unsafe { self.resume(token) }; @@ -249,18 +289,47 @@ impl<'e> EventLoop<'e> { left_time = Some(left.unwrap_or(Duration::ZERO)); } } + Ok(left_time) + } - // use epoll/kevent/iocp - let mut events = Events::with_capacity(1024); - self.selector.select(&mut events, left_time)?; - #[allow(clippy::explicit_iter_loop)] - for event in events.iter() { - let token = event.get_token(); - if event.readable() || event.writable() { + #[cfg(all(windows, feature = "iocp"))] + fn adapt_iocp(&self, mut left_time: Option) -> std::io::Result> { + // use IOCP + let (count, mut cq, left) = self.operator.select(left_time, 0)?; + if count > 0 { + for cqe in &mut cq { + let token = cqe.token; + // resolve completed read/write tasks + // todo refactor IOCP impl + let result = match cqe.syscall { + Syscall::accept => { + unsafe { + _ = setsockopt( + cqe.socket, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (&cqe.from_fd as *const SOCKET).cast(), + size_of::() as c_int, + ) + }; + cqe.socket.try_into().expect("result overflow") + } + _ => panic!("unsupported"), + }; + eprintln!("IOCP finish {token} {result}"); + if let Some((_, pair)) = self.syscall_wait_table.remove(&token) { + let (lock, cvar) = &*pair; + let mut pending = lock.lock().expect("lock failed"); + *pending = Some(result); + cvar.notify_one(); + } unsafe { self.resume(token) }; } } - Ok(()) + if left != left_time { + left_time = Some(left.unwrap_or(Duration::ZERO)); + } + Ok(left_time) } #[allow(clippy::unused_self)] @@ -404,7 +473,7 @@ macro_rules! impl_io_uring { pub(super) fn $syscall( &self, $($arg: $arg_type),* - ) -> std::io::Result>, Condvar)>> { + ) -> std::io::Result>, Condvar)>> { let token = EventLoop::token(Syscall::$syscall); self.operator.$syscall(token, $($arg, )*)?; let arc = Arc::new((Mutex::new(None), Condvar::new())); @@ -439,6 +508,29 @@ impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t); impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t); impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t); +macro_rules! impl_iocp { + ( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => { + #[cfg(all(windows, feature = "iocp"))] + impl EventLoop<'_> { + pub(super) fn $syscall( + &self, + $($arg: $arg_type),* + ) -> std::io::Result>, Condvar)>> { + let token = EventLoop::token(Syscall::$syscall); + self.operator.$syscall(token, $($arg, )*)?; + let arc = Arc::new((Mutex::new(None), Condvar::new())); + assert!( + self.syscall_wait_table.insert(token, arc.clone()).is_none(), + "The previous token was not retrieved in a timely manner" + ); + Ok(arc) + } + } + } +} + +impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int); + #[cfg(all(test, not(all(unix, feature = "preemptive"))))] mod tests { use crate::net::event_loop::EventLoop; diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs index c8b73e69..380ec64e 100644 --- a/core/src/net/mod.rs +++ b/core/src/net/mod.rs @@ -6,17 +6,24 @@ use crate::{error, info}; use once_cell::sync::OnceCell; use std::collections::VecDeque; use std::ffi::c_int; +#[cfg(any( + all(target_os = "linux", feature = "io_uring"), + all(windows, feature = "iocp") +))] +use std::ffi::c_longlong; use std::io::{Error, ErrorKind}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; +#[cfg(all(windows, feature = "iocp"))] +use windows_sys::Win32::Networking::WinSock::{SOCKADDR, SOCKET}; /// 做C兼容时会用到 pub type UserFunc = extern "C" fn(usize) -> usize; cfg_if::cfg_if! { if #[cfg(all(target_os = "linux", feature = "io_uring"))] { - use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; + use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t}; use std::ffi::c_void; } } @@ -24,8 +31,11 @@ cfg_if::cfg_if! { mod selector; #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] -#[cfg(all(target_os = "linux", feature = "io_uring"))] -mod operator; +#[cfg(any( + all(target_os = "linux", feature = "io_uring"), + all(windows, feature = "iocp") +))] +pub(crate) mod operator; #[allow(missing_docs)] pub mod event_loop; @@ -247,7 +257,7 @@ macro_rules! impl_io_uring { #[allow(missing_docs)] pub fn $syscall( $($arg: $arg_type),* - ) -> std::io::Result>, Condvar)>> { + ) -> std::io::Result>, Condvar)>> { Self::event_loop().$syscall($($arg, )*) } } @@ -274,3 +284,19 @@ impl_io_uring!(pwrite(fd: c_int, buf: *const c_void, count: size_t, offset: off_ impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t); impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t); impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t); + +macro_rules! impl_iocp { + ( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => { + #[cfg(all(windows, feature = "iocp"))] + impl EventLoops { + #[allow(missing_docs)] + pub fn $syscall( + $($arg: $arg_type),* + ) -> std::io::Result>, Condvar)>> { + Self::event_loop().$syscall($($arg, )*) + } + } + } +} + +impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int); diff --git a/core/src/net/operator/linux/mod.rs b/core/src/net/operator/linux/mod.rs new file mode 100644 index 00000000..94d2bfd3 --- /dev/null +++ b/core/src/net/operator/linux/mod.rs @@ -0,0 +1,690 @@ +use io_uring::opcode::{ + Accept, AsyncCancel, Close, Connect, EpollCtl, Fsync, MkDirAt, OpenAt, PollAdd, PollRemove, + Read, Readv, Recv, RecvMsg, RenameAt, Send, SendMsg, SendZc, Shutdown, Socket, Timeout, + TimeoutRemove, TimeoutUpdate, Write, Writev, +}; +use io_uring::squeue::Entry; +use io_uring::types::{epoll_event, Fd, Timespec}; +use io_uring::{CompletionQueue, IoUring, Probe}; +use libc::{ + c_char, c_int, c_uint, c_void, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t, EBUSY, +}; +use once_cell::sync::Lazy; +use std::collections::VecDeque; +use std::io::{Error, ErrorKind}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +#[cfg(test)] +mod tests; + +static SUPPORT: Lazy = + Lazy::new(|| crate::common::current_kernel_version() >= crate::common::kernel_version(5, 6, 0)); + +#[must_use] +pub(crate) fn support_io_uring() -> bool { + *SUPPORT +} + +static PROBE: Lazy = Lazy::new(|| { + let mut probe = Probe::new(); + if let Ok(io_uring) = IoUring::new(2) { + if let Ok(()) = io_uring.submitter().register_probe(&mut probe) { + return probe; + } + } + panic!("probe init failed !") +}); + +// check https://www.rustwiki.org.cn/en/reference/introduction.html for help information +macro_rules! support { + ( $self:ident, $struct_name:ident, $opcode:ident, $impls:expr ) => { + return { + static $struct_name: Lazy = once_cell::sync::Lazy::new(|| { + if $crate::net::operator::support_io_uring() { + return PROBE.is_supported($opcode::CODE); + } + false + }); + if *$struct_name { + return $self.push_sq($impls); + } + Err(Error::new(ErrorKind::Unsupported, "unsupported")) + } + }; +} + +#[repr(C)] +#[derive(educe::Educe)] +#[educe(Debug)] +pub(crate) struct Operator<'o> { + #[educe(Debug(ignore))] + inner: IoUring, + entering: AtomicBool, + backlog: Mutex>, +} + +impl Operator<'_> { + pub(crate) fn new(cpu: usize) -> std::io::Result { + IoUring::builder() + .setup_sqpoll(1000) + .setup_sqpoll_cpu(u32::try_from(cpu).unwrap_or(u32::MAX)) + .build(1024) + .map(|inner| Self { + inner, + entering: AtomicBool::new(false), + backlog: Mutex::new(VecDeque::new()), + }) + } + + fn push_sq(&self, entry: Entry) -> std::io::Result<()> { + let entry = Box::leak(Box::new(entry)); + if unsafe { self.inner.submission_shared().push(entry).is_err() } { + self.backlog + .lock() + .expect("backlog lock failed") + .push_back(entry); + } + self.inner.submit().map(|_| ()) + } + + pub(crate) fn select( + &self, + timeout: Option, + want: usize, + ) -> std::io::Result<(usize, CompletionQueue, Option)> { + if support_io_uring() { + if self + .entering + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + return Ok((0, unsafe { self.inner.completion_shared() }, timeout)); + } + let result = self.do_select(timeout, want); + self.entering.store(false, Ordering::Release); + return result; + } + Err(Error::new(ErrorKind::Unsupported, "unsupported")) + } + + fn do_select( + &self, + timeout: Option, + want: usize, + ) -> std::io::Result<(usize, CompletionQueue, Option)> { + let start_time = Instant::now(); + self.timeout_add(crate::common::constants::IO_URING_TIMEOUT_USERDATA, timeout)?; + let mut cq = unsafe { self.inner.completion_shared() }; + // when submit queue is empty, submit_and_wait will block + let count = match self.inner.submit_and_wait(want) { + Ok(count) => count, + Err(err) => { + if err.raw_os_error() == Some(EBUSY) { + 0 + } else { + return Err(err); + } + } + }; + cq.sync(); + + // clean backlog + let mut sq = unsafe { self.inner.submission_shared() }; + loop { + if sq.is_full() { + match self.inner.submit() { + Ok(_) => (), + Err(err) => { + if err.raw_os_error() == Some(EBUSY) { + break; + } + return Err(err); + } + } + } + sq.sync(); + + let mut backlog = self.backlog.lock().expect("backlog lock failed"); + match backlog.pop_front() { + Some(sqe) => { + if unsafe { sq.push(sqe).is_err() } { + backlog.push_front(sqe); + break; + } + } + None => break, + } + } + let cost = Instant::now().saturating_duration_since(start_time); + Ok((count, cq, timeout.map(|t| t.saturating_sub(cost)))) + } + + pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> { + support!( + self, + SUPPORT_ASYNC_CANCEL, + AsyncCancel, + AsyncCancel::new(user_data as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn epoll_ctl( + &self, + user_data: usize, + epfd: c_int, + op: c_int, + fd: c_int, + event: *mut libc::epoll_event, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_EPOLL_CTL, + EpollCtl, + EpollCtl::new( + Fd(epfd), + Fd(fd), + op, + event.cast_const().cast::(), + ) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn poll_add( + &self, + user_data: usize, + fd: c_int, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_POLL_ADD, + PollAdd, + PollAdd::new(Fd(fd), flags as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn poll_remove(&self, user_data: usize) -> std::io::Result<()> { + support!( + self, + SUPPORT_POLL_REMOVE, + PollRemove, + PollRemove::new(user_data as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn timeout_add( + &self, + user_data: usize, + timeout: Option, + ) -> std::io::Result<()> { + if let Some(duration) = timeout { + let timeout = Timespec::new() + .sec(duration.as_secs()) + .nsec(duration.subsec_nanos()); + support!( + self, + SUPPORT_TIMEOUT_ADD, + Timeout, + Timeout::new(&timeout).build().user_data(user_data as u64) + ) + } + Ok(()) + } + + pub(crate) fn timeout_update( + &self, + user_data: usize, + timeout: Option, + ) -> std::io::Result<()> { + if let Some(duration) = timeout { + let timeout = Timespec::new() + .sec(duration.as_secs()) + .nsec(duration.subsec_nanos()); + support!( + self, + SUPPORT_TIMEOUT_UPDATE, + TimeoutUpdate, + TimeoutUpdate::new(user_data as u64, &timeout) + .build() + .user_data(user_data as u64) + ) + } + self.timeout_remove(user_data) + } + + pub(crate) fn timeout_remove(&self, user_data: usize) -> std::io::Result<()> { + support!( + self, + SUPPORT_TIMEOUT_REMOVE, + TimeoutRemove, + TimeoutRemove::new(user_data as u64).build() + ) + } + + pub(crate) fn openat( + &self, + user_data: usize, + dir_fd: c_int, + pathname: *const c_char, + flags: c_int, + mode: mode_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_OPENAT, + OpenAt, + OpenAt::new(Fd(dir_fd), pathname) + .flags(flags) + .mode(mode) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn mkdirat( + &self, + user_data: usize, + dir_fd: c_int, + pathname: *const c_char, + mode: mode_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_MK_DIR_AT, + MkDirAt, + MkDirAt::new(Fd(dir_fd), pathname) + .mode(mode) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn renameat( + &self, + user_data: usize, + old_dir_fd: c_int, + old_path: *const c_char, + new_dir_fd: c_int, + new_path: *const c_char, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RENAME_AT, + RenameAt, + RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn renameat2( + &self, + user_data: usize, + old_dir_fd: c_int, + old_path: *const c_char, + new_dir_fd: c_int, + new_path: *const c_char, + flags: c_uint, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RENAME_AT, + RenameAt, + RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) + .flags(flags) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn fsync(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { + support!( + self, + SUPPORT_FSYNC, + Fsync, + Fsync::new(Fd(fd)).build().user_data(user_data as u64) + ) + } + + pub(crate) fn socket( + &self, + user_data: usize, + domain: c_int, + ty: c_int, + protocol: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SOCKET, + Socket, + Socket::new(domain, ty, protocol) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn accept( + &self, + user_data: usize, + fd: c_int, + address: *mut sockaddr, + address_len: *mut socklen_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_ACCEPT, + Accept, + Accept::new(Fd(fd), address, address_len) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn accept4( + &self, + user_data: usize, + fd: c_int, + addr: *mut sockaddr, + len: *mut socklen_t, + flg: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_ACCEPT, + Accept, + Accept::new(Fd(fd), addr, len) + .flags(flg) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn connect( + &self, + user_data: usize, + fd: c_int, + address: *const sockaddr, + len: socklen_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_CONNECT, + Connect, + Connect::new(Fd(fd), address, len) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn shutdown(&self, user_data: usize, fd: c_int, how: c_int) -> std::io::Result<()> { + support!( + self, + SUPPORT_SHUTDOWN, + Shutdown, + Shutdown::new(Fd(fd), how) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn close(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { + support!( + self, + SUPPORT_CLOSE, + Close, + Close::new(Fd(fd)).build().user_data(user_data as u64) + ) + } + + pub(crate) fn recv( + &self, + user_data: usize, + fd: c_int, + buf: *mut c_void, + len: size_t, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RECV, + Recv, + Recv::new(Fd(fd), buf.cast::(), len as u32) + .flags(flags) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn read( + &self, + user_data: usize, + fd: c_int, + buf: *mut c_void, + count: size_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READ, + Read, + Read::new(Fd(fd), buf.cast::(), count as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn pread( + &self, + user_data: usize, + fd: c_int, + buf: *mut c_void, + count: size_t, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READ, + Read, + Read::new(Fd(fd), buf.cast::(), count as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn readv( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READV, + Readv, + Readv::new(Fd(fd), iov, iovcnt as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn preadv( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READV, + Readv, + Readv::new(Fd(fd), iov, iovcnt as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn recvmsg( + &self, + user_data: usize, + fd: c_int, + msg: *mut msghdr, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RECVMSG, + RecvMsg, + RecvMsg::new(Fd(fd), msg) + .flags(flags as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn send( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + len: size_t, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SEND, + Send, + Send::new(Fd(fd), buf.cast::(), len as u32) + .flags(flags) + .build() + .user_data(user_data as u64) + ) + } + + #[allow(clippy::too_many_arguments)] + pub(crate) fn sendto( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + len: size_t, + flags: c_int, + addr: *const sockaddr, + addrlen: socklen_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SEND_ZC, + SendZc, + SendZc::new(Fd(fd), buf.cast::(), len as u32) + .flags(flags) + .dest_addr(addr) + .dest_addr_len(addrlen) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn write( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + count: size_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITE, + Write, + Write::new(Fd(fd), buf.cast::(), count as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn pwrite( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + count: size_t, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITE, + Write, + Write::new(Fd(fd), buf.cast::(), count as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn writev( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITEV, + Writev, + Writev::new(Fd(fd), iov, iovcnt as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn pwritev( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITEV, + Writev, + Writev::new(Fd(fd), iov, iovcnt as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn sendmsg( + &self, + user_data: usize, + fd: c_int, + msg: *const msghdr, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SENDMSG, + SendMsg, + SendMsg::new(Fd(fd), msg) + .flags(flags as u32) + .build() + .user_data(user_data as u64) + ) + } +} diff --git a/core/src/net/operator/tests.rs b/core/src/net/operator/linux/tests.rs similarity index 100% rename from core/src/net/operator/tests.rs rename to core/src/net/operator/linux/tests.rs diff --git a/core/src/net/operator/mod.rs b/core/src/net/operator/mod.rs index b6dc89e4..dc2cb818 100644 --- a/core/src/net/operator/mod.rs +++ b/core/src/net/operator/mod.rs @@ -1,691 +1,9 @@ -use derivative::Derivative; -use io_uring::opcode::{ - Accept, AsyncCancel, Close, Connect, EpollCtl, Fsync, MkDirAt, OpenAt, PollAdd, PollRemove, - Read, Readv, Recv, RecvMsg, RenameAt, Send, SendMsg, SendZc, Shutdown, Socket, Timeout, - TimeoutRemove, TimeoutUpdate, Write, Writev, -}; -use io_uring::squeue::Entry; -use io_uring::types::{epoll_event, Fd, Timespec}; -use io_uring::{CompletionQueue, IoUring, Probe}; -use libc::{ - c_char, c_int, c_uint, c_void, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t, EBUSY, -}; -use once_cell::sync::Lazy; -use std::collections::VecDeque; -use std::io::{Error, ErrorKind}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Mutex; -use std::time::{Duration, Instant}; - -#[cfg(test)] -mod tests; - -static SUPPORT: Lazy = - Lazy::new(|| crate::common::current_kernel_version() >= crate::common::kernel_version(5, 6, 0)); - -#[must_use] -pub(crate) fn support_io_uring() -> bool { - *SUPPORT -} - -static PROBE: Lazy = Lazy::new(|| { - let mut probe = Probe::new(); - if let Ok(io_uring) = IoUring::new(2) { - if let Ok(()) = io_uring.submitter().register_probe(&mut probe) { - return probe; - } - } - panic!("probe init failed !") -}); - -// check https://www.rustwiki.org.cn/en/reference/introduction.html for help information -macro_rules! support { - ( $self:ident, $struct_name:ident, $opcode:ident, $impls:expr ) => { - return { - static $struct_name: Lazy = once_cell::sync::Lazy::new(|| { - if $crate::net::operator::support_io_uring() { - return PROBE.is_supported($opcode::CODE); - } - false - }); - if *$struct_name { - return $self.push_sq($impls); - } - Err(Error::new(ErrorKind::Unsupported, "unsupported")) - } - }; -} - -#[repr(C)] -#[derive(Derivative)] -#[derivative(Debug)] -pub(crate) struct Operator<'o> { - #[derivative(Debug = "ignore")] - inner: IoUring, - entering: AtomicBool, - backlog: Mutex>, -} - -impl Operator<'_> { - pub(crate) fn new(cpu: usize) -> std::io::Result { - IoUring::builder() - .setup_sqpoll(1000) - .setup_sqpoll_cpu(u32::try_from(cpu).unwrap_or(u32::MAX)) - .build(1024) - .map(|inner| Operator { - inner, - entering: AtomicBool::new(false), - backlog: Mutex::new(VecDeque::new()), - }) - } - - fn push_sq(&self, entry: Entry) -> std::io::Result<()> { - let entry = Box::leak(Box::new(entry)); - if unsafe { self.inner.submission_shared().push(entry).is_err() } { - self.backlog - .lock() - .expect("backlog lock failed") - .push_back(entry); - } - self.inner.submit().map(|_| ()) - } - - pub(crate) fn select( - &self, - timeout: Option, - want: usize, - ) -> std::io::Result<(usize, CompletionQueue, Option)> { - if support_io_uring() { - if self - .entering - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_err() - { - return Ok((0, unsafe { self.inner.completion_shared() }, timeout)); - } - let result = self.do_select(timeout, want); - self.entering.store(false, Ordering::Release); - return result; - } - Err(Error::new(ErrorKind::Unsupported, "unsupported")) - } - - fn do_select( - &self, - timeout: Option, - want: usize, - ) -> std::io::Result<(usize, CompletionQueue, Option)> { - let start_time = Instant::now(); - self.timeout_add(crate::common::constants::IO_URING_TIMEOUT_USERDATA, timeout)?; - let mut cq = unsafe { self.inner.completion_shared() }; - // when submit queue is empty, submit_and_wait will block - let count = match self.inner.submit_and_wait(want) { - Ok(count) => count, - Err(err) => { - if err.raw_os_error() == Some(EBUSY) { - 0 - } else { - return Err(err); - } - } - }; - cq.sync(); - - // clean backlog - let mut sq = unsafe { self.inner.submission_shared() }; - loop { - if sq.is_full() { - match self.inner.submit() { - Ok(_) => (), - Err(err) => { - if err.raw_os_error() == Some(EBUSY) { - break; - } - return Err(err); - } - } - } - sq.sync(); - - let mut backlog = self.backlog.lock().expect("backlog lock failed"); - match backlog.pop_front() { - Some(sqe) => { - if unsafe { sq.push(sqe).is_err() } { - backlog.push_front(sqe); - break; - } - } - None => break, - } - } - let cost = Instant::now().saturating_duration_since(start_time); - Ok((count, cq, timeout.map(|t| t.saturating_sub(cost)))) - } - - pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> { - support!( - self, - SUPPORT_ASYNC_CANCEL, - AsyncCancel, - AsyncCancel::new(user_data as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn epoll_ctl( - &self, - user_data: usize, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut libc::epoll_event, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_EPOLL_CTL, - EpollCtl, - EpollCtl::new( - Fd(epfd), - Fd(fd), - op, - event.cast_const().cast::(), - ) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn poll_add( - &self, - user_data: usize, - fd: c_int, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_POLL_ADD, - PollAdd, - PollAdd::new(Fd(fd), flags as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn poll_remove(&self, user_data: usize) -> std::io::Result<()> { - support!( - self, - SUPPORT_POLL_REMOVE, - PollRemove, - PollRemove::new(user_data as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn timeout_add( - &self, - user_data: usize, - timeout: Option, - ) -> std::io::Result<()> { - if let Some(duration) = timeout { - let timeout = Timespec::new() - .sec(duration.as_secs()) - .nsec(duration.subsec_nanos()); - support!( - self, - SUPPORT_TIMEOUT_ADD, - Timeout, - Timeout::new(&timeout).build().user_data(user_data as u64) - ) - } - Ok(()) - } - - pub(crate) fn timeout_update( - &self, - user_data: usize, - timeout: Option, - ) -> std::io::Result<()> { - if let Some(duration) = timeout { - let timeout = Timespec::new() - .sec(duration.as_secs()) - .nsec(duration.subsec_nanos()); - support!( - self, - SUPPORT_TIMEOUT_UPDATE, - TimeoutUpdate, - TimeoutUpdate::new(user_data as u64, &timeout) - .build() - .user_data(user_data as u64) - ) - } - self.timeout_remove(user_data) - } - - pub(crate) fn timeout_remove(&self, user_data: usize) -> std::io::Result<()> { - support!( - self, - SUPPORT_TIMEOUT_REMOVE, - TimeoutRemove, - TimeoutRemove::new(user_data as u64).build() - ) - } - - pub(crate) fn openat( - &self, - user_data: usize, - dir_fd: c_int, - pathname: *const c_char, - flags: c_int, - mode: mode_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_OPENAT, - OpenAt, - OpenAt::new(Fd(dir_fd), pathname) - .flags(flags) - .mode(mode) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn mkdirat( - &self, - user_data: usize, - dir_fd: c_int, - pathname: *const c_char, - mode: mode_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_MK_DIR_AT, - MkDirAt, - MkDirAt::new(Fd(dir_fd), pathname) - .mode(mode) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn renameat( - &self, - user_data: usize, - old_dir_fd: c_int, - old_path: *const c_char, - new_dir_fd: c_int, - new_path: *const c_char, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RENAME_AT, - RenameAt, - RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn renameat2( - &self, - user_data: usize, - old_dir_fd: c_int, - old_path: *const c_char, - new_dir_fd: c_int, - new_path: *const c_char, - flags: c_uint, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RENAME_AT, - RenameAt, - RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) - .flags(flags) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn fsync(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { - support!( - self, - SUPPORT_FSYNC, - Fsync, - Fsync::new(Fd(fd)).build().user_data(user_data as u64) - ) - } - - pub(crate) fn socket( - &self, - user_data: usize, - domain: c_int, - ty: c_int, - protocol: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SOCKET, - Socket, - Socket::new(domain, ty, protocol) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn accept( - &self, - user_data: usize, - fd: c_int, - address: *mut sockaddr, - address_len: *mut socklen_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_ACCEPT, - Accept, - Accept::new(Fd(fd), address, address_len) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn accept4( - &self, - user_data: usize, - fd: c_int, - addr: *mut sockaddr, - len: *mut socklen_t, - flg: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_ACCEPT, - Accept, - Accept::new(Fd(fd), addr, len) - .flags(flg) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn connect( - &self, - user_data: usize, - fd: c_int, - address: *const sockaddr, - len: socklen_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_CONNECT, - Connect, - Connect::new(Fd(fd), address, len) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn shutdown(&self, user_data: usize, fd: c_int, how: c_int) -> std::io::Result<()> { - support!( - self, - SUPPORT_SHUTDOWN, - Shutdown, - Shutdown::new(Fd(fd), how) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn close(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { - support!( - self, - SUPPORT_CLOSE, - Close, - Close::new(Fd(fd)).build().user_data(user_data as u64) - ) - } - - pub(crate) fn recv( - &self, - user_data: usize, - fd: c_int, - buf: *mut c_void, - len: size_t, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RECV, - Recv, - Recv::new(Fd(fd), buf.cast::(), len as u32) - .flags(flags) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn read( - &self, - user_data: usize, - fd: c_int, - buf: *mut c_void, - count: size_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READ, - Read, - Read::new(Fd(fd), buf.cast::(), count as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn pread( - &self, - user_data: usize, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READ, - Read, - Read::new(Fd(fd), buf.cast::(), count as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn readv( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READV, - Readv, - Readv::new(Fd(fd), iov, iovcnt as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn preadv( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READV, - Readv, - Readv::new(Fd(fd), iov, iovcnt as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn recvmsg( - &self, - user_data: usize, - fd: c_int, - msg: *mut msghdr, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RECVMSG, - RecvMsg, - RecvMsg::new(Fd(fd), msg) - .flags(flags as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn send( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SEND, - Send, - Send::new(Fd(fd), buf.cast::(), len as u32) - .flags(flags) - .build() - .user_data(user_data as u64) - ) - } - - #[allow(clippy::too_many_arguments)] - pub(crate) fn sendto( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SEND_ZC, - SendZc, - SendZc::new(Fd(fd), buf.cast::(), len as u32) - .flags(flags) - .dest_addr(addr) - .dest_addr_len(addrlen) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn write( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - count: size_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITE, - Write, - Write::new(Fd(fd), buf.cast::(), count as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn pwrite( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITE, - Write, - Write::new(Fd(fd), buf.cast::(), count as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn writev( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITEV, - Writev, - Writev::new(Fd(fd), iov, iovcnt as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn pwritev( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITEV, - Writev, - Writev::new(Fd(fd), iov, iovcnt as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn sendmsg( - &self, - user_data: usize, - fd: c_int, - msg: *const msghdr, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SENDMSG, - SendMsg, - SendMsg::new(Fd(fd), msg) - .flags(flags as u32) - .build() - .user_data(user_data as u64) - ) - } -} +#[cfg(all(target_os = "linux", feature = "io_uring"))] +mod linux; +#[cfg(all(target_os = "linux", feature = "io_uring"))] +pub(crate) use linux::*; + +#[cfg(all(windows, feature = "iocp"))] +mod windows; +#[cfg(all(windows, feature = "iocp"))] +pub(crate) use windows::*; diff --git a/core/src/net/operator/windows.rs b/core/src/net/operator/windows.rs new file mode 100644 index 00000000..f12b00ab --- /dev/null +++ b/core/src/net/operator/windows.rs @@ -0,0 +1,198 @@ +use crate::common::constants::Syscall; +use crate::common::{get_timeout_time, now}; +use dashmap::{DashMap, DashSet}; +use once_cell::sync::Lazy; +use std::ffi::c_int; +use std::io::{Error, ErrorKind}; +use std::marker::PhantomData; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::{Duration, Instant}; +use windows_sys::Win32::Foundation::{ + ERROR_NETNAME_DELETED, FALSE, HANDLE, INVALID_HANDLE_VALUE, WAIT_TIMEOUT, +}; +use windows_sys::Win32::Networking::WinSock::{ + closesocket, WSAGetLastError, INVALID_SOCKET, IPPROTO, SOCKADDR, SOCKADDR_IN, SOCKET, + WINSOCK_SOCKET_TYPE, WSA_FLAG_OVERLAPPED, WSA_IO_PENDING, +}; +use windows_sys::Win32::System::IO::{ + CreateIoCompletionPort, GetQueuedCompletionStatus, OVERLAPPED, +}; + +#[repr(C)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub(crate) struct SocketContext { + pub(crate) domain: c_int, + pub(crate) ty: WINSOCK_SOCKET_TYPE, + pub(crate) protocol: IPPROTO, +} + +pub(crate) static SOCKET_CONTEXT: Lazy> = + Lazy::new(Default::default); + +/// The overlapped struct we actually used for IOCP. +#[repr(C)] +pub(crate) struct Overlapped { + /// The base [`OVERLAPPED`]. + pub base: OVERLAPPED, + pub from_fd: SOCKET, + pub socket: SOCKET, + pub token: usize, + pub syscall: Syscall, + pub dw_number_of_bytes_transferred: u32, +} + +#[repr(C)] +#[derive(Debug)] +pub(crate) struct Operator<'o> { + iocp: HANDLE, + entering: AtomicBool, + handles: DashSet, + phantom_data: PhantomData<&'o HANDLE>, +} + +impl Operator<'_> { + pub(crate) fn new(_cpu: usize) -> std::io::Result { + let iocp = + unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, std::ptr::null_mut(), 0, 0) }; + if iocp == std::ptr::null_mut() { + return Err(Error::last_os_error()); + } + Ok(Self { + iocp, + entering: AtomicBool::new(false), + handles: Default::default(), + phantom_data: Default::default(), + }) + } + + /// Associates a new `HANDLE` to this I/O completion port. + /// + /// This function will associate the given handle to this port with the + /// given `token` to be returned in status messages whenever it receives a + /// notification. + /// + /// Any object which is convertible to a `HANDLE` via the `AsRawHandle` + /// trait can be provided to this function, such as `std::fs::File` and + /// friends. + fn add_handle(&self, token: usize, handle: HANDLE) -> std::io::Result<()> { + assert_eq!(size_of_val(&token), size_of::()); + let ret = unsafe { CreateIoCompletionPort(handle, self.iocp, token, 0) }; + if ret == std::ptr::null_mut() { + return Err(Error::last_os_error()); + } + debug_assert_eq!(ret, self.iocp); + Ok(()) + } + + pub(crate) fn select( + &self, + timeout: Option, + want: usize, + ) -> std::io::Result<(usize, Vec, Option)> { + if self + .entering + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + return Ok((0, Vec::new(), timeout)); + } + let result = self.do_select(timeout, want); + self.entering.store(false, Ordering::Release); + return result; + } + + fn do_select( + &self, + timeout: Option, + want: usize, + ) -> std::io::Result<(usize, Vec, Option)> { + let start_time = Instant::now(); + let timeout_time = timeout.map(|t| get_timeout_time(t)).unwrap_or(u64::MAX); + let mut cq = Vec::new(); + loop { + let mut bytes = 0; + let mut token = 0; + let mut overlapped: Overlapped = unsafe { std::mem::zeroed() }; + let ret = unsafe { + GetQueuedCompletionStatus( + self.iocp, + &mut bytes, + &mut token, + (&mut overlapped as *mut Overlapped).cast(), + 1, + ) + }; + if ret == FALSE { + let err = Error::last_os_error().raw_os_error(); + if Some(ERROR_NETNAME_DELETED as i32) == err || Some(WAIT_TIMEOUT as i32) == err { + _ = unsafe { closesocket(overlapped.socket) }; + if cq.len() >= want || timeout_time.saturating_sub(now()) == 0 { + break; + } + continue; + } + } + overlapped.token = token; + overlapped.dw_number_of_bytes_transferred = bytes; + cq.push(overlapped); + if cq.len() >= want || timeout_time.saturating_sub(now()) == 0 { + break; + } + } + let cost = Instant::now().saturating_duration_since(start_time); + Ok((cq.len(), cq, timeout.map(|t| t.saturating_sub(cost)))) + } + + pub(crate) fn accept( + &self, + user_data: usize, + fd: SOCKET, + _address: *mut SOCKADDR, + _address_len: *mut c_int, + ) -> std::io::Result<()> { + if !self.handles.contains(&(fd as HANDLE)) { + self.add_handle(fd, fd as HANDLE)?; + } + let context = SOCKET_CONTEXT.get(&fd).expect("socket context not found"); + let ctx = context.value(); + unsafe { + let socket = windows_sys::Win32::Networking::WinSock::WSASocketW( + ctx.domain, + ctx.ty, + ctx.protocol, + std::ptr::null(), + 0, + WSA_FLAG_OVERLAPPED, + ); + if INVALID_SOCKET == socket { + return Err(Error::new(ErrorKind::Other, "add accept operation failed")); + } + let size = size_of::() + .saturating_add(16) + .try_into() + .expect("size overflow"); + let mut lpdwbytesreceived = 0; + let mut lpoverlapped: Overlapped = std::mem::zeroed(); + lpoverlapped.from_fd = fd; + lpoverlapped.socket = socket; + lpoverlapped.token = user_data; + lpoverlapped.syscall = Syscall::accept; + while windows_sys::Win32::Networking::WinSock::AcceptEx( + fd, + socket, + std::ptr::null_mut(), + 0, + size, + size, + &mut lpdwbytesreceived, + (&mut lpoverlapped as *mut Overlapped).cast(), + ) == FALSE + { + if WSA_IO_PENDING == WSAGetLastError() { + break; + } + } + } + Ok(()) + } +} diff --git a/core/src/net/selector/mio_adapter.rs b/core/src/net/selector/mio_adapter.rs index f8126cac..57a06525 100644 --- a/core/src/net/selector/mio_adapter.rs +++ b/core/src/net/selector/mio_adapter.rs @@ -1,6 +1,5 @@ use crate::common::CondvarBlocker; use crossbeam_utils::atomic::AtomicCell; -use derivative::Derivative; use mio::event::Event; use mio::unix::SourceFd; use mio::{Events, Interest, Poll, Token}; @@ -47,12 +46,12 @@ impl super::EventIterator for Events { } #[repr(C)] -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(educe::Educe)] +#[educe(Debug)] pub(crate) struct Poller { waiting: AtomicBool, blocker: CondvarBlocker, - #[derivative(Debug = "ignore")] + #[educe(Debug(ignore))] inner: AtomicCell, } diff --git a/core/src/syscall/windows/WSASocketW.rs b/core/src/syscall/windows/WSASocketW.rs index fe0022ef..f653a217 100644 --- a/core/src/syscall/windows/WSASocketW.rs +++ b/core/src/syscall/windows/WSASocketW.rs @@ -1,8 +1,6 @@ use once_cell::sync::Lazy; use std::ffi::{c_int, c_uint}; -use windows_sys::Win32::Networking::WinSock::{ - IPPROTO, SOCKET, WINSOCK_SOCKET_TYPE, WSAPROTOCOL_INFOW, -}; +use windows_sys::Win32::Networking::WinSock::{IPPROTO, SOCKET, WINSOCK_SOCKET_TYPE, WSAPROTOCOL_INFOW}; #[must_use] pub extern "system" fn WSASocketW( @@ -23,7 +21,7 @@ pub extern "system" fn WSASocketW( g: c_uint, dw_flags: c_uint, ) -> SOCKET { - static CHAIN: Lazy> = Lazy::new(Default::default); + static CHAIN: Lazy>> = Lazy::new(Default::default); CHAIN.WSASocketW(fn_ptr, domain, ty, protocol, lpprotocolinfo, g, dw_flags) } @@ -60,6 +58,45 @@ impl_facade!(WSASocketWSyscallFacade, WSASocketWSyscall, ) -> SOCKET ); +#[repr(C)] +#[derive(Debug, Default)] +struct NioWSASocketWSyscall { + inner: I, +} + +impl WSASocketWSyscall for NioWSASocketWSyscall { + extern "system" fn WSASocketW( + &self, + fn_ptr: Option< + &extern "system" fn( + c_int, + WINSOCK_SOCKET_TYPE, + IPPROTO, + *const WSAPROTOCOL_INFOW, + c_uint, + c_uint, + ) -> SOCKET, + >, + domain: c_int, + ty: WINSOCK_SOCKET_TYPE, + protocol: IPPROTO, + lpprotocolinfo: *const WSAPROTOCOL_INFOW, + g: c_uint, + dw_flags: c_uint + ) -> SOCKET { + let r = self.inner.WSASocketW(fn_ptr, domain, ty, protocol, lpprotocolinfo, g, dw_flags); + #[cfg(feature = "iocp")] + if windows_sys::Win32::Networking::WinSock::INVALID_SOCKET != r { + _ = crate::net::operator::SOCKET_CONTEXT.insert(r,crate::net::operator::SocketContext{ + domain, + ty, + protocol, + }); + } + r + } +} + impl_raw!(RawWSASocketWSyscall, WSASocketWSyscall, windows_sys::Win32::Networking::WinSock, WSASocketW( domain: c_int, diff --git a/core/src/syscall/windows/accept.rs b/core/src/syscall/windows/accept.rs index dbf1dd60..f70f13e2 100644 --- a/core/src/syscall/windows/accept.rs +++ b/core/src/syscall/windows/accept.rs @@ -9,8 +9,16 @@ pub extern "system" fn accept( address: *mut SOCKADDR, address_len: *mut c_int, ) -> SOCKET { - static CHAIN: Lazy>> = - Lazy::new(Default::default); + cfg_if::cfg_if! { + if #[cfg(feature = "iocp")] { + static CHAIN: Lazy< + AcceptSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } CHAIN.accept(fn_ptr, fd, address, address_len) } @@ -28,6 +36,10 @@ impl_facade!(AcceptSyscallFacade, AcceptSyscall, accept(fd: SOCKET, address: *mut SOCKADDR, address_len: *mut c_int) -> SOCKET ); +impl_iocp!(IocpAcceptSyscall, AcceptSyscall, + accept(fd: SOCKET, address: *mut SOCKADDR, address_len: *mut c_int) -> SOCKET +); + impl_nio_read!(NioAcceptSyscall, AcceptSyscall, accept(fd: SOCKET, address: *mut SOCKADDR, address_len: *mut c_int) -> SOCKET ); diff --git a/core/src/syscall/windows/mod.rs b/core/src/syscall/windows/mod.rs index 5e66ffa4..113434bf 100644 --- a/core/src/syscall/windows/mod.rs +++ b/core/src/syscall/windows/mod.rs @@ -43,6 +43,82 @@ macro_rules! impl_facade { } } +macro_rules! impl_iocp { + ( $struct_name:ident, $trait_name: ident, $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => { + #[repr(C)] + #[derive(Debug, Default)] + #[cfg(all(windows, feature = "iocp"))] + struct $struct_name { + inner: I, + } + + #[cfg(all(windows, feature = "iocp"))] + impl $trait_name for $struct_name { + extern "system" fn $syscall( + &self, + fn_ptr: Option<&extern "system" fn($($arg_type),*) -> $result>, + $($arg: $arg_type),* + ) -> $result { + if let Ok(arc) = $crate::net::EventLoops::$syscall($($arg, )*) { + use $crate::common::constants::{CoroutineState, SyscallState}; + use $crate::scheduler::{SchedulableCoroutine, SchedulableSuspender}; + + if let Some(co) = SchedulableCoroutine::current() { + if let CoroutineState::SystemCall((), syscall, SyscallState::Executing) = co.state() + { + let new_state = SyscallState::Suspend(u64::MAX); + if co.syscall((), syscall, new_state).is_err() { + $crate::error!( + "{} change to syscall {} {} failed !", + co.name(), + syscall, + new_state + ); + } + } + } + if let Some(suspender) = SchedulableSuspender::current() { + suspender.suspend(); + //回来的时候,系统调用已经执行完了 + } + if let Some(co) = SchedulableCoroutine::current() { + if let CoroutineState::SystemCall((), syscall, SyscallState::Callback) = co.state() + { + let new_state = SyscallState::Executing; + if co.syscall((), syscall, new_state).is_err() { + $crate::error!( + "{} change to syscall {} {} failed !", + co.name(), + syscall, + new_state + ); + } + } + } + let (lock, cvar) = &*arc; + let syscall_result: $result = cvar + .wait_while(lock.lock().expect("lock failed"), + |&mut result| result.is_none() + ) + .expect("lock failed") + .expect("no syscall result") + .try_into() + .expect("IOCP syscall result overflow"); + // fixme 错误处理 + // if syscall_result < 0 { + // let errno: std::ffi::c_int = (-syscall_result).try_into() + // .expect("IOCP errno overflow"); + // $crate::syscall::common::set_errno(errno); + // syscall_result = -1; + // } + return syscall_result; + } + self.inner.$syscall(fn_ptr, $($arg, )*) + } + } + } +} + macro_rules! impl_nio_read { ( $struct_name:ident, $trait_name: ident, $syscall: ident($fd: ident : $fd_type: ty, $($arg: ident : $arg_type: ty),*) -> $result: ty ) => { #[repr(C)] @@ -492,23 +568,23 @@ static SEND_TIME_LIMIT: Lazy> = Lazy::new(Default::default) static RECV_TIME_LIMIT: Lazy> = Lazy::new(Default::default); -pub extern "C" fn set_errno(errno: windows_sys::Win32::Foundation::WIN32_ERROR) { +pub extern "system" fn set_errno(errno: windows_sys::Win32::Foundation::WIN32_ERROR) { unsafe { windows_sys::Win32::Foundation::SetLastError(errno) } } /// # Panics /// if set fails. -pub extern "C" fn set_non_blocking(fd: SOCKET) { +pub extern "system" fn set_non_blocking(fd: SOCKET) { assert!(set_non_blocking_flag(fd, true), "set_non_blocking failed !"); } /// # Panics /// if set fails. -pub extern "C" fn set_blocking(fd: SOCKET) { +pub extern "system" fn set_blocking(fd: SOCKET) { assert!(set_non_blocking_flag(fd, false), "set_blocking failed !"); } -extern "C" fn set_non_blocking_flag(fd: SOCKET, on: bool) -> bool { +extern "system" fn set_non_blocking_flag(fd: SOCKET, on: bool) -> bool { let non_blocking = is_non_blocking(fd); if non_blocking == on { return true; @@ -524,17 +600,17 @@ extern "C" fn set_non_blocking_flag(fd: SOCKET, on: bool) -> bool { } #[must_use] -pub extern "C" fn is_blocking(fd: SOCKET) -> bool { +pub extern "system" fn is_blocking(fd: SOCKET) -> bool { !is_non_blocking(fd) } #[must_use] -pub extern "C" fn is_non_blocking(fd: SOCKET) -> bool { +pub extern "system" fn is_non_blocking(fd: SOCKET) -> bool { NON_BLOCKING.contains(&fd) } #[must_use] -pub extern "C" fn send_time_limit(fd: SOCKET) -> u64 { +pub extern "system" fn send_time_limit(fd: SOCKET) -> u64 { SEND_TIME_LIMIT.get(&fd).map_or_else( || { let mut ms = 0; @@ -561,7 +637,7 @@ pub extern "C" fn send_time_limit(fd: SOCKET) -> u64 { } #[must_use] -pub extern "C" fn recv_time_limit(fd: SOCKET) -> u64 { +pub extern "system" fn recv_time_limit(fd: SOCKET) -> u64 { RECV_TIME_LIMIT.get(&fd).map_or_else( || { let mut ms = 0; diff --git a/core/src/syscall/windows/socket.rs b/core/src/syscall/windows/socket.rs index aff69705..b5ca20fc 100644 --- a/core/src/syscall/windows/socket.rs +++ b/core/src/syscall/windows/socket.rs @@ -9,7 +9,7 @@ pub extern "system" fn socket( ty: WINSOCK_SOCKET_TYPE, protocol: IPPROTO, ) -> SOCKET { - static CHAIN: Lazy> = Lazy::new(Default::default); + static CHAIN: Lazy>> = Lazy::new(Default::default); CHAIN.socket(fn_ptr, domain, ty, protocol) } @@ -27,6 +27,27 @@ impl_facade!(SocketSyscallFacade, SocketSyscall, socket(domain: c_int, ty: WINSOCK_SOCKET_TYPE, protocol: IPPROTO) -> SOCKET ); +#[repr(C)] +#[derive(Debug, Default)] +struct NioSocketSyscall { + inner: I, +} + +impl SocketSyscall for NioSocketSyscall { + extern "system" fn socket(&self, fn_ptr: Option<&extern "system" fn(c_int, WINSOCK_SOCKET_TYPE, IPPROTO) -> SOCKET>, domain: c_int, ty: WINSOCK_SOCKET_TYPE, protocol: IPPROTO) -> SOCKET { + let r = self.inner.socket(fn_ptr, domain, ty, protocol); + #[cfg(feature = "iocp")] + if windows_sys::Win32::Networking::WinSock::INVALID_SOCKET != r { + _ = crate::net::operator::SOCKET_CONTEXT.insert(r,crate::net::operator::SocketContext{ + domain, + ty, + protocol, + }); + } + r + } +} + impl_raw!(RawSocketSyscall, SocketSyscall, windows_sys::Win32::Networking::WinSock, socket(domain: c_int, ty: WINSOCK_SOCKET_TYPE, protocol: IPPROTO) -> SOCKET ); diff --git a/hook/Cargo.toml b/hook/Cargo.toml index 47c4f4a5..f82ab70b 100644 --- a/hook/Cargo.toml +++ b/hook/Cargo.toml @@ -44,6 +44,12 @@ net = ["open-coroutine-core/net"] # Provide io_uring adaptation, this feature only works in linux. io_uring = ["open-coroutine-core/io_uring"] +# Provide IOCP adaptation, this feature only works in windows. +iocp = ["open-coroutine-core/iocp"] + +# Provide completion IOCP adaptation +completion_io = ["open-coroutine-core/completion_io"] + # Provide syscall implementation. syscall = ["open-coroutine-core/syscall"] diff --git a/hook/src/lib.rs b/hook/src/lib.rs index 51b153de..7c1c7d4d 100644 --- a/hook/src/lib.rs +++ b/hook/src/lib.rs @@ -68,11 +68,7 @@ pub(crate) fn hook() -> bool { clippy::similar_names, clippy::not_unsafe_ptr_arg_deref, clippy::many_single_char_names, - clippy::cast_sign_loss, - clippy::cast_possible_truncation, - clippy::cast_possible_wrap, - clippy::unnecessary_cast, - trivial_numeric_casts + clippy::unnecessary_cast )] pub mod syscall; diff --git a/open-coroutine/Cargo.toml b/open-coroutine/Cargo.toml index 8dacd906..60d21d1f 100644 --- a/open-coroutine/Cargo.toml +++ b/open-coroutine/Cargo.toml @@ -56,5 +56,11 @@ net = ["open-coroutine-hook/net", "open-coroutine-core/net"] # This feature only works in linux. io_uring = ["open-coroutine-hook/io_uring", "open-coroutine-core/io_uring"] +# Provide IOCP adaptation, this feature only works in windows. +iocp = ["open-coroutine-hook/iocp", "open-coroutine-core/iocp"] + +# Provide completion IOCP adaptation +completion_io = ["open-coroutine-hook/completion_io", "open-coroutine-core/completion_io"] + # Provide syscall implementation. syscall = ["open-coroutine-hook/syscall", "open-coroutine-core/syscall"] diff --git a/open-coroutine/build.rs b/open-coroutine/build.rs index 798d1a41..0882f98d 100644 --- a/open-coroutine/build.rs +++ b/open-coroutine/build.rs @@ -43,6 +43,28 @@ fn main() { .expect("parent not found") .join("hook") .join("Cargo.toml"); + let metadata = MetadataCommand::default() + .no_deps() + .exec() + .expect("read cargo metadata failed"); + let package = if hook_toml.exists() { + metadata + .packages + .iter() + .find(|pkg| pkg.name.eq("open-coroutine")) + .expect("read current package failed") + } else { + metadata + .packages + .first() + .expect("read current package failed") + }; + info!("read package:{:#?}", package); + let dependency = package + .dependencies + .iter() + .find(|dep| dep.name.eq("open-coroutine-hook")) + .expect("open-coroutine-hook not found"); if !hook_toml.exists() { info!( "{:?} not exists, find open-coroutine-hook's Cargo.toml in $CARGO_HOME", @@ -92,20 +114,6 @@ fn main() { .to_string_lossy() .to_string(); info!("crates parent dirs:{:?}", crates_parent_dirs); - let metadata = MetadataCommand::default() - .no_deps() - .exec() - .expect("read cargo metadata failed"); - let package = metadata - .packages - .first() - .expect("read current package failed"); - info!("read package:{:#?}", package); - let dependency = package - .dependencies - .iter() - .find(|dep| dep.name.eq("open-coroutine-hook")) - .expect("open-coroutine-hook not found"); let version = &dependency .req .comparators @@ -122,7 +130,38 @@ fn main() { .join("Cargo.toml"); } info!("open-coroutine-hook's Cargo.toml is here:{:?}", hook_toml); + if !dependency.uses_default_features { + cmd = cmd.arg("--no-default-features"); + } + let mut features = Vec::new(); + if cfg!(feature = "log") { + features.push("log"); + } + if cfg!(feature = "preemptive") { + features.push("preemptive"); + } + if cfg!(feature = "net") { + features.push("net"); + } + if cfg!(feature = "io_uring") { + features.push("io_uring"); + } + if cfg!(feature = "iocp") { + features.push("iocp"); + } + if cfg!(feature = "completion_io") { + features.push("completion_io"); + } + if cfg!(feature = "syscall") { + features.push("syscall"); + } + info!( + "use open-coroutine-hook's default-features:{} and features:{:?}", + dependency.uses_default_features, features + ); if let Err(e) = cmd + .arg("--features") + .arg(features.join(",")) .arg("--manifest-path") .arg(hook_toml) .arg("--target-dir") diff --git a/open-coroutine/src/lib.rs b/open-coroutine/src/lib.rs index 205ae160..62b0d65f 100644 --- a/open-coroutine/src/lib.rs +++ b/open-coroutine/src/lib.rs @@ -134,10 +134,9 @@ pub struct JoinHandle(open_coroutine_core::net::join::JoinHandle, PhantomData #[allow(missing_docs)] impl JoinHandle { - #[allow(clippy::cast_possible_truncation)] pub fn timeout_join(&self, dur: Duration) -> std::io::Result> { unsafe { - let ptr = task_timeout_join(self, dur.as_nanos() as u64); + let ptr = task_timeout_join(self, dur.as_nanos().try_into().expect("overflow")); match ptr.cmp(&0) { Ordering::Less => Err(Error::new(ErrorKind::Other, "timeout join failed")), Ordering::Equal => Ok(None),