diff --git a/Cargo.toml b/Cargo.toml index d4efe6ff..3825573e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ mio = { version = "1.0", default-features = false } cfg-if = "1.0.0" polling = "2.8.0" +educe = "0.6.0" libc = "0.2" rand = "0.8" @@ -50,7 +51,6 @@ once_cell = "1" dashmap = "6" num_cpus = "1" uuid = "1" -derivative = "2" tempfile = "3" cc = "1" syn = "2" diff --git a/core/Cargo.toml b/core/Cargo.toml index 6f8b255b..af54b320 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -29,7 +29,7 @@ uuid = { workspace = true, features = [ "v4", "fast-rng", ], optional = true } -derivative = { workspace = true, optional = true } +educe = { workspace = true, optional = true } core_affinity = { workspace = true, optional = true } crossbeam-utils = { workspace = true, optional = true } psm.workspace = true @@ -77,7 +77,7 @@ backtrace.workspace = true log = ["tracing", "tracing-subscriber", "time"] # low-level raw coroutine -korosensei = ["corosensei", "uuid", "nix/pthread", "derivative"] +korosensei = ["corosensei", "uuid", "nix/pthread", "educe"] # Provide preemptive scheduling implementation. # Enable for default. diff --git a/core/src/co_pool/task.rs b/core/src/co_pool/task.rs index 0553c749..ef374143 100644 --- a/core/src/co_pool/task.rs +++ b/core/src/co_pool/task.rs @@ -1,16 +1,15 @@ use crate::catch; -use derivative::Derivative; /// 做C兼容时会用到 pub type UserTaskFunc = extern "C" fn(usize) -> usize; /// The task impls. #[repr(C)] -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(educe::Educe)] +#[educe(Debug)] pub struct Task<'t> { name: String, - #[derivative(Debug = "ignore")] + #[educe(Debug(ignore))] func: Box) -> Option + 't>, param: Option, } diff --git a/core/src/common/timer.rs b/core/src/common/timer.rs index e912b830..d3cbec88 100644 --- a/core/src/common/timer.rs +++ b/core/src/common/timer.rs @@ -1,5 +1,4 @@ use crate::impl_display_by_debug; -use derivative::Derivative; use std::collections::{BTreeMap, VecDeque}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -62,11 +61,11 @@ impl_display_by_debug!(TimerEntry); /// A queue for managing multiple `TimerEntry`. #[repr(C)] -#[derive(Derivative)] -#[derivative(Debug, Eq, PartialEq)] +#[derive(educe::Educe)] +#[educe(Debug, Eq, PartialEq)] pub struct TimerList { inner: BTreeMap>, - #[derivative(PartialEq = "ignore")] + #[educe(PartialEq(ignore))] total: AtomicUsize, } diff --git a/core/src/coroutine/suspender.rs b/core/src/coroutine/suspender.rs index 252ba982..4698db31 100644 --- a/core/src/coroutine/suspender.rs +++ b/core/src/coroutine/suspender.rs @@ -5,6 +5,7 @@ use std::collections::VecDeque; use std::time::Duration; thread_local! { + #[allow(clippy::missing_const_for_thread_local)] static TIMESTAMP: RefCell> = const { RefCell::new(VecDeque::new()) }; } @@ -52,14 +53,13 @@ pub use korosensei::Suspender; #[cfg(feature = "korosensei")] mod korosensei { use corosensei::Yielder; - use derivative::Derivative; /// Ths suspender implemented for coroutine. #[repr(C)] - #[derive(Derivative)] - #[derivative(Debug)] + #[derive(educe::Educe)] + #[educe(Debug)] pub struct Suspender<'s, Param, Yield> { - #[derivative(Debug = "ignore")] + #[educe(Debug(ignore))] inner: &'s Yielder, } diff --git a/core/src/net/event_loop.rs b/core/src/net/event_loop.rs index 98db60e7..db7e5dbe 100644 --- a/core/src/net/event_loop.rs +++ b/core/src/net/event_loop.rs @@ -21,6 +21,7 @@ cfg_if::cfg_if! { if #[cfg(all(target_os = "linux", feature = "io_uring"))] { use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; use dashmap::DashMap; + use std::ffi::c_longlong; } } @@ -36,7 +37,7 @@ pub(crate) struct EventLoop<'e> { operator: crate::net::operator::Operator<'e>, #[allow(clippy::type_complexity)] #[cfg(all(target_os = "linux", feature = "io_uring"))] - syscall_wait_table: DashMap>, Condvar)>>, + syscall_wait_table: DashMap>, Condvar)>>, selector: Poller, pool: CoroutinePool<'e>, phantom_data: PhantomData<&'e EventLoop<'e>>, @@ -223,7 +224,27 @@ impl<'e> EventLoop<'e> { } } - #[cfg(all(target_os = "linux", feature = "io_uring"))] + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + left_time = self.adapt_io_uring(left_time)?; + } + } + + // use epoll/kevent/iocp + let mut events = Events::with_capacity(1024); + self.selector.select(&mut events, left_time)?; + #[allow(clippy::explicit_iter_loop)] + for event in events.iter() { + let token = event.get_token(); + if event.readable() || event.writable() { + unsafe { self.resume(token) }; + } + } + Ok(()) + } + + #[cfg(all(target_os = "linux", feature = "io_uring"))] + fn adapt_io_uring(&self, mut left_time: Option) -> std::io::Result> { if crate::net::operator::support_io_uring() { // use io_uring let (count, mut cq, left) = self.operator.select(left_time, 0)?; @@ -239,7 +260,7 @@ impl<'e> EventLoop<'e> { if let Some((_, pair)) = self.syscall_wait_table.remove(&token) { let (lock, cvar) = &*pair; let mut pending = lock.lock().expect("lock failed"); - *pending = Some(result); + *pending = Some(result.try_into().expect("result overflow")); cvar.notify_one(); } unsafe { self.resume(token) }; @@ -249,18 +270,7 @@ impl<'e> EventLoop<'e> { left_time = Some(left.unwrap_or(Duration::ZERO)); } } - - // use epoll/kevent/iocp - let mut events = Events::with_capacity(1024); - self.selector.select(&mut events, left_time)?; - #[allow(clippy::explicit_iter_loop)] - for event in events.iter() { - let token = event.get_token(); - if event.readable() || event.writable() { - unsafe { self.resume(token) }; - } - } - Ok(()) + Ok(left_time) } #[allow(clippy::unused_self)] @@ -404,7 +414,7 @@ macro_rules! impl_io_uring { pub(super) fn $syscall( &self, $($arg: $arg_type),* - ) -> std::io::Result>, Condvar)>> { + ) -> std::io::Result>, Condvar)>> { let token = EventLoop::token(Syscall::$syscall); self.operator.$syscall(token, $($arg, )*)?; let arc = Arc::new((Mutex::new(None), Condvar::new())); diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs index c8b73e69..a9283020 100644 --- a/core/src/net/mod.rs +++ b/core/src/net/mod.rs @@ -16,8 +16,8 @@ pub type UserFunc = extern "C" fn(usize) -> usize; cfg_if::cfg_if! { if #[cfg(all(target_os = "linux", feature = "io_uring"))] { - use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; - use std::ffi::c_void; + use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t}; + use std::ffi::{c_longlong, c_void}; } } @@ -247,7 +247,7 @@ macro_rules! impl_io_uring { #[allow(missing_docs)] pub fn $syscall( $($arg: $arg_type),* - ) -> std::io::Result>, Condvar)>> { + ) -> std::io::Result>, Condvar)>> { Self::event_loop().$syscall($($arg, )*) } } diff --git a/core/src/net/operator/linux/mod.rs b/core/src/net/operator/linux/mod.rs new file mode 100644 index 00000000..94d2bfd3 --- /dev/null +++ b/core/src/net/operator/linux/mod.rs @@ -0,0 +1,690 @@ +use io_uring::opcode::{ + Accept, AsyncCancel, Close, Connect, EpollCtl, Fsync, MkDirAt, OpenAt, PollAdd, PollRemove, + Read, Readv, Recv, RecvMsg, RenameAt, Send, SendMsg, SendZc, Shutdown, Socket, Timeout, + TimeoutRemove, TimeoutUpdate, Write, Writev, +}; +use io_uring::squeue::Entry; +use io_uring::types::{epoll_event, Fd, Timespec}; +use io_uring::{CompletionQueue, IoUring, Probe}; +use libc::{ + c_char, c_int, c_uint, c_void, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t, EBUSY, +}; +use once_cell::sync::Lazy; +use std::collections::VecDeque; +use std::io::{Error, ErrorKind}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +#[cfg(test)] +mod tests; + +static SUPPORT: Lazy = + Lazy::new(|| crate::common::current_kernel_version() >= crate::common::kernel_version(5, 6, 0)); + +#[must_use] +pub(crate) fn support_io_uring() -> bool { + *SUPPORT +} + +static PROBE: Lazy = Lazy::new(|| { + let mut probe = Probe::new(); + if let Ok(io_uring) = IoUring::new(2) { + if let Ok(()) = io_uring.submitter().register_probe(&mut probe) { + return probe; + } + } + panic!("probe init failed !") +}); + +// check https://www.rustwiki.org.cn/en/reference/introduction.html for help information +macro_rules! support { + ( $self:ident, $struct_name:ident, $opcode:ident, $impls:expr ) => { + return { + static $struct_name: Lazy = once_cell::sync::Lazy::new(|| { + if $crate::net::operator::support_io_uring() { + return PROBE.is_supported($opcode::CODE); + } + false + }); + if *$struct_name { + return $self.push_sq($impls); + } + Err(Error::new(ErrorKind::Unsupported, "unsupported")) + } + }; +} + +#[repr(C)] +#[derive(educe::Educe)] +#[educe(Debug)] +pub(crate) struct Operator<'o> { + #[educe(Debug(ignore))] + inner: IoUring, + entering: AtomicBool, + backlog: Mutex>, +} + +impl Operator<'_> { + pub(crate) fn new(cpu: usize) -> std::io::Result { + IoUring::builder() + .setup_sqpoll(1000) + .setup_sqpoll_cpu(u32::try_from(cpu).unwrap_or(u32::MAX)) + .build(1024) + .map(|inner| Self { + inner, + entering: AtomicBool::new(false), + backlog: Mutex::new(VecDeque::new()), + }) + } + + fn push_sq(&self, entry: Entry) -> std::io::Result<()> { + let entry = Box::leak(Box::new(entry)); + if unsafe { self.inner.submission_shared().push(entry).is_err() } { + self.backlog + .lock() + .expect("backlog lock failed") + .push_back(entry); + } + self.inner.submit().map(|_| ()) + } + + pub(crate) fn select( + &self, + timeout: Option, + want: usize, + ) -> std::io::Result<(usize, CompletionQueue, Option)> { + if support_io_uring() { + if self + .entering + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + return Ok((0, unsafe { self.inner.completion_shared() }, timeout)); + } + let result = self.do_select(timeout, want); + self.entering.store(false, Ordering::Release); + return result; + } + Err(Error::new(ErrorKind::Unsupported, "unsupported")) + } + + fn do_select( + &self, + timeout: Option, + want: usize, + ) -> std::io::Result<(usize, CompletionQueue, Option)> { + let start_time = Instant::now(); + self.timeout_add(crate::common::constants::IO_URING_TIMEOUT_USERDATA, timeout)?; + let mut cq = unsafe { self.inner.completion_shared() }; + // when submit queue is empty, submit_and_wait will block + let count = match self.inner.submit_and_wait(want) { + Ok(count) => count, + Err(err) => { + if err.raw_os_error() == Some(EBUSY) { + 0 + } else { + return Err(err); + } + } + }; + cq.sync(); + + // clean backlog + let mut sq = unsafe { self.inner.submission_shared() }; + loop { + if sq.is_full() { + match self.inner.submit() { + Ok(_) => (), + Err(err) => { + if err.raw_os_error() == Some(EBUSY) { + break; + } + return Err(err); + } + } + } + sq.sync(); + + let mut backlog = self.backlog.lock().expect("backlog lock failed"); + match backlog.pop_front() { + Some(sqe) => { + if unsafe { sq.push(sqe).is_err() } { + backlog.push_front(sqe); + break; + } + } + None => break, + } + } + let cost = Instant::now().saturating_duration_since(start_time); + Ok((count, cq, timeout.map(|t| t.saturating_sub(cost)))) + } + + pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> { + support!( + self, + SUPPORT_ASYNC_CANCEL, + AsyncCancel, + AsyncCancel::new(user_data as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn epoll_ctl( + &self, + user_data: usize, + epfd: c_int, + op: c_int, + fd: c_int, + event: *mut libc::epoll_event, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_EPOLL_CTL, + EpollCtl, + EpollCtl::new( + Fd(epfd), + Fd(fd), + op, + event.cast_const().cast::(), + ) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn poll_add( + &self, + user_data: usize, + fd: c_int, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_POLL_ADD, + PollAdd, + PollAdd::new(Fd(fd), flags as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn poll_remove(&self, user_data: usize) -> std::io::Result<()> { + support!( + self, + SUPPORT_POLL_REMOVE, + PollRemove, + PollRemove::new(user_data as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn timeout_add( + &self, + user_data: usize, + timeout: Option, + ) -> std::io::Result<()> { + if let Some(duration) = timeout { + let timeout = Timespec::new() + .sec(duration.as_secs()) + .nsec(duration.subsec_nanos()); + support!( + self, + SUPPORT_TIMEOUT_ADD, + Timeout, + Timeout::new(&timeout).build().user_data(user_data as u64) + ) + } + Ok(()) + } + + pub(crate) fn timeout_update( + &self, + user_data: usize, + timeout: Option, + ) -> std::io::Result<()> { + if let Some(duration) = timeout { + let timeout = Timespec::new() + .sec(duration.as_secs()) + .nsec(duration.subsec_nanos()); + support!( + self, + SUPPORT_TIMEOUT_UPDATE, + TimeoutUpdate, + TimeoutUpdate::new(user_data as u64, &timeout) + .build() + .user_data(user_data as u64) + ) + } + self.timeout_remove(user_data) + } + + pub(crate) fn timeout_remove(&self, user_data: usize) -> std::io::Result<()> { + support!( + self, + SUPPORT_TIMEOUT_REMOVE, + TimeoutRemove, + TimeoutRemove::new(user_data as u64).build() + ) + } + + pub(crate) fn openat( + &self, + user_data: usize, + dir_fd: c_int, + pathname: *const c_char, + flags: c_int, + mode: mode_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_OPENAT, + OpenAt, + OpenAt::new(Fd(dir_fd), pathname) + .flags(flags) + .mode(mode) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn mkdirat( + &self, + user_data: usize, + dir_fd: c_int, + pathname: *const c_char, + mode: mode_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_MK_DIR_AT, + MkDirAt, + MkDirAt::new(Fd(dir_fd), pathname) + .mode(mode) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn renameat( + &self, + user_data: usize, + old_dir_fd: c_int, + old_path: *const c_char, + new_dir_fd: c_int, + new_path: *const c_char, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RENAME_AT, + RenameAt, + RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn renameat2( + &self, + user_data: usize, + old_dir_fd: c_int, + old_path: *const c_char, + new_dir_fd: c_int, + new_path: *const c_char, + flags: c_uint, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RENAME_AT, + RenameAt, + RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) + .flags(flags) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn fsync(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { + support!( + self, + SUPPORT_FSYNC, + Fsync, + Fsync::new(Fd(fd)).build().user_data(user_data as u64) + ) + } + + pub(crate) fn socket( + &self, + user_data: usize, + domain: c_int, + ty: c_int, + protocol: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SOCKET, + Socket, + Socket::new(domain, ty, protocol) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn accept( + &self, + user_data: usize, + fd: c_int, + address: *mut sockaddr, + address_len: *mut socklen_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_ACCEPT, + Accept, + Accept::new(Fd(fd), address, address_len) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn accept4( + &self, + user_data: usize, + fd: c_int, + addr: *mut sockaddr, + len: *mut socklen_t, + flg: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_ACCEPT, + Accept, + Accept::new(Fd(fd), addr, len) + .flags(flg) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn connect( + &self, + user_data: usize, + fd: c_int, + address: *const sockaddr, + len: socklen_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_CONNECT, + Connect, + Connect::new(Fd(fd), address, len) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn shutdown(&self, user_data: usize, fd: c_int, how: c_int) -> std::io::Result<()> { + support!( + self, + SUPPORT_SHUTDOWN, + Shutdown, + Shutdown::new(Fd(fd), how) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn close(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { + support!( + self, + SUPPORT_CLOSE, + Close, + Close::new(Fd(fd)).build().user_data(user_data as u64) + ) + } + + pub(crate) fn recv( + &self, + user_data: usize, + fd: c_int, + buf: *mut c_void, + len: size_t, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RECV, + Recv, + Recv::new(Fd(fd), buf.cast::(), len as u32) + .flags(flags) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn read( + &self, + user_data: usize, + fd: c_int, + buf: *mut c_void, + count: size_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READ, + Read, + Read::new(Fd(fd), buf.cast::(), count as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn pread( + &self, + user_data: usize, + fd: c_int, + buf: *mut c_void, + count: size_t, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READ, + Read, + Read::new(Fd(fd), buf.cast::(), count as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn readv( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READV, + Readv, + Readv::new(Fd(fd), iov, iovcnt as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn preadv( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_READV, + Readv, + Readv::new(Fd(fd), iov, iovcnt as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn recvmsg( + &self, + user_data: usize, + fd: c_int, + msg: *mut msghdr, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_RECVMSG, + RecvMsg, + RecvMsg::new(Fd(fd), msg) + .flags(flags as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn send( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + len: size_t, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SEND, + Send, + Send::new(Fd(fd), buf.cast::(), len as u32) + .flags(flags) + .build() + .user_data(user_data as u64) + ) + } + + #[allow(clippy::too_many_arguments)] + pub(crate) fn sendto( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + len: size_t, + flags: c_int, + addr: *const sockaddr, + addrlen: socklen_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SEND_ZC, + SendZc, + SendZc::new(Fd(fd), buf.cast::(), len as u32) + .flags(flags) + .dest_addr(addr) + .dest_addr_len(addrlen) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn write( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + count: size_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITE, + Write, + Write::new(Fd(fd), buf.cast::(), count as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn pwrite( + &self, + user_data: usize, + fd: c_int, + buf: *const c_void, + count: size_t, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITE, + Write, + Write::new(Fd(fd), buf.cast::(), count as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn writev( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITEV, + Writev, + Writev::new(Fd(fd), iov, iovcnt as u32) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn pwritev( + &self, + user_data: usize, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_WRITEV, + Writev, + Writev::new(Fd(fd), iov, iovcnt as u32) + .offset(offset as u64) + .build() + .user_data(user_data as u64) + ) + } + + pub(crate) fn sendmsg( + &self, + user_data: usize, + fd: c_int, + msg: *const msghdr, + flags: c_int, + ) -> std::io::Result<()> { + support!( + self, + SUPPORT_SENDMSG, + SendMsg, + SendMsg::new(Fd(fd), msg) + .flags(flags as u32) + .build() + .user_data(user_data as u64) + ) + } +} diff --git a/core/src/net/operator/tests.rs b/core/src/net/operator/linux/tests.rs similarity index 100% rename from core/src/net/operator/tests.rs rename to core/src/net/operator/linux/tests.rs diff --git a/core/src/net/operator/mod.rs b/core/src/net/operator/mod.rs index b6dc89e4..6a821a4d 100644 --- a/core/src/net/operator/mod.rs +++ b/core/src/net/operator/mod.rs @@ -1,691 +1,4 @@ -use derivative::Derivative; -use io_uring::opcode::{ - Accept, AsyncCancel, Close, Connect, EpollCtl, Fsync, MkDirAt, OpenAt, PollAdd, PollRemove, - Read, Readv, Recv, RecvMsg, RenameAt, Send, SendMsg, SendZc, Shutdown, Socket, Timeout, - TimeoutRemove, TimeoutUpdate, Write, Writev, -}; -use io_uring::squeue::Entry; -use io_uring::types::{epoll_event, Fd, Timespec}; -use io_uring::{CompletionQueue, IoUring, Probe}; -use libc::{ - c_char, c_int, c_uint, c_void, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t, EBUSY, -}; -use once_cell::sync::Lazy; -use std::collections::VecDeque; -use std::io::{Error, ErrorKind}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Mutex; -use std::time::{Duration, Instant}; - -#[cfg(test)] -mod tests; - -static SUPPORT: Lazy = - Lazy::new(|| crate::common::current_kernel_version() >= crate::common::kernel_version(5, 6, 0)); - -#[must_use] -pub(crate) fn support_io_uring() -> bool { - *SUPPORT -} - -static PROBE: Lazy = Lazy::new(|| { - let mut probe = Probe::new(); - if let Ok(io_uring) = IoUring::new(2) { - if let Ok(()) = io_uring.submitter().register_probe(&mut probe) { - return probe; - } - } - panic!("probe init failed !") -}); - -// check https://www.rustwiki.org.cn/en/reference/introduction.html for help information -macro_rules! support { - ( $self:ident, $struct_name:ident, $opcode:ident, $impls:expr ) => { - return { - static $struct_name: Lazy = once_cell::sync::Lazy::new(|| { - if $crate::net::operator::support_io_uring() { - return PROBE.is_supported($opcode::CODE); - } - false - }); - if *$struct_name { - return $self.push_sq($impls); - } - Err(Error::new(ErrorKind::Unsupported, "unsupported")) - } - }; -} - -#[repr(C)] -#[derive(Derivative)] -#[derivative(Debug)] -pub(crate) struct Operator<'o> { - #[derivative(Debug = "ignore")] - inner: IoUring, - entering: AtomicBool, - backlog: Mutex>, -} - -impl Operator<'_> { - pub(crate) fn new(cpu: usize) -> std::io::Result { - IoUring::builder() - .setup_sqpoll(1000) - .setup_sqpoll_cpu(u32::try_from(cpu).unwrap_or(u32::MAX)) - .build(1024) - .map(|inner| Operator { - inner, - entering: AtomicBool::new(false), - backlog: Mutex::new(VecDeque::new()), - }) - } - - fn push_sq(&self, entry: Entry) -> std::io::Result<()> { - let entry = Box::leak(Box::new(entry)); - if unsafe { self.inner.submission_shared().push(entry).is_err() } { - self.backlog - .lock() - .expect("backlog lock failed") - .push_back(entry); - } - self.inner.submit().map(|_| ()) - } - - pub(crate) fn select( - &self, - timeout: Option, - want: usize, - ) -> std::io::Result<(usize, CompletionQueue, Option)> { - if support_io_uring() { - if self - .entering - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_err() - { - return Ok((0, unsafe { self.inner.completion_shared() }, timeout)); - } - let result = self.do_select(timeout, want); - self.entering.store(false, Ordering::Release); - return result; - } - Err(Error::new(ErrorKind::Unsupported, "unsupported")) - } - - fn do_select( - &self, - timeout: Option, - want: usize, - ) -> std::io::Result<(usize, CompletionQueue, Option)> { - let start_time = Instant::now(); - self.timeout_add(crate::common::constants::IO_URING_TIMEOUT_USERDATA, timeout)?; - let mut cq = unsafe { self.inner.completion_shared() }; - // when submit queue is empty, submit_and_wait will block - let count = match self.inner.submit_and_wait(want) { - Ok(count) => count, - Err(err) => { - if err.raw_os_error() == Some(EBUSY) { - 0 - } else { - return Err(err); - } - } - }; - cq.sync(); - - // clean backlog - let mut sq = unsafe { self.inner.submission_shared() }; - loop { - if sq.is_full() { - match self.inner.submit() { - Ok(_) => (), - Err(err) => { - if err.raw_os_error() == Some(EBUSY) { - break; - } - return Err(err); - } - } - } - sq.sync(); - - let mut backlog = self.backlog.lock().expect("backlog lock failed"); - match backlog.pop_front() { - Some(sqe) => { - if unsafe { sq.push(sqe).is_err() } { - backlog.push_front(sqe); - break; - } - } - None => break, - } - } - let cost = Instant::now().saturating_duration_since(start_time); - Ok((count, cq, timeout.map(|t| t.saturating_sub(cost)))) - } - - pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> { - support!( - self, - SUPPORT_ASYNC_CANCEL, - AsyncCancel, - AsyncCancel::new(user_data as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn epoll_ctl( - &self, - user_data: usize, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut libc::epoll_event, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_EPOLL_CTL, - EpollCtl, - EpollCtl::new( - Fd(epfd), - Fd(fd), - op, - event.cast_const().cast::(), - ) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn poll_add( - &self, - user_data: usize, - fd: c_int, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_POLL_ADD, - PollAdd, - PollAdd::new(Fd(fd), flags as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn poll_remove(&self, user_data: usize) -> std::io::Result<()> { - support!( - self, - SUPPORT_POLL_REMOVE, - PollRemove, - PollRemove::new(user_data as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn timeout_add( - &self, - user_data: usize, - timeout: Option, - ) -> std::io::Result<()> { - if let Some(duration) = timeout { - let timeout = Timespec::new() - .sec(duration.as_secs()) - .nsec(duration.subsec_nanos()); - support!( - self, - SUPPORT_TIMEOUT_ADD, - Timeout, - Timeout::new(&timeout).build().user_data(user_data as u64) - ) - } - Ok(()) - } - - pub(crate) fn timeout_update( - &self, - user_data: usize, - timeout: Option, - ) -> std::io::Result<()> { - if let Some(duration) = timeout { - let timeout = Timespec::new() - .sec(duration.as_secs()) - .nsec(duration.subsec_nanos()); - support!( - self, - SUPPORT_TIMEOUT_UPDATE, - TimeoutUpdate, - TimeoutUpdate::new(user_data as u64, &timeout) - .build() - .user_data(user_data as u64) - ) - } - self.timeout_remove(user_data) - } - - pub(crate) fn timeout_remove(&self, user_data: usize) -> std::io::Result<()> { - support!( - self, - SUPPORT_TIMEOUT_REMOVE, - TimeoutRemove, - TimeoutRemove::new(user_data as u64).build() - ) - } - - pub(crate) fn openat( - &self, - user_data: usize, - dir_fd: c_int, - pathname: *const c_char, - flags: c_int, - mode: mode_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_OPENAT, - OpenAt, - OpenAt::new(Fd(dir_fd), pathname) - .flags(flags) - .mode(mode) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn mkdirat( - &self, - user_data: usize, - dir_fd: c_int, - pathname: *const c_char, - mode: mode_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_MK_DIR_AT, - MkDirAt, - MkDirAt::new(Fd(dir_fd), pathname) - .mode(mode) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn renameat( - &self, - user_data: usize, - old_dir_fd: c_int, - old_path: *const c_char, - new_dir_fd: c_int, - new_path: *const c_char, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RENAME_AT, - RenameAt, - RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn renameat2( - &self, - user_data: usize, - old_dir_fd: c_int, - old_path: *const c_char, - new_dir_fd: c_int, - new_path: *const c_char, - flags: c_uint, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RENAME_AT, - RenameAt, - RenameAt::new(Fd(old_dir_fd), old_path, Fd(new_dir_fd), new_path) - .flags(flags) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn fsync(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { - support!( - self, - SUPPORT_FSYNC, - Fsync, - Fsync::new(Fd(fd)).build().user_data(user_data as u64) - ) - } - - pub(crate) fn socket( - &self, - user_data: usize, - domain: c_int, - ty: c_int, - protocol: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SOCKET, - Socket, - Socket::new(domain, ty, protocol) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn accept( - &self, - user_data: usize, - fd: c_int, - address: *mut sockaddr, - address_len: *mut socklen_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_ACCEPT, - Accept, - Accept::new(Fd(fd), address, address_len) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn accept4( - &self, - user_data: usize, - fd: c_int, - addr: *mut sockaddr, - len: *mut socklen_t, - flg: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_ACCEPT, - Accept, - Accept::new(Fd(fd), addr, len) - .flags(flg) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn connect( - &self, - user_data: usize, - fd: c_int, - address: *const sockaddr, - len: socklen_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_CONNECT, - Connect, - Connect::new(Fd(fd), address, len) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn shutdown(&self, user_data: usize, fd: c_int, how: c_int) -> std::io::Result<()> { - support!( - self, - SUPPORT_SHUTDOWN, - Shutdown, - Shutdown::new(Fd(fd), how) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn close(&self, user_data: usize, fd: c_int) -> std::io::Result<()> { - support!( - self, - SUPPORT_CLOSE, - Close, - Close::new(Fd(fd)).build().user_data(user_data as u64) - ) - } - - pub(crate) fn recv( - &self, - user_data: usize, - fd: c_int, - buf: *mut c_void, - len: size_t, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RECV, - Recv, - Recv::new(Fd(fd), buf.cast::(), len as u32) - .flags(flags) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn read( - &self, - user_data: usize, - fd: c_int, - buf: *mut c_void, - count: size_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READ, - Read, - Read::new(Fd(fd), buf.cast::(), count as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn pread( - &self, - user_data: usize, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READ, - Read, - Read::new(Fd(fd), buf.cast::(), count as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn readv( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READV, - Readv, - Readv::new(Fd(fd), iov, iovcnt as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn preadv( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_READV, - Readv, - Readv::new(Fd(fd), iov, iovcnt as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn recvmsg( - &self, - user_data: usize, - fd: c_int, - msg: *mut msghdr, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_RECVMSG, - RecvMsg, - RecvMsg::new(Fd(fd), msg) - .flags(flags as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn send( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SEND, - Send, - Send::new(Fd(fd), buf.cast::(), len as u32) - .flags(flags) - .build() - .user_data(user_data as u64) - ) - } - - #[allow(clippy::too_many_arguments)] - pub(crate) fn sendto( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SEND_ZC, - SendZc, - SendZc::new(Fd(fd), buf.cast::(), len as u32) - .flags(flags) - .dest_addr(addr) - .dest_addr_len(addrlen) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn write( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - count: size_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITE, - Write, - Write::new(Fd(fd), buf.cast::(), count as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn pwrite( - &self, - user_data: usize, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITE, - Write, - Write::new(Fd(fd), buf.cast::(), count as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn writev( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITEV, - Writev, - Writev::new(Fd(fd), iov, iovcnt as u32) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn pwritev( - &self, - user_data: usize, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_WRITEV, - Writev, - Writev::new(Fd(fd), iov, iovcnt as u32) - .offset(offset as u64) - .build() - .user_data(user_data as u64) - ) - } - - pub(crate) fn sendmsg( - &self, - user_data: usize, - fd: c_int, - msg: *const msghdr, - flags: c_int, - ) -> std::io::Result<()> { - support!( - self, - SUPPORT_SENDMSG, - SendMsg, - SendMsg::new(Fd(fd), msg) - .flags(flags as u32) - .build() - .user_data(user_data as u64) - ) - } -} +#[cfg(all(target_os = "linux", feature = "io_uring"))] +mod linux; +#[cfg(all(target_os = "linux", feature = "io_uring"))] +pub(crate) use linux::*; diff --git a/core/src/net/selector/mio_adapter.rs b/core/src/net/selector/mio_adapter.rs index f8126cac..57a06525 100644 --- a/core/src/net/selector/mio_adapter.rs +++ b/core/src/net/selector/mio_adapter.rs @@ -1,6 +1,5 @@ use crate::common::CondvarBlocker; use crossbeam_utils::atomic::AtomicCell; -use derivative::Derivative; use mio::event::Event; use mio::unix::SourceFd; use mio::{Events, Interest, Poll, Token}; @@ -47,12 +46,12 @@ impl super::EventIterator for Events { } #[repr(C)] -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(educe::Educe)] +#[educe(Debug)] pub(crate) struct Poller { waiting: AtomicBool, blocker: CondvarBlocker, - #[derivative(Debug = "ignore")] + #[educe(Debug(ignore))] inner: AtomicCell, } diff --git a/core/src/syscall/windows/mod.rs b/core/src/syscall/windows/mod.rs index 5e66ffa4..0738f39c 100644 --- a/core/src/syscall/windows/mod.rs +++ b/core/src/syscall/windows/mod.rs @@ -492,23 +492,23 @@ static SEND_TIME_LIMIT: Lazy> = Lazy::new(Default::default) static RECV_TIME_LIMIT: Lazy> = Lazy::new(Default::default); -pub extern "C" fn set_errno(errno: windows_sys::Win32::Foundation::WIN32_ERROR) { +pub extern "system" fn set_errno(errno: windows_sys::Win32::Foundation::WIN32_ERROR) { unsafe { windows_sys::Win32::Foundation::SetLastError(errno) } } /// # Panics /// if set fails. -pub extern "C" fn set_non_blocking(fd: SOCKET) { +pub extern "system" fn set_non_blocking(fd: SOCKET) { assert!(set_non_blocking_flag(fd, true), "set_non_blocking failed !"); } /// # Panics /// if set fails. -pub extern "C" fn set_blocking(fd: SOCKET) { +pub extern "system" fn set_blocking(fd: SOCKET) { assert!(set_non_blocking_flag(fd, false), "set_blocking failed !"); } -extern "C" fn set_non_blocking_flag(fd: SOCKET, on: bool) -> bool { +extern "system" fn set_non_blocking_flag(fd: SOCKET, on: bool) -> bool { let non_blocking = is_non_blocking(fd); if non_blocking == on { return true; @@ -524,17 +524,17 @@ extern "C" fn set_non_blocking_flag(fd: SOCKET, on: bool) -> bool { } #[must_use] -pub extern "C" fn is_blocking(fd: SOCKET) -> bool { +pub extern "system" fn is_blocking(fd: SOCKET) -> bool { !is_non_blocking(fd) } #[must_use] -pub extern "C" fn is_non_blocking(fd: SOCKET) -> bool { +pub extern "system" fn is_non_blocking(fd: SOCKET) -> bool { NON_BLOCKING.contains(&fd) } #[must_use] -pub extern "C" fn send_time_limit(fd: SOCKET) -> u64 { +pub extern "system" fn send_time_limit(fd: SOCKET) -> u64 { SEND_TIME_LIMIT.get(&fd).map_or_else( || { let mut ms = 0; @@ -561,7 +561,7 @@ pub extern "C" fn send_time_limit(fd: SOCKET) -> u64 { } #[must_use] -pub extern "C" fn recv_time_limit(fd: SOCKET) -> u64 { +pub extern "system" fn recv_time_limit(fd: SOCKET) -> u64 { RECV_TIME_LIMIT.get(&fd).map_or_else( || { let mut ms = 0; diff --git a/hook/src/lib.rs b/hook/src/lib.rs index 51b153de..7c1c7d4d 100644 --- a/hook/src/lib.rs +++ b/hook/src/lib.rs @@ -68,11 +68,7 @@ pub(crate) fn hook() -> bool { clippy::similar_names, clippy::not_unsafe_ptr_arg_deref, clippy::many_single_char_names, - clippy::cast_sign_loss, - clippy::cast_possible_truncation, - clippy::cast_possible_wrap, - clippy::unnecessary_cast, - trivial_numeric_casts + clippy::unnecessary_cast )] pub mod syscall; diff --git a/open-coroutine/build.rs b/open-coroutine/build.rs index 798d1a41..38c3cfc6 100644 --- a/open-coroutine/build.rs +++ b/open-coroutine/build.rs @@ -43,6 +43,28 @@ fn main() { .expect("parent not found") .join("hook") .join("Cargo.toml"); + let metadata = MetadataCommand::default() + .no_deps() + .exec() + .expect("read cargo metadata failed"); + let package = if hook_toml.exists() { + metadata + .packages + .iter() + .find(|pkg| pkg.name.eq("open-coroutine")) + .expect("read current package failed") + } else { + metadata + .packages + .first() + .expect("read current package failed") + }; + info!("read package:{:#?}", package); + let dependency = package + .dependencies + .iter() + .find(|dep| dep.name.eq("open-coroutine-hook")) + .expect("open-coroutine-hook not found"); if !hook_toml.exists() { info!( "{:?} not exists, find open-coroutine-hook's Cargo.toml in $CARGO_HOME", @@ -92,20 +114,6 @@ fn main() { .to_string_lossy() .to_string(); info!("crates parent dirs:{:?}", crates_parent_dirs); - let metadata = MetadataCommand::default() - .no_deps() - .exec() - .expect("read cargo metadata failed"); - let package = metadata - .packages - .first() - .expect("read current package failed"); - info!("read package:{:#?}", package); - let dependency = package - .dependencies - .iter() - .find(|dep| dep.name.eq("open-coroutine-hook")) - .expect("open-coroutine-hook not found"); let version = &dependency .req .comparators @@ -122,7 +130,32 @@ fn main() { .join("Cargo.toml"); } info!("open-coroutine-hook's Cargo.toml is here:{:?}", hook_toml); + if !dependency.uses_default_features { + cmd = cmd.arg("--no-default-features"); + } + let mut features = Vec::new(); + if cfg!(feature = "log") { + features.push("log"); + } + if cfg!(feature = "preemptive") { + features.push("preemptive"); + } + if cfg!(feature = "net") { + features.push("net"); + } + if cfg!(feature = "io_uring") { + features.push("io_uring"); + } + if cfg!(feature = "syscall") { + features.push("syscall"); + } + info!( + "use open-coroutine-hook's default-features:{} and features:{:?}", + dependency.uses_default_features, features + ); if let Err(e) = cmd + .arg("--features") + .arg(features.join(",")) .arg("--manifest-path") .arg(hook_toml) .arg("--target-dir")