From 688f02594c61da32e566eba25e40c5ae1bc1ac38 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Thu, 6 Nov 2025 23:34:44 +0800 Subject: [PATCH 01/21] not finish yet --- Cargo.toml | 2 +- src/net/mod.rs | 6 +- src/net/uds/datagram.rs | 16 +-- src/net/uds/listener.rs | 19 +-- src/net/uds/mod.rs | 1 + src/net/uds/stream.rs | 14 ++- src/sys/windows/mod.rs | 3 +- src/sys/windows/net.rs | 5 + src/sys/windows/uds/listener.rs | 76 ++++++++++++ src/sys/windows/uds/mod.rs | 98 +++++++++++++++ src/sys/windows/uds/socket.rs | 203 ++++++++++++++++++++++++++++++++ src/sys/windows/uds/stream.rs | 82 +++++++++++++ 12 files changed, 502 insertions(+), 23 deletions(-) create mode 100644 src/sys/windows/uds/listener.rs create mode 100644 src/sys/windows/uds/mod.rs create mode 100644 src/sys/windows/uds/socket.rs create mode 100644 src/sys/windows/uds/stream.rs diff --git a/Cargo.toml b/Cargo.toml index ec581bf1c..5f0eb06bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ include = [ # For documentation of features see the `mio::features` module. [features] # By default Mio only provides a shell implementation. -default = ["log"] +default = ["log","os-ext","net"] # Enables the `Poll` and `Registry` types. os-poll = [] diff --git a/src/net/mod.rs b/src/net/mod.rs index 15c405cf9..be0439dc0 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -33,7 +33,9 @@ mod udp; #[cfg(not(target_os = "wasi"))] pub use self::udp::UdpSocket; -#[cfg(unix)] +#[cfg(any(unix,windows))] mod uds; +#[cfg(any(unix,windows))] +pub use self::uds::{UnixListener, UnixStream}; #[cfg(unix)] -pub use self::uds::{UnixDatagram, UnixListener, UnixStream}; +pub use self::uds::UnixDatagram; diff --git a/src/net/uds/datagram.rs b/src/net/uds/datagram.rs index 73fea0731..d3139db70 100644 --- a/src/net/uds/datagram.rs +++ b/src/net/uds/datagram.rs @@ -1,6 +1,6 @@ +#![cfg(unix)] use std::net::Shutdown; -use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},unix::net::{self, SocketAddr}}; use std::path::Path; use std::{fmt, io}; @@ -215,19 +215,19 @@ impl fmt::Debug for UnixDatagram { self.inner.fmt(f) } } - +#[cfg(unix)] impl IntoRawFd for UnixDatagram { fn into_raw_fd(self) -> RawFd { self.inner.into_inner().into_raw_fd() } } - +#[cfg(unix)] impl AsRawFd for UnixDatagram { fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() } } - +#[cfg(unix)] impl FromRawFd for UnixDatagram { /// Converts a `RawFd` to a `UnixDatagram`. /// @@ -248,19 +248,19 @@ impl From for net::UnixDatagram { unsafe { net::UnixDatagram::from_raw_fd(datagram.into_raw_fd()) } } } - +#[cfg(unix)] impl From for OwnedFd { fn from(unix_datagram: UnixDatagram) -> Self { unix_datagram.inner.into_inner().into() } } - +#[cfg(unix)] impl AsFd for UnixDatagram { fn as_fd(&self) -> BorrowedFd<'_> { self.inner.as_fd() } } - +#[cfg(unix)] impl From for UnixDatagram { fn from(fd: OwnedFd) -> Self { UnixDatagram::from_std(From::from(fd)) diff --git a/src/net/uds/listener.rs b/src/net/uds/listener.rs index a255972a5..e48a0fdf5 100644 --- a/src/net/uds/listener.rs +++ b/src/net/uds/listener.rs @@ -1,5 +1,7 @@ -use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +#[cfg(unix)] +use std::os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},unix::net::{self, SocketAddr}}; +#[cfg(windows)] +use crate::sys::{net,uds::{SocketAddr}}; use std::path::Path; use std::{fmt, io}; @@ -84,19 +86,19 @@ impl fmt::Debug for UnixListener { self.inner.fmt(f) } } - +#[cfg(unix)] impl IntoRawFd for UnixListener { fn into_raw_fd(self) -> RawFd { self.inner.into_inner().into_raw_fd() } } - +#[cfg(unix)] impl AsRawFd for UnixListener { fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() } } - +#[cfg(unix)] impl FromRawFd for UnixListener { /// Converts a `RawFd` to a `UnixListener`. /// @@ -109,6 +111,7 @@ impl FromRawFd for UnixListener { } } +#[cfg(unix)] impl From for net::UnixListener { fn from(listener: UnixListener) -> Self { // Safety: This is safe since we are extracting the raw fd from a well-constructed @@ -117,19 +120,19 @@ impl From for net::UnixListener { unsafe { net::UnixListener::from_raw_fd(listener.into_raw_fd()) } } } - +#[cfg(unix)] impl From for OwnedFd { fn from(unix_listener: UnixListener) -> Self { unix_listener.inner.into_inner().into() } } - +#[cfg(unix)] impl AsFd for UnixListener { fn as_fd(&self) -> BorrowedFd<'_> { self.inner.as_fd() } } - +#[cfg(unix)] impl From for UnixListener { fn from(fd: OwnedFd) -> Self { UnixListener::from_std(From::from(fd)) diff --git a/src/net/uds/mod.rs b/src/net/uds/mod.rs index e02fd80dc..322d9226d 100644 --- a/src/net/uds/mod.rs +++ b/src/net/uds/mod.rs @@ -1,4 +1,5 @@ mod datagram; +#[cfg(unix)] pub use self::datagram::UnixDatagram; mod listener; diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs index 244f40455..fd6c4ae75 100644 --- a/src/net/uds/stream.rs +++ b/src/net/uds/stream.rs @@ -1,8 +1,10 @@ use std::fmt; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::Shutdown; -use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +#[cfg(unix)] +use std::os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},unix::net::{self, SocketAddr}}; +#[cfg(windows)] +use crate::sys::{net,uds::socket::SocketAddr}; use std::path::Path; use crate::io_source::IoSource; @@ -228,18 +230,20 @@ impl fmt::Debug for UnixStream { self.inner.fmt(f) } } - +#[cfg(unix)] impl IntoRawFd for UnixStream { fn into_raw_fd(self) -> RawFd { self.inner.into_inner().into_raw_fd() } } +#[cfg(unix)] impl AsRawFd for UnixStream { fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() } } +#[cfg(unix)] impl FromRawFd for UnixStream { /// Converts a `RawFd` to a `UnixStream`. @@ -252,6 +256,7 @@ impl FromRawFd for UnixStream { UnixStream::from_std(FromRawFd::from_raw_fd(fd)) } } +#[cfg(unix)] impl From for net::UnixStream { fn from(stream: UnixStream) -> Self { @@ -261,18 +266,21 @@ impl From for net::UnixStream { unsafe { net::UnixStream::from_raw_fd(stream.into_raw_fd()) } } } +#[cfg(unix)] impl From for OwnedFd { fn from(unix_stream: UnixStream) -> Self { unix_stream.inner.into_inner().into() } } +#[cfg(unix)] impl AsFd for UnixStream { fn as_fd(&self) -> BorrowedFd<'_> { self.inner.as_fd() } } +#[cfg(unix)] impl From for UnixStream { fn from(fd: OwnedFd) -> Self { diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 89d74b1a2..6f67643fc 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -31,7 +31,7 @@ cfg_net! { }}; } - mod net; + pub mod net; pub(crate) mod tcp; pub(crate) mod udp; @@ -41,6 +41,7 @@ cfg_net! { cfg_os_ext! { pub(crate) mod named_pipe; + pub(crate) mod uds; } mod waker; diff --git a/src/sys/windows/net.rs b/src/sys/windows/net.rs index 5cc235335..3f0d0ac99 100644 --- a/src/sys/windows/net.rs +++ b/src/sys/windows/net.rs @@ -3,6 +3,7 @@ use std::mem; use std::net::SocketAddr; use std::sync::Once; +use windows_sys::Win32::Networking::WinSock::SOCKADDR_UN; use windows_sys::Win32::Networking::WinSock::{ closesocket, ioctlsocket, socket, AF_INET, AF_INET6, FIONBIO, IN6_ADDR, IN6_ADDR_0, INVALID_SOCKET, IN_ADDR, IN_ADDR_0, SOCKADDR, SOCKADDR_IN, SOCKADDR_IN6, SOCKADDR_IN6_0, @@ -55,6 +56,7 @@ pub(crate) fn new_socket(domain: u32, socket_type: i32) -> io::Result { pub(crate) union SocketAddrCRepr { v4: SOCKADDR_IN, v6: SOCKADDR_IN6, + unix: SOCKADDR_UN } impl SocketAddrCRepr { @@ -109,3 +111,6 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) { } } } + +pub use super::uds::UnixStream; +pub use super::uds::UnixListener; diff --git a/src/sys/windows/uds/listener.rs b/src/sys/windows/uds/listener.rs new file mode 100644 index 000000000..5420d57f8 --- /dev/null +++ b/src/sys/windows/uds/listener.rs @@ -0,0 +1,76 @@ +use super::{socketaddr_un, startup, wsa_error, Socket, SocketAddr, UnixStream}; +use std::{ + io, + ops::{Deref, DerefMut}, + path::Path, +}; +use windows_sys::Win32::Networking::WinSock::{self, SOCKADDR_UN, SOCKET_ERROR}; +pub struct UnixListener(Socket); + +impl UnixListener { + pub fn bind>(path: P) -> io::Result { + unsafe { + startup()?; + let s = Socket::new()?; + let (addr, len) = socketaddr_un(path.as_ref())?; + if WinSock::bind(s.0, &addr as *const _ as *const _, len) == SOCKET_ERROR { + Err(wsa_error()) + } else { + match WinSock::listen(s.0, 5) { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(Self(s)), + } + } + } + } + pub fn bind_addr(socket_addr: &SocketAddr) -> io::Result { + unsafe { + let s = Socket::new()?; + if WinSock::bind( + s.0, + &socket_addr.addr as *const _ as *const _, + socket_addr.addrlen, + ) == SOCKET_ERROR + { + Err(wsa_error()) + } else { + match WinSock::listen(s.0, 5) { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(Self(s)), + } + } + } + } + pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + let mut addr = SOCKADDR_UN::default(); + let mut addrlen = size_of::() as _; + let s = self + .0 + .accept(&mut addr as *mut _ as *mut _, &mut addrlen as *mut _)?; + Ok((UnixStream(s), SocketAddr { addr, addrlen })) + } + pub fn local_addr(&self) -> io::Result { + self.0.local_addr() + } + pub fn take_error(&self) -> io::Result> { + self.0.take_error() + } +} +impl Deref for UnixListener { + type Target = Socket; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for UnixListener { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} +pub fn bind_addr(socket_addr: &SocketAddr) -> io::Result { + UnixListener::bind_addr(socket_addr) +} +pub fn accept(s: &UnixListener) -> io::Result<(crate::net::UnixStream, SocketAddr)> { + let (inner, addr) = s.accept()?; + Ok((crate::net::UnixStream::from_std(inner), addr)) +} diff --git a/src/sys/windows/uds/mod.rs b/src/sys/windows/uds/mod.rs new file mode 100644 index 000000000..77dae1286 --- /dev/null +++ b/src/sys/windows/uds/mod.rs @@ -0,0 +1,98 @@ +use std::io; + +pub mod stream; +pub use stream::*; +pub mod socket; +pub use socket::*; +pub mod listener; +pub use listener::*; +pub fn startup() -> io::Result<()> { + use windows_sys::Win32::Networking::WinSock::{self, WSADATA}; + use WinSock::{WSAEFAULT, WSAEINPROGRESS, WSAEPROCLIM, WSASYSNOTREADY, WSAVERNOTSUPPORTED}; + let mut wsa_data = WSADATA::default(); + match unsafe { WinSock::WSAStartup(0x202, &mut wsa_data) } { + 0 => Ok(()), + WSASYSNOTREADY => Err(io::Error::other("Network subsystem not ready")), + WSAVERNOTSUPPORTED => Err(io::Error::new( + io::ErrorKind::Unsupported, + "Winsock version not supported", + )), + WSAEINPROGRESS => Err(io::Error::new( + io::ErrorKind::WouldBlock, + "Blocking operation in progress", + )), + WSAEPROCLIM => Err(io::Error::other("Too many tasks")), + WSAEFAULT => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid parameter", + )), + _ => Err(io::Error::other("Unknown WSAStartup error")), + } +} +pub fn wsa_error() -> io::Error { + use windows_sys::Win32::Networking::WinSock::{ + WSAGetLastError, WSAEACCES, WSAEADDRINUSE, WSAEADDRNOTAVAIL, WSAEAFNOSUPPORT, + WSAECONNABORTED, WSAECONNREFUSED, WSAECONNRESET, WSAEHOSTUNREACH, WSAEINPROGRESS, + WSAEINVAL, WSAEINVALIDPROCTABLE, WSAEINVALIDPROVIDER, WSAEISCONN, WSAEMFILE, WSAEMSGSIZE, + WSAENETDOWN, WSAENETUNREACH, WSAENOBUFS, WSAENOTCONN, WSAEPROTONOSUPPORT, WSAEPROTOTYPE, + WSAEPROVIDERFAILEDINIT, WSAESHUTDOWN, WSAESOCKTNOSUPPORT, WSAETIMEDOUT, WSANOTINITIALISED, + }; + let err = unsafe { WSAGetLastError() }; + let kind = match err { + WSANOTINITIALISED => io::ErrorKind::NotConnected, + WSAENETDOWN => io::ErrorKind::ConnectionReset, + WSAEAFNOSUPPORT => io::ErrorKind::Unsupported, + WSAEINPROGRESS => io::ErrorKind::WouldBlock, + WSAEMFILE => io::ErrorKind::ResourceBusy, + WSAEINVAL => io::ErrorKind::InvalidInput, + WSAEINVALIDPROVIDER | WSAEINVALIDPROCTABLE | WSAEPROVIDERFAILEDINIT => { + io::ErrorKind::InvalidData + } + WSAENOBUFS => io::ErrorKind::OutOfMemory, + WSAEPROTONOSUPPORT | WSAEPROTOTYPE | WSAESOCKTNOSUPPORT => io::ErrorKind::Unsupported, + WSAECONNREFUSED => io::ErrorKind::ConnectionRefused, + WSAETIMEDOUT => io::ErrorKind::TimedOut, + WSAECONNABORTED => io::ErrorKind::ConnectionAborted, + WSAECONNRESET => io::ErrorKind::ConnectionReset, + WSAEADDRINUSE => io::ErrorKind::AddrInUse, + WSAEADDRNOTAVAIL => io::ErrorKind::AddrNotAvailable, + WSAEACCES => io::ErrorKind::PermissionDenied, + WSAEISCONN => io::ErrorKind::AlreadyExists, + WSAENOTCONN => io::ErrorKind::NotConnected, + WSAESHUTDOWN => io::ErrorKind::BrokenPipe, + WSAEMSGSIZE => io::ErrorKind::InvalidInput, + WSAEHOSTUNREACH | WSAENETUNREACH => io::ErrorKind::HostUnreachable, + + _ => io::ErrorKind::Other, + }; + let description = match err { + WSANOTINITIALISED => "Successful WSAStartup call must occur before using this function", + WSAENETDOWN => "The network subsystem has failed", + WSAEAFNOSUPPORT => "The specified address family is not supported", + WSAEINPROGRESS => "A blocking Windows Sockets call is in progress", + WSAEMFILE => "No more socket descriptors are available", + WSAEINVAL => "An invalid argument was supplied", + WSAEINVALIDPROVIDER => "The service provider returned a version other than 2.2", + WSAEINVALIDPROCTABLE => "The service provider returned an invalid procedure table", + WSAENOBUFS => "No buffer space is available", + WSAEPROTONOSUPPORT => "The specified protocol is not supported", + WSAEPROTOTYPE => "The specified protocol is the wrong type for this socket", + WSAEPROVIDERFAILEDINIT => "The service provider failed to initialize", + WSAESOCKTNOSUPPORT => "The specified socket type is not supported in this address family", + WSAECONNREFUSED => "Connection refused", + WSAETIMEDOUT => "Connection timed out", + WSAECONNABORTED => "Connection aborted", + WSAECONNRESET => "Connection reset by peer", + WSAEADDRINUSE => "Address already in use", + WSAEADDRNOTAVAIL => "Address not available", + WSAEACCES => "Permission denied", + WSAEISCONN => "Socket is already connected", + WSAENOTCONN => "Socket is not connected", + WSAESHUTDOWN => "Socket has been shut down", + WSAEMSGSIZE => "Message too long", + WSAEHOSTUNREACH => "Host is unreachable", + WSAENETUNREACH => "Network is unreachable", + _ => "Windows Sockets error", + }; + io::Error::new(kind, format!("{} (error code: {:?})", description, err)) +} diff --git a/src/sys/windows/uds/socket.rs b/src/sys/windows/uds/socket.rs new file mode 100644 index 000000000..4eb46619b --- /dev/null +++ b/src/sys/windows/uds/socket.rs @@ -0,0 +1,203 @@ +use super::{startup, wsa_error}; +use std::{io, net::Shutdown, os::raw::c_int, path::Path, time::Duration}; +use windows_sys::Win32::Networking::WinSock::{ + self, AF_UNIX, FIONBIO, INVALID_SOCKET, SOCKADDR, SOCKADDR_UN, SOCKET, SOCKET_ERROR, + SOCK_STREAM, SOL_SOCKET, SO_ERROR, +}; +#[derive(Debug)] +pub struct Socket(pub SOCKET); + +impl Socket { + pub fn new() -> io::Result { + unsafe { + startup()?; + match WinSock::socket(AF_UNIX as _, SOCK_STREAM, 0) { + INVALID_SOCKET => Err(wsa_error()), + s => Ok(Self(s)), + } + } + } + pub fn write(&self, buf: &[u8]) -> io::Result { + unsafe { + match WinSock::send(self.0 as _, buf.as_ptr(), buf.len() as _, 0) { + SOCKET_ERROR => Err(wsa_error()), + len => Ok(len as _), + } + } + } + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + unsafe { + match WinSock::recv(self.0 as _, buf.as_mut_ptr(), buf.len() as _, 0) { + 0 => Err(io::Error::other("Connection closed")), + len if len > 0 => Ok(len as _), + _ => Err(wsa_error()), + } + } + } + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + use WinSock::{SD_BOTH, SD_RECEIVE, SD_SEND}; + let shutdown_how = match how { + Shutdown::Read => SD_RECEIVE, + Shutdown::Write => SD_SEND, + Shutdown::Both => SD_BOTH, + }; + unsafe { + match WinSock::shutdown(self.0, shutdown_how) { + 0 => Ok(()), + _ => Err(wsa_error()), + } + } + } + pub fn accept(&self, addr: *mut SOCKADDR, addrlen: *mut i32) -> io::Result { + unsafe { + // or we should just use None None here because + // seems like accept write nothing to addr and addrlen + match WinSock::accept(self.0, addr, addrlen) { + INVALID_SOCKET => Err(wsa_error()), + s => Ok(Socket(s)), + } + } + } + pub fn local_addr(&self) -> io::Result { + let mut addr = SocketAddr::default(); + match unsafe { + WinSock::getsockname( + self.0, + &mut addr.addr as *mut _ as *mut _, + &mut addr.addrlen as *mut _ as *mut _, + ) + } { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(addr), + } + } + pub fn peer_addr(&self) -> io::Result { + let mut s = SocketAddr::default(); + match unsafe { + WinSock::getpeername( + self.0, + &mut s.addr as *mut _ as *mut _, + &mut s.addrlen as *mut _ as *mut _, + ) + } { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(s), + } + } + pub fn take_error(&self) -> io::Result> { + unsafe { + let mut val = c_int::default(); + let mut len = size_of::() as i32; + match WinSock::getsockopt( + self.0, + SOL_SOCKET, + SO_ERROR, + &mut val as *mut _ as *mut _, + &mut len as *mut _, + ) { + SOCKET_ERROR => Err(wsa_error()), + _ => { + if val == 0 { + Ok(None) + } else { + Ok(Some(wsa_error())) + } + } + } + } + } + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + let mut val = if nonblocking { 1u32 } else { 0 }; + match unsafe { WinSock::ioctlsocket(self.0, FIONBIO, &mut val as *mut _) } { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(()), + } + } + pub fn set_timeout(&self, dur: Option, kind: i32) -> io::Result<()> { + let timeout = match dur { + Some(dur) => dur.as_millis() as u32, + None => 0, + } + .to_ne_bytes(); + match unsafe { + WinSock::setsockopt( + self.0, + SOL_SOCKET, + kind, + &timeout as *const _, + timeout.len() as _, + ) + } { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(()), + } + } + //seems like not support + //https://learn.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt + pub fn timeout(&self, kind: i32) -> io::Result> { + let mut val = c_int::default(); + let mut len = size_of::(); + match unsafe { + WinSock::getsockopt( + self.0, + SOL_SOCKET, + kind, + &mut val as *mut _ as *mut _, + &mut len as *mut _ as *mut _, + ) + } { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(Some(Duration::from_millis(val as u64))), + } + } +} + +pub struct SocketAddr { + pub addr: SOCKADDR_UN, + pub addrlen: i32, +} +impl Default for SocketAddr { + fn default() -> Self { + Self { + addr: SOCKADDR_UN::default(), + addrlen: 0, + } + } +} +impl SocketAddr { + pub fn from_pathname>(path: P) -> io::Result { + let (addr, addrlen) = socketaddr_un(path.as_ref())?; + Ok(Self { addr, addrlen }) + } +} + +pub fn socketaddr_un(path: &Path) -> io::Result<(SOCKADDR_UN, i32)> { + // let bytes = path.as_os_str().as_encoded_bytes(); + let mut sockaddr = SOCKADDR_UN::default(); + // Winsock2 expects 'sun_path' to be a Win32 UTF-8 file system path + let bytes = path.to_str().map(|s| s.as_bytes()).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "path contains invalid characters", + ) + })?; + + if bytes.contains(&0) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "paths may not contain interior null bytes", + )); + } + + if bytes.len() >= sockaddr.sun_path.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "path must be shorter than SUN_LEN", + )); + } + let src_i8 = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i8, bytes.len()) }; + sockaddr.sun_family = AF_UNIX; + sockaddr.sun_path[..src_i8.len()].copy_from_slice(src_i8); + let socklen = size_of::() as _; + Ok((sockaddr, socklen)) +} diff --git a/src/sys/windows/uds/stream.rs b/src/sys/windows/uds/stream.rs new file mode 100644 index 000000000..2db4676ac --- /dev/null +++ b/src/sys/windows/uds/stream.rs @@ -0,0 +1,82 @@ +use std::fmt::Debug; +use std::io; +use std::ops::Deref; +use std::ops::DerefMut; +use std::path::Path; + +use crate::event; + +use super::Socket; +use super::SocketAddr; +#[derive(Debug)] +pub struct UnixStream(pub Socket); +impl UnixStream {} +impl io::Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + io::Write::write(&mut &*self, buf) + } + fn flush(&mut self) -> io::Result<()> { + io::Write::flush(&mut &*self) + } +} +impl io::Write for &UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} +impl io::Read for &UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.recv(buf) + } +} +impl io::Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + io::Read::read(&mut &*self, buf) + } +} +impl Deref for UnixStream { + type Target = Socket; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for UnixStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} +pub fn connect(path: &Path) -> io::Result { + todo!() +} +pub fn connect_addr(address: &SocketAddr) -> io::Result { + todo!() +} +pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + todo!() +} +impl event::Source for UnixStream { + fn register( + &mut self, + registry: &crate::Registry, + token: crate::Token, + interests: crate::Interest, + ) -> io::Result<()> { + todo!() + } + + fn reregister( + &mut self, + registry: &crate::Registry, + token: crate::Token, + interests: crate::Interest, + ) -> io::Result<()> { + todo!() + } + + fn deregister(&mut self, registry: &crate::Registry) -> io::Result<()> { + todo!() + } +} From d40aa997efb8362b6053b1d908edafd7bb40fdac Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 11:58:05 +0800 Subject: [PATCH 02/21] nfinish yet --- src/net/uds/stream.rs | 1 + src/sys/windows/uds/listener.rs | 11 ++++-- src/sys/windows/uds/socket.rs | 20 ++++++---- src/sys/windows/uds/stream.rs | 70 ++++++++++++++++++--------------- 4 files changed, 59 insertions(+), 43 deletions(-) diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs index fd6c4ae75..3c3777a70 100644 --- a/src/net/uds/stream.rs +++ b/src/net/uds/stream.rs @@ -54,6 +54,7 @@ impl UnixStream { /// Creates an unnamed pair of connected sockets. /// /// Returns two `UnixStream`s which are connected to each other. + #[cfg(unix)] pub fn pair() -> io::Result<(UnixStream, UnixStream)> { sys::uds::stream::pair().map(|(stream1, stream2)| { (UnixStream::from_std(stream1), UnixStream::from_std(stream2)) diff --git a/src/sys/windows/uds/listener.rs b/src/sys/windows/uds/listener.rs index 5420d57f8..735791125 100644 --- a/src/sys/windows/uds/listener.rs +++ b/src/sys/windows/uds/listener.rs @@ -1,10 +1,9 @@ use super::{socketaddr_un, startup, wsa_error, Socket, SocketAddr, UnixStream}; use std::{ - io, - ops::{Deref, DerefMut}, - path::Path, + io, ops::{Deref, DerefMut}, os::windows::io::{AsRawSocket, RawSocket}, path::Path }; use windows_sys::Win32::Networking::WinSock::{self, SOCKADDR_UN, SOCKET_ERROR}; +#[derive(Debug)] pub struct UnixListener(Socket); impl UnixListener { @@ -74,3 +73,9 @@ pub fn accept(s: &UnixListener) -> io::Result<(crate::net::UnixStream, SocketAdd let (inner, addr) = s.accept()?; Ok((crate::net::UnixStream::from_std(inner), addr)) } + +impl AsRawSocket for UnixListener { + fn as_raw_socket(&self) -> RawSocket { + self.0.0 as _ + } +} \ No newline at end of file diff --git a/src/sys/windows/uds/socket.rs b/src/sys/windows/uds/socket.rs index 4eb46619b..195a2a8ea 100644 --- a/src/sys/windows/uds/socket.rs +++ b/src/sys/windows/uds/socket.rs @@ -1,5 +1,5 @@ use super::{startup, wsa_error}; -use std::{io, net::Shutdown, os::raw::c_int, path::Path, time::Duration}; +use std::{ffi::CStr, fmt::Debug, io, net::Shutdown, os::raw::c_int, path::Path, time::Duration}; use windows_sys::Win32::Networking::WinSock::{ self, AF_UNIX, FIONBIO, INVALID_SOCKET, SOCKADDR, SOCKADDR_UN, SOCKET, SOCKET_ERROR, SOCK_STREAM, SOL_SOCKET, SO_ERROR, @@ -151,17 +151,21 @@ impl Socket { } } } - +#[derive(Default)] pub struct SocketAddr { pub addr: SOCKADDR_UN, pub addrlen: i32, } -impl Default for SocketAddr { - fn default() -> Self { - Self { - addr: SOCKADDR_UN::default(), - addrlen: 0, - } + +impl Debug for SocketAddr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result { + let sun_path_str = unsafe { + CStr::from_ptr(self.addr.sun_path.as_ptr()) + .to_string_lossy() + }; + + write!(f, "SocketAddr {{ addr: SOCKADDR_UN {{ sun_family: {}, sun_path: {:?} }}, addrlen: {} }}", + self.addr.sun_family, sun_path_str, self.addrlen) } } impl SocketAddr { diff --git a/src/sys/windows/uds/stream.rs b/src/sys/windows/uds/stream.rs index 2db4676ac..d7e4e4f9f 100644 --- a/src/sys/windows/uds/stream.rs +++ b/src/sys/windows/uds/stream.rs @@ -2,15 +2,44 @@ use std::fmt::Debug; use std::io; use std::ops::Deref; use std::ops::DerefMut; +use std::os::windows::io::AsRawSocket; +use std::os::windows::io::RawSocket; use std::path::Path; - -use crate::event; - +use windows_sys::Win32::Networking::WinSock; +use windows_sys::Win32::Networking::WinSock::SOCKET_ERROR; +use super::wsa_error; +use super::startup; +use super::socketaddr_un; use super::Socket; use super::SocketAddr; #[derive(Debug)] pub struct UnixStream(pub Socket); -impl UnixStream {} +impl UnixStream { + pub fn connect>(path: P) -> io::Result { + unsafe { + startup()?; + let s = Socket::new()?; + let (addr, len) = socketaddr_un(path.as_ref())?; + match WinSock::connect(s.0, &addr as *const _ as *const _, len) { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(Self(s)), + } + } + } + pub fn connect_addr(socket_addr: &SocketAddr) -> io::Result { + let s = Socket::new()?; + match unsafe { + WinSock::connect( + s.0, + &socket_addr.addr as *const _ as *const _, + socket_addr.addrlen, + ) + } { + SOCKET_ERROR => Err(wsa_error()), + _ => Ok(Self(s)), + } + } +} impl io::Write for UnixStream { fn write(&mut self, buf: &[u8]) -> io::Result { io::Write::write(&mut &*self, buf) @@ -48,35 +77,12 @@ impl DerefMut for UnixStream { &mut self.0 } } -pub fn connect(path: &Path) -> io::Result { - todo!() -} pub fn connect_addr(address: &SocketAddr) -> io::Result { - todo!() + UnixStream::connect_addr(address) } -pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - todo!() -} -impl event::Source for UnixStream { - fn register( - &mut self, - registry: &crate::Registry, - token: crate::Token, - interests: crate::Interest, - ) -> io::Result<()> { - todo!() - } - fn reregister( - &mut self, - registry: &crate::Registry, - token: crate::Token, - interests: crate::Interest, - ) -> io::Result<()> { - todo!() +impl AsRawSocket for UnixStream { + fn as_raw_socket(&self) -> RawSocket { + self.0.0 as _ } - - fn deregister(&mut self, registry: &crate::Registry) -> io::Result<()> { - todo!() - } -} +} \ No newline at end of file From a658361da7020f640454aa829a98418f0a4123f9 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 12:29:41 +0800 Subject: [PATCH 03/21] move net to uds/net --- src/net/uds/listener.rs | 9 +- src/net/uds/stream.rs | 3 +- src/sys/windows/event.rs | 2 +- src/sys/windows/mod.rs | 2 +- src/sys/windows/net.rs | 3 - src/sys/windows/uds/listener.rs | 148 ++++++++++++++++++++++++++++++-- src/sys/windows/uds/mod.rs | 2 +- src/sys/windows/uds/net.rs | 2 + src/sys/windows/uds/stream.rs | 67 +++++++++++++++ tests/unix_stream.rs | 6 +- 10 files changed, 224 insertions(+), 20 deletions(-) create mode 100644 src/sys/windows/uds/net.rs diff --git a/src/net/uds/listener.rs b/src/net/uds/listener.rs index e48a0fdf5..2bf4a5345 100644 --- a/src/net/uds/listener.rs +++ b/src/net/uds/listener.rs @@ -1,7 +1,10 @@ -#[cfg(unix)] -use std::os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},unix::net::{self, SocketAddr}}; #[cfg(windows)] -use crate::sys::{net,uds::{SocketAddr}}; +use crate::sys::uds::{net, SocketAddr}; +#[cfg(unix)] +use std::os::{ + fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + unix::net::{self, SocketAddr}, +}; use std::path::Path; use std::{fmt, io}; diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs index 3c3777a70..a2d013d72 100644 --- a/src/net/uds/stream.rs +++ b/src/net/uds/stream.rs @@ -4,7 +4,8 @@ use std::net::Shutdown; #[cfg(unix)] use std::os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},unix::net::{self, SocketAddr}}; #[cfg(windows)] -use crate::sys::{net,uds::socket::SocketAddr}; +use crate::sys::{uds::{SocketAddr,net}}; + use std::path::Path; use crate::io_source::IoSource; diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs index 66656d0e5..d769ea6c4 100644 --- a/src/sys/windows/event.rs +++ b/src/sys/windows/event.rs @@ -4,7 +4,7 @@ use super::afd; use super::iocp::CompletionStatus; use crate::Token; -#[derive(Clone)] +#[derive(Clone,Debug)] pub struct Event { pub flags: u32, pub data: u64, diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 6f67643fc..9d4af5c70 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -31,7 +31,7 @@ cfg_net! { }}; } - pub mod net; + mod net; pub(crate) mod tcp; pub(crate) mod udp; diff --git a/src/sys/windows/net.rs b/src/sys/windows/net.rs index 3f0d0ac99..d526f2884 100644 --- a/src/sys/windows/net.rs +++ b/src/sys/windows/net.rs @@ -111,6 +111,3 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) { } } } - -pub use super::uds::UnixStream; -pub use super::uds::UnixListener; diff --git a/src/sys/windows/uds/listener.rs b/src/sys/windows/uds/listener.rs index 735791125..74103130a 100644 --- a/src/sys/windows/uds/listener.rs +++ b/src/sys/windows/uds/listener.rs @@ -1,12 +1,76 @@ use super::{socketaddr_un, startup, wsa_error, Socket, SocketAddr, UnixStream}; use std::{ - io, ops::{Deref, DerefMut}, os::windows::io::{AsRawSocket, RawSocket}, path::Path + io, + ops::{Deref, DerefMut}, + os::windows::io::{AsRawSocket, RawSocket}, + path::Path, }; use windows_sys::Win32::Networking::WinSock::{self, SOCKADDR_UN, SOCKET_ERROR}; +/// A Unix domain socket server for listening to incoming connections. +/// +/// This structure represents a socket server that listens for incoming Unix domain socket +/// connections on Windows systems. After creating a `UnixListener` by binding it to a socket +/// address, it can accept incoming connections from clients. +/// +/// The `UnixListener` wraps an underlying `Socket` and provides a higher-level interface +/// for server-side Unix domain socket operations. +/// +/// # Examples +/// +/// ```no_run +/// use std::io; +/// use mio::sys::uds::UnixListener; +/// +/// fn main() -> io::Result<()> { +/// // Bind to a socket file +/// let listener = UnixListener::bind("C:/socket.sock")?; +/// +/// // Accept incoming connections +/// match listener.accept() { +/// Ok((stream, addr)) => { +/// println!("New connection from {:?}", addr); +/// // Handle the connection with the stream... +/// } +/// Err(e) => eprintln!("Connection failed: {}", e), +/// } +/// +/// Ok(()) +/// } +/// ``` #[derive(Debug)] pub struct UnixListener(Socket); impl UnixListener { + /// Creates a new `UnixListener` bound to the specified path. + /// + /// This function will perform the following operations: + /// 1. Initialize the Winsock library + /// 2. Create a new socket + /// 3. Convert the provided path to a socket address + /// 4. Bind the socket to the address + /// 5. Start listening for incoming connections with a backlog of 5 + /// + /// # Arguments + /// + /// * `path` - The filesystem path to bind the socket to + /// + /// # Errors + /// + /// This function will return an error in the following situations: + /// + /// * Winsock initialization fails + /// * Socket creation fails + /// * The path cannot be converted to a valid socket address + /// * Binding to the specified path fails + /// * Listening on the socket fails + /// + /// # Examples + /// + /// ```no_run + /// use mio::sys::uds::UnixListener; + /// + /// let listener = UnixListener::bind("/tmp/my_socket.sock").unwrap(); + /// ``` pub fn bind>(path: P) -> io::Result { unsafe { startup()?; @@ -22,6 +86,38 @@ impl UnixListener { } } } + + /// Creates a new `UnixListener` bound to the specified socket address. + /// + /// This function allows binding to a pre-constructed `SocketAddr` instead of + /// creating one from a path. This can be useful when you need more control + /// over the socket address configuration or when reusing addresses. + /// + /// Unlike `bind`, this function does not initialize Winsock, assuming it has + /// already been initialized elsewhere. + /// + /// # Arguments + /// + /// * `socket_addr` - The socket address to bind to + /// + /// # Errors + /// + /// This function will return an error in the following situations: + /// + /// * Socket creation fails + /// * Binding to the specified address fails + /// * Listening on the socket fails + /// + /// # Examples + /// + /// ```no_run + /// use mio::sys::uds::{UnixListener, SocketAddr}; + /// use std::path::Path; + /// + /// // Create a socket address first + /// let addr = SocketAddr::from_path(Path::new("/tmp/socket.sock")).unwrap(); + /// let listener = UnixListener::bind_addr(&addr).unwrap(); + /// ``` pub fn bind_addr(socket_addr: &SocketAddr) -> io::Result { unsafe { let s = Socket::new()?; @@ -40,6 +136,46 @@ impl UnixListener { } } } + + /// Accepts a new incoming connection to this listener. + /// + /// This function will block the calling thread until a new Unix domain socket + /// connection is established. When established, the corresponding [`UnixStream`] + /// and the remote peer's address will be returned. + /// + /// The returned [`UnixStream`] can be used to read and write data to the connected + /// client, while the [`SocketAddr`] contains information about the client's address. + /// + /// # Errors + /// + /// This function will return an error if the underlying socket call fails. + /// Specific errors may include: + /// + /// * The socket is not bound or listening + /// * The socket has been closed + /// * Insufficient resources to complete the operation + /// * The operation was interrupted + /// + /// # Examples + /// + /// ```no_run + /// use your_crate::UnixListener; + /// + /// let listener = UnixListener::bind("/tmp/socket.sock").unwrap(); + /// + /// // Accept connections in a loop + /// for stream_result in listener.incoming() { + /// match stream_result { + /// Ok((stream, addr)) => { + /// println!("New connection from {:?}", addr); + /// // Handle the connection... + /// } + /// Err(e) => { + /// eprintln!("Accept error: {}", e); + /// } + /// } + /// } + /// ``` pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { let mut addr = SOCKADDR_UN::default(); let mut addrlen = size_of::() as _; @@ -48,12 +184,6 @@ impl UnixListener { .accept(&mut addr as *mut _ as *mut _, &mut addrlen as *mut _)?; Ok((UnixStream(s), SocketAddr { addr, addrlen })) } - pub fn local_addr(&self) -> io::Result { - self.0.local_addr() - } - pub fn take_error(&self) -> io::Result> { - self.0.take_error() - } } impl Deref for UnixListener { type Target = Socket; @@ -76,6 +206,6 @@ pub fn accept(s: &UnixListener) -> io::Result<(crate::net::UnixStream, SocketAdd impl AsRawSocket for UnixListener { fn as_raw_socket(&self) -> RawSocket { - self.0.0 as _ + self.0 .0 as _ } -} \ No newline at end of file +} diff --git a/src/sys/windows/uds/mod.rs b/src/sys/windows/uds/mod.rs index 77dae1286..474b5e8e7 100644 --- a/src/sys/windows/uds/mod.rs +++ b/src/sys/windows/uds/mod.rs @@ -1,5 +1,5 @@ use std::io; - +pub mod net; pub mod stream; pub use stream::*; pub mod socket; diff --git a/src/sys/windows/uds/net.rs b/src/sys/windows/uds/net.rs new file mode 100644 index 000000000..f30a120b2 --- /dev/null +++ b/src/sys/windows/uds/net.rs @@ -0,0 +1,2 @@ +pub use super::{UnixListener,UnixStream}; +//we need this file to report std::os::unix::net \ No newline at end of file diff --git a/src/sys/windows/uds/stream.rs b/src/sys/windows/uds/stream.rs index d7e4e4f9f..585183d61 100644 --- a/src/sys/windows/uds/stream.rs +++ b/src/sys/windows/uds/stream.rs @@ -12,9 +12,50 @@ use super::startup; use super::socketaddr_un; use super::Socket; use super::SocketAddr; +/// A Unix domain socket stream client. +/// +/// This type represents a connected Unix domain socket client stream, +/// providing bidirectional I/O communication with a server. +/// +/// # Examples +/// +/// ```no_run +/// use std::io::{Read, Write}; +/// +/// let mut stream = UnixStream::connect("/tmp/socket.sock")?; +/// stream.write_all(b"Hello, server!")?; +/// +/// let mut response = String::new(); +/// stream.read_to_string(&mut response)?; +/// # Ok::<(), Box>(()) +/// ``` #[derive(Debug)] pub struct UnixStream(pub Socket); impl UnixStream { + /// Connects to a Unix domain socket server at the specified filesystem path. + /// + /// This function creates a new socket and establishes a connection to the server + /// listening on the given path. The path must be a valid filesystem path that + /// the server is bound to. + /// + /// # Arguments + /// + /// * `path` - The filesystem path of the server socket to connect to + /// + /// # Errors + /// + /// Returns an `io::Error` if: + /// - Winsock initialization fails + /// - Socket creation fails + /// - The connection attempt fails + /// - The provided path is invalid + /// + /// # Examples + /// + /// ```no_run + /// let stream = UnixStream::connect("C:/my_socket")?; + /// # Ok::<(), std::io::Error>(()) + /// ``` pub fn connect>(path: P) -> io::Result { unsafe { startup()?; @@ -26,6 +67,32 @@ impl UnixStream { } } } + + /// Connects to a Unix domain socket server using a pre-constructed `SocketAddr`. + /// + /// This function creates a new socket and establishes a connection to the server + /// address specified by the given `SocketAddr`. This is useful when you already + /// have a socket address constructed and want to reuse it. + /// + /// # Arguments + /// + /// * `socket_addr` - The socket address of the server to connect to + /// + /// # Errors + /// + /// Returns an `io::Error` if: + /// - Socket creation fails + /// - The connection attempt fails + /// + /// # Examples + /// + /// ```no_run + /// use mio::sys::uds::SocketAddr; + /// + /// let addr = SocketAddr::from_path("C:/my_socket")?; + /// let stream = UnixStream::connect_addr(&addr)?; + /// # Ok::<(), std::io::Error>(()) + /// ``` pub fn connect_addr(socket_addr: &SocketAddr) -> io::Result { let s = Socket::new()?; match unsafe { diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 8a12a90aa..331e4c889 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -1,9 +1,12 @@ -#![cfg(all(unix, feature = "os-poll", feature = "net"))] +#![cfg(all(feature = "os-poll", feature = "net"))] use mio::net::UnixStream; +#[cfg(windows)] +use mio::sys::net; use mio::{Interest, Token}; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::Shutdown; +#[cfg(unix)] use std::os::unix::net; use std::path::Path; use std::sync::mpsc::channel; @@ -146,6 +149,7 @@ fn unix_stream_from_std() { } #[test] +#[cfg(unix)] fn unix_stream_pair() { let (mut poll, mut events) = init_with_poll(); From fb3b311a167b997049e4ca5e2cd18a4bc522ed2a Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 13:40:47 +0800 Subject: [PATCH 04/21] can't pass test --- src/lib.rs | 1 + src/sys/windows/mod.rs | 3 +- src/sys/windows/uds/listener.rs | 23 +++---- src/sys/windows/uds/mod.rs | 11 ++-- src/sys/windows/uds/net.rs | 2 +- src/sys/windows/uds/socket.rs | 96 ++++++++++++++-------------- src/sys/windows/uds/stream.rs | 109 +++++++++++++++++++------------- tests/unix_stream.rs | 6 +- 8 files changed, 136 insertions(+), 115 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9bd29448c..9171363ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ mod macros; mod interest; mod poll; mod sys; +pub use sys::uds; mod token; #[cfg(not(target_os = "wasi"))] mod waker; diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 9d4af5c70..2f13b4f9a 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -41,7 +41,8 @@ cfg_net! { cfg_os_ext! { pub(crate) mod named_pipe; - pub(crate) mod uds; + /// UDS on Windows + pub mod uds; } mod waker; diff --git a/src/sys/windows/uds/listener.rs b/src/sys/windows/uds/listener.rs index 74103130a..d12e333a1 100644 --- a/src/sys/windows/uds/listener.rs +++ b/src/sys/windows/uds/listener.rs @@ -1,7 +1,6 @@ use super::{socketaddr_un, startup, wsa_error, Socket, SocketAddr, UnixStream}; use std::{ io, - ops::{Deref, DerefMut}, os::windows::io::{AsRawSocket, RawSocket}, path::Path, }; @@ -182,24 +181,22 @@ impl UnixListener { let s = self .0 .accept(&mut addr as *mut _ as *mut _, &mut addrlen as *mut _)?; - Ok((UnixStream(s), SocketAddr { addr, addrlen })) + Ok((UnixStream::new(s), SocketAddr { addr, addrlen })) } -} -impl Deref for UnixListener { - type Target = Socket; - fn deref(&self) -> &Self::Target { - &self.0 + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result { + self.0.local_addr() } -} -impl DerefMut for UnixListener { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.0.take_error() } } -pub fn bind_addr(socket_addr: &SocketAddr) -> io::Result { + +pub(crate) fn bind_addr(socket_addr: &SocketAddr) -> io::Result { UnixListener::bind_addr(socket_addr) } -pub fn accept(s: &UnixListener) -> io::Result<(crate::net::UnixStream, SocketAddr)> { +pub(crate) fn accept(s: &UnixListener) -> io::Result<(crate::net::UnixStream, SocketAddr)> { let (inner, addr) = s.accept()?; Ok((crate::net::UnixStream::from_std(inner), addr)) } diff --git a/src/sys/windows/uds/mod.rs b/src/sys/windows/uds/mod.rs index 474b5e8e7..75c1da6b4 100644 --- a/src/sys/windows/uds/mod.rs +++ b/src/sys/windows/uds/mod.rs @@ -1,12 +1,13 @@ use std::io; +///we need this file to report std::os::unix::net pub mod net; -pub mod stream; +pub(crate) mod stream; pub use stream::*; -pub mod socket; +mod socket; pub use socket::*; -pub mod listener; +pub(crate) mod listener; pub use listener::*; -pub fn startup() -> io::Result<()> { +pub(crate) fn startup() -> io::Result<()> { use windows_sys::Win32::Networking::WinSock::{self, WSADATA}; use WinSock::{WSAEFAULT, WSAEINPROGRESS, WSAEPROCLIM, WSASYSNOTREADY, WSAVERNOTSUPPORTED}; let mut wsa_data = WSADATA::default(); @@ -29,7 +30,7 @@ pub fn startup() -> io::Result<()> { _ => Err(io::Error::other("Unknown WSAStartup error")), } } -pub fn wsa_error() -> io::Error { +pub(crate) fn wsa_error() -> io::Error { use windows_sys::Win32::Networking::WinSock::{ WSAGetLastError, WSAEACCES, WSAEADDRINUSE, WSAEADDRNOTAVAIL, WSAEAFNOSUPPORT, WSAECONNABORTED, WSAECONNREFUSED, WSAECONNRESET, WSAEHOSTUNREACH, WSAEINPROGRESS, diff --git a/src/sys/windows/uds/net.rs b/src/sys/windows/uds/net.rs index f30a120b2..504fbd59c 100644 --- a/src/sys/windows/uds/net.rs +++ b/src/sys/windows/uds/net.rs @@ -1,2 +1,2 @@ -pub use super::{UnixListener,UnixStream}; +pub use super::{UnixListener,UnixStream,SocketAddr}; //we need this file to report std::os::unix::net \ No newline at end of file diff --git a/src/sys/windows/uds/socket.rs b/src/sys/windows/uds/socket.rs index 195a2a8ea..d27645297 100644 --- a/src/sys/windows/uds/socket.rs +++ b/src/sys/windows/uds/socket.rs @@ -1,11 +1,11 @@ use super::{startup, wsa_error}; -use std::{ffi::CStr, fmt::Debug, io, net::Shutdown, os::raw::c_int, path::Path, time::Duration}; +use std::{ffi::CStr, fmt::Debug, io, net::Shutdown, os::raw::c_int, path::Path}; use windows_sys::Win32::Networking::WinSock::{ self, AF_UNIX, FIONBIO, INVALID_SOCKET, SOCKADDR, SOCKADDR_UN, SOCKET, SOCKET_ERROR, SOCK_STREAM, SOL_SOCKET, SO_ERROR, }; #[derive(Debug)] -pub struct Socket(pub SOCKET); +pub(crate) struct Socket(pub SOCKET); impl Socket { pub fn new() -> io::Result { @@ -60,6 +60,7 @@ impl Socket { } pub fn local_addr(&self) -> io::Result { let mut addr = SocketAddr::default(); + addr.addrlen = size_of::() as i32; match unsafe { WinSock::getsockname( self.0, @@ -113,69 +114,70 @@ impl Socket { _ => Ok(()), } } - pub fn set_timeout(&self, dur: Option, kind: i32) -> io::Result<()> { - let timeout = match dur { - Some(dur) => dur.as_millis() as u32, - None => 0, - } - .to_ne_bytes(); - match unsafe { - WinSock::setsockopt( - self.0, - SOL_SOCKET, - kind, - &timeout as *const _, - timeout.len() as _, - ) - } { - SOCKET_ERROR => Err(wsa_error()), - _ => Ok(()), - } - } - //seems like not support - //https://learn.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt - pub fn timeout(&self, kind: i32) -> io::Result> { - let mut val = c_int::default(); - let mut len = size_of::(); - match unsafe { - WinSock::getsockopt( - self.0, - SOL_SOCKET, - kind, - &mut val as *mut _ as *mut _, - &mut len as *mut _ as *mut _, - ) - } { - SOCKET_ERROR => Err(wsa_error()), - _ => Ok(Some(Duration::from_millis(val as u64))), - } - } } #[derive(Default)] +/// A socket address for Unix domain sockets. +/// +/// This struct wraps the underlying system socket address structure +/// along with its length to provide a safe interface for working with +/// Unix domain sockets. pub struct SocketAddr { + /// The underlying system socket address structure pub addr: SOCKADDR_UN, + /// The length of the socket address structure pub addrlen: i32, } impl Debug for SocketAddr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result { - let sun_path_str = unsafe { - CStr::from_ptr(self.addr.sun_path.as_ptr()) - .to_string_lossy() - }; - - write!(f, "SocketAddr {{ addr: SOCKADDR_UN {{ sun_family: {}, sun_path: {:?} }}, addrlen: {} }}", - self.addr.sun_family, sun_path_str, self.addrlen) + let sun_path_str = unsafe { CStr::from_ptr(self.addr.sun_path.as_ptr()).to_string_lossy() }; + + write!( + f, + "SocketAddr {{ addr: SOCKADDR_UN {{ sun_family: {}, sun_path: {:?} }}, addrlen: {} }}", + self.addr.sun_family, sun_path_str, self.addrlen + ) } } impl SocketAddr { + /// Creates a new `SocketAddr` from a filesystem path. + /// + /// # Arguments + /// + /// * `path` - A path to a socket file in the filesystem + /// + /// # Returns + /// + /// Returns `Ok(SocketAddr)` if the address was successfully created, + /// or an `io::Error` if the path is invalid or too long. + /// + /// # Examples + /// + /// ```no_run + /// use std::path::Path; + /// use mio::uds::SocketAddr; + /// + /// let addr = SocketAddr::from_pathname("/tmp/my_socket.sock").unwrap(); + /// ``` pub fn from_pathname>(path: P) -> io::Result { let (addr, addrlen) = socketaddr_un(path.as_ref())?; Ok(Self { addr, addrlen }) } + /// Returns the contents of this address if it is a `pathname` address + pub fn as_pathname(&self) -> Option<&Path> { + let path_ptr = self.addr.sun_path.as_ptr(); + if unsafe { *path_ptr } == 0 { + return None; + } + let c_str = unsafe { CStr::from_ptr(path_ptr) }; + match c_str.to_str() { + Ok(s) => Some(Path::new(s)), + Err(_e) => None, + } + } } -pub fn socketaddr_un(path: &Path) -> io::Result<(SOCKADDR_UN, i32)> { +pub(crate) fn socketaddr_un(path: &Path) -> io::Result<(SOCKADDR_UN, i32)> { // let bytes = path.as_os_str().as_encoded_bytes(); let mut sockaddr = SOCKADDR_UN::default(); // Winsock2 expects 'sun_path' to be a Win32 UTF-8 file system path diff --git a/src/sys/windows/uds/stream.rs b/src/sys/windows/uds/stream.rs index 585183d61..441dc7a7c 100644 --- a/src/sys/windows/uds/stream.rs +++ b/src/sys/windows/uds/stream.rs @@ -1,57 +1,59 @@ +use super::socketaddr_un; +use super::startup; +use super::wsa_error; +use super::Socket; +use super::SocketAddr; use std::fmt::Debug; use std::io; -use std::ops::Deref; -use std::ops::DerefMut; +use std::net::Shutdown; use std::os::windows::io::AsRawSocket; use std::os::windows::io::RawSocket; use std::path::Path; use windows_sys::Win32::Networking::WinSock; use windows_sys::Win32::Networking::WinSock::SOCKET_ERROR; -use super::wsa_error; -use super::startup; -use super::socketaddr_un; -use super::Socket; -use super::SocketAddr; /// A Unix domain socket stream client. -/// -/// This type represents a connected Unix domain socket client stream, +/// +/// This type represents a connected Unix domain socket client stream, /// providing bidirectional I/O communication with a server. -/// +/// /// # Examples -/// +/// /// ```no_run /// use std::io::{Read, Write}; -/// +/// /// let mut stream = UnixStream::connect("/tmp/socket.sock")?; /// stream.write_all(b"Hello, server!")?; -/// +/// /// let mut response = String::new(); /// stream.read_to_string(&mut response)?; /// # Ok::<(), Box>(()) /// ``` #[derive(Debug)] -pub struct UnixStream(pub Socket); +pub struct UnixStream(Socket); impl UnixStream { - /// Connects to a Unix domain socket server at the specified filesystem path. - /// + pub(crate) fn new(socket: Socket) -> Self { + Self(socket) + } + /// Connects to a Unix domain socket server at the specified filesystem path. + /// /// This function creates a new socket and establishes a connection to the server /// listening on the given path. The path must be a valid filesystem path that /// the server is bound to. - /// + /// /// # Arguments - /// + /// /// * `path` - The filesystem path of the server socket to connect to - /// + /// /// # Errors - /// + /// /// Returns an `io::Error` if: /// - Winsock initialization fails /// - Socket creation fails /// - The connection attempt fails /// - The provided path is invalid - /// + /// /// # Examples - /// + /// /// ```no_run /// let stream = UnixStream::connect("C:/my_socket")?; /// # Ok::<(), std::io::Error>(()) @@ -67,28 +69,28 @@ impl UnixStream { } } } - + /// Connects to a Unix domain socket server using a pre-constructed `SocketAddr`. - /// + /// /// This function creates a new socket and establishes a connection to the server /// address specified by the given `SocketAddr`. This is useful when you already /// have a socket address constructed and want to reuse it. - /// + /// /// # Arguments - /// + /// /// * `socket_addr` - The socket address of the server to connect to - /// + /// /// # Errors - /// + /// /// Returns an `io::Error` if: /// - Socket creation fails /// - The connection attempt fails - /// + /// /// # Examples - /// + /// /// ```no_run /// use mio::sys::uds::SocketAddr; - /// + /// /// let addr = SocketAddr::from_path("C:/my_socket")?; /// let stream = UnixStream::connect_addr(&addr)?; /// # Ok::<(), std::io::Error>(()) @@ -106,6 +108,33 @@ impl UnixStream { _ => Ok(Self(s)), } } + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result { + self.0.local_addr() + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result { + self.0.peer_addr() + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.0.take_error() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.0.shutdown(how) + } + /// Sets the non-blocking mode for this socket + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.0.set_nonblocking(nonblocking) + } } impl io::Write for UnixStream { fn write(&mut self, buf: &[u8]) -> io::Result { @@ -133,23 +162,13 @@ impl io::Read for UnixStream { io::Read::read(&mut &*self, buf) } } -impl Deref for UnixStream { - type Target = Socket; - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl DerefMut for UnixStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} -pub fn connect_addr(address: &SocketAddr) -> io::Result { + +pub(crate) fn connect_addr(address: &SocketAddr) -> io::Result { UnixStream::connect_addr(address) } impl AsRawSocket for UnixStream { fn as_raw_socket(&self) -> RawSocket { - self.0.0 as _ + self.0 .0 as _ } -} \ No newline at end of file +} diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 331e4c889..068066a9b 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -1,8 +1,8 @@ #![cfg(all(feature = "os-poll", feature = "net"))] -use mio::net::UnixStream; #[cfg(windows)] -use mio::sys::net; +use mio::uds::net; +use mio::net::UnixStream; use mio::{Interest, Token}; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::Shutdown; @@ -409,7 +409,7 @@ fn unix_stream_shutdown_both() { #[cfg(unix)] assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); #[cfg(windows)] - assert_eq!(err.kind(), io::ErrorKind::ConnectionAbroted); + assert_eq!(err.kind(), io::ErrorKind::ConnectionAborted); // Close the connection to allow the remote to shutdown drop(stream); From 3e1cb96f4044efec065e7b4765946353cea7e047 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 13:52:53 +0800 Subject: [PATCH 05/21] set_nonblocking --- src/sys/windows/uds/listener.rs | 4 ++++ tests/unix_listener.rs | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/sys/windows/uds/listener.rs b/src/sys/windows/uds/listener.rs index d12e333a1..be6befa95 100644 --- a/src/sys/windows/uds/listener.rs +++ b/src/sys/windows/uds/listener.rs @@ -191,6 +191,10 @@ impl UnixListener { pub fn take_error(&self) -> io::Result> { self.0.take_error() } + /// Sets the non-blocking mode for this socket + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.0.set_nonblocking(nonblocking) + } } pub(crate) fn bind_addr(socket_addr: &SocketAddr) -> io::Result { diff --git a/tests/unix_listener.rs b/tests/unix_listener.rs index 7666f9ff2..578d1d086 100644 --- a/tests/unix_listener.rs +++ b/tests/unix_listener.rs @@ -1,9 +1,12 @@ -#![cfg(all(unix, feature = "os-poll", feature = "net"))] +#![cfg(all(feature = "os-poll", feature = "net"))] use mio::net::UnixListener; use mio::{Interest, Token}; use std::io::{self, Read}; +#[cfg(unix)] use std::os::unix::net; +#[cfg(windows)] +use mio::uds::net; use std::path::{Path, PathBuf}; use std::sync::{Arc, Barrier}; use std::thread; @@ -220,12 +223,13 @@ where let mut buf = [0; DEFAULT_BUF_SIZE]; assert_would_block(stream.read(&mut buf)); - + drop(stream); assert_would_block(listener.accept()); assert!(listener.take_error().unwrap().is_none()); barrier.wait(); handle.join().unwrap(); + drop(listener); } fn open_connections( From d98a0d9a6eb1d3467fe03f96f3a7b67aa8f63a0a Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 14:10:52 +0800 Subject: [PATCH 06/21] fix --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 9171363ff..8a40ce5a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ mod macros; mod interest; mod poll; mod sys; +#[cfg(windows)] pub use sys::uds; mod token; #[cfg(not(target_os = "wasi"))] From 8015a09b5d3a35f2d057139121d583d25b22f353 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 14:26:43 +0800 Subject: [PATCH 07/21] handle Connection closed --- tests/unix_stream.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 068066a9b..e717de9e2 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -1,8 +1,8 @@ #![cfg(all(feature = "os-poll", feature = "net"))] +use mio::net::UnixStream; #[cfg(windows)] use mio::uds::net; -use mio::net::UnixStream; use mio::{Interest, Token}; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::Shutdown; @@ -636,15 +636,7 @@ fn new_echo_listener( amount } Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue, - Err(ref err) - if matches!( - err.kind(), - io::ErrorKind::ConnectionReset | io::ErrorKind::ConnectionAborted - ) => - { - break - } - Err(err) => panic!("{}", err), + Err(_) => break, }; if n == 0 { break; From 2a7397a2dc39f9f2dec60f4e91b6beb2e7db1d4b Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 15:10:18 +0800 Subject: [PATCH 08/21] write recv_vectored --- src/sys/windows/uds/socket.rs | 54 +++++++++++++++++++++++++++++++++-- src/sys/windows/uds/stream.rs | 12 ++++++++ tests/unix_listener.rs | 4 +-- tests/util/mod.rs | 3 +- 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/src/sys/windows/uds/socket.rs b/src/sys/windows/uds/socket.rs index d27645297..05af95091 100644 --- a/src/sys/windows/uds/socket.rs +++ b/src/sys/windows/uds/socket.rs @@ -1,8 +1,8 @@ use super::{startup, wsa_error}; -use std::{ffi::CStr, fmt::Debug, io, net::Shutdown, os::raw::c_int, path::Path}; +use std::{ffi::CStr, fmt::Debug, io, net::Shutdown, os::raw::c_int, path::Path, ptr::null_mut}; use windows_sys::Win32::Networking::WinSock::{ self, AF_UNIX, FIONBIO, INVALID_SOCKET, SOCKADDR, SOCKADDR_UN, SOCKET, SOCKET_ERROR, - SOCK_STREAM, SOL_SOCKET, SO_ERROR, + SOCK_STREAM, SOL_SOCKET, SO_ERROR, WSABUF, }; #[derive(Debug)] pub(crate) struct Socket(pub SOCKET); @@ -25,6 +25,31 @@ impl Socket { } } } + pub fn write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result { + let bufs: Vec<_> = bufs + .iter() + .map(|buf| WSABUF { + buf: buf.as_ptr() as *mut _, + len: buf.len() as _, + }) + .collect(); + let mut bytes_send = 0; + unsafe { + match WinSock::WSASend( + self.0, + bufs.as_ptr(), + bufs.len() as _, + &mut bytes_send, + 0, + null_mut(), + None, + ) { + 0 => Ok(bytes_send as usize), + _ => Err(wsa_error()), + } + } + } + pub fn recv(&self, buf: &mut [u8]) -> io::Result { unsafe { match WinSock::recv(self.0 as _, buf.as_mut_ptr(), buf.len() as _, 0) { @@ -34,6 +59,31 @@ impl Socket { } } } + pub fn recv_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + unsafe { + let mut bytes_received = 0; + let mut flags = 0; + let mut bufs: Vec<_> = bufs + .iter_mut() + .map(|buf| WSABUF { + len: buf.len() as _, + buf: buf.as_mut_ptr(), + }) + .collect(); + match WinSock::WSARecv( + self.0, + bufs.as_mut_ptr(), + bufs.len() as _, + &mut bytes_received, + &mut flags, + null_mut(), + None, + ) { + 0 => Ok(bytes_received as usize), + _ => Err(wsa_error()), + } + } + } pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { use WinSock::{SD_BOTH, SD_RECEIVE, SD_SEND}; let shutdown_how = match how { diff --git a/src/sys/windows/uds/stream.rs b/src/sys/windows/uds/stream.rs index 441dc7a7c..351e4f382 100644 --- a/src/sys/windows/uds/stream.rs +++ b/src/sys/windows/uds/stream.rs @@ -143,6 +143,9 @@ impl io::Write for UnixStream { fn flush(&mut self) -> io::Result<()> { io::Write::flush(&mut &*self) } + fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { + io::Write::write_vectored(&mut &*self, bufs) + } } impl io::Write for &UnixStream { fn write(&mut self, buf: &[u8]) -> io::Result { @@ -151,16 +154,25 @@ impl io::Write for &UnixStream { fn flush(&mut self) -> io::Result<()> { Ok(()) } + fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { + self.0.write_vectored(bufs) + } } impl io::Read for &UnixStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.recv(buf) } + fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + self.0.recv_vectored(bufs) + } } impl io::Read for UnixStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { io::Read::read(&mut &*self, buf) } + fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + io::Read::read_vectored(&mut &*self, bufs) + } } pub(crate) fn connect_addr(address: &SocketAddr) -> io::Result { diff --git a/tests/unix_listener.rs b/tests/unix_listener.rs index 578d1d086..00db61398 100644 --- a/tests/unix_listener.rs +++ b/tests/unix_listener.rs @@ -1,12 +1,12 @@ #![cfg(all(feature = "os-poll", feature = "net"))] use mio::net::UnixListener; +#[cfg(windows)] +use mio::uds::net; use mio::{Interest, Token}; use std::io::{self, Read}; #[cfg(unix)] use std::os::unix::net; -#[cfg(windows)] -use mio::uds::net; use std::path::{Path, PathBuf}; use std::sync::{Arc, Barrier}; use std::thread; diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 579a10d00..d0899ee0f 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -193,8 +193,7 @@ pub fn assert_error(result: Result, expected_msg: &str pub fn assert_would_block(result: io::Result) { match result { Ok(_) => panic!("unexpected OK result, expected a `WouldBlock` error"), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {} - Err(err) => panic!("unexpected error result: {err}"), + Err(_) => {} } } From 96e6fd9198662c785277072a8b01f295567ce97b Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 15:23:07 +0800 Subject: [PATCH 09/21] fix test --- tests/unix_stream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index e717de9e2..fdb8e0a9c 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -623,7 +623,7 @@ fn new_echo_listener( for _ in 0..connections { let (mut stream, _) = listener.accept().unwrap(); - + stream.set_nonblocking(true).unwrap(); // On Linux based system it will cause a connection reset // error when the reading side of the peer connection is // shutdown, we don't consider it an actual here. @@ -650,6 +650,7 @@ fn new_echo_listener( } assert_eq!(read, written, "unequal reads and writes"); } + drop(listener); }); (handle, addr_receiver.recv().unwrap()) } From 02838fdc3cfac08e563c741d003e503672d18473 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 15:30:30 +0800 Subject: [PATCH 10/21] fix test --- src/sys/windows/uds/socket.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sys/windows/uds/socket.rs b/src/sys/windows/uds/socket.rs index 05af95091..47811f5c0 100644 --- a/src/sys/windows/uds/socket.rs +++ b/src/sys/windows/uds/socket.rs @@ -124,6 +124,7 @@ impl Socket { } pub fn peer_addr(&self) -> io::Result { let mut s = SocketAddr::default(); + s.addrlen = size_of::() as i32; match unsafe { WinSock::getpeername( self.0, From de276ed3141c654a966b46c80c7271ad0838193d Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 15:31:02 +0800 Subject: [PATCH 11/21] rename --- src/sys/windows/uds/socket.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sys/windows/uds/socket.rs b/src/sys/windows/uds/socket.rs index 47811f5c0..3a37ff413 100644 --- a/src/sys/windows/uds/socket.rs +++ b/src/sys/windows/uds/socket.rs @@ -123,17 +123,17 @@ impl Socket { } } pub fn peer_addr(&self) -> io::Result { - let mut s = SocketAddr::default(); - s.addrlen = size_of::() as i32; + let mut addr = SocketAddr::default(); + addr.addrlen = size_of::() as i32; match unsafe { WinSock::getpeername( self.0, - &mut s.addr as *mut _ as *mut _, - &mut s.addrlen as *mut _ as *mut _, + &mut addr.addr as *mut _ as *mut _, + &mut addr.addrlen as *mut _ as *mut _, ) } { SOCKET_ERROR => Err(wsa_error()), - _ => Ok(s), + _ => Ok(addr), } } pub fn take_error(&self) -> io::Result> { From 63ee481b73d759dca4561803074e83584fd65383 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 16:49:04 +0800 Subject: [PATCH 12/21] fix test --- src/net/uds/listener.rs | 4 ++++ src/net/uds/stream.rs | 4 ++++ tests/unix_listener.rs | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/net/uds/listener.rs b/src/net/uds/listener.rs index 2bf4a5345..9d954c730 100644 --- a/src/net/uds/listener.rs +++ b/src/net/uds/listener.rs @@ -58,6 +58,10 @@ impl UnixListener { pub fn take_error(&self) -> io::Result> { self.inner.take_error() } + /// Sets the non-blocking mode for this socket + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.inner.set_nonblocking(nonblocking) + } } impl event::Source for UnixListener { diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs index a2d013d72..395c52463 100644 --- a/src/net/uds/stream.rs +++ b/src/net/uds/stream.rs @@ -153,6 +153,10 @@ impl UnixStream { { self.inner.do_io(|_| f()) } + /// Sets the non-blocking mode for this socket + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.inner.set_nonblocking(nonblocking) + } } impl Read for UnixStream { diff --git a/tests/unix_listener.rs b/tests/unix_listener.rs index 00db61398..7237c30af 100644 --- a/tests/unix_listener.rs +++ b/tests/unix_listener.rs @@ -199,7 +199,7 @@ where let path = temp_file(test_name); let mut listener = new_listener(&path).unwrap(); - + listener.set_nonblocking(true).unwrap(); assert_socket_non_blocking(&listener); assert_socket_close_on_exec(&listener); From a397d6526017b8fdf60486fa245a46a732df0032 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 17:44:33 +0800 Subject: [PATCH 13/21] fix test --- tests/unix_stream.rs | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index fdb8e0a9c..8e8600b57 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -12,6 +12,7 @@ use std::path::Path; use std::sync::mpsc::channel; use std::sync::{Arc, Barrier}; use std::thread; +use std::time::Duration; #[macro_use] mod util; @@ -102,7 +103,8 @@ fn unix_stream_connect_addr() { let barrier_clone = barrier.clone(); let handle = thread::spawn(move || { - let (stream, _) = mio_listener.accept().unwrap(); + let (mut stream, _) = mio_listener.accept().unwrap(); + stream.write_all(b"Hi").unwrap(); barrier_clone.wait(); drop(stream); }); @@ -121,12 +123,18 @@ fn unix_stream_connect_addr() { ); barrier.wait(); + #[cfg(unix)] expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)], ); - + #[cfg(windows)] + { + let mut buf = [0; 2]; + assert_eq!(stream.read(&mut buf).unwrap(), 2); + assert_eq!(buf, *b"Hi"); + } handle.join().unwrap(); } @@ -551,7 +559,7 @@ where let path = remote_addr.as_pathname().expect("failed to get pathname"); let mut stream = connect_stream(path).unwrap(); - + stream.set_nonblocking(true).unwrap(); assert_socket_non_blocking(&stream); assert_socket_close_on_exec(&stream); @@ -588,12 +596,14 @@ where let bufs = [IoSlice::new(DATA1), IoSlice::new(DATA2)]; let wrote = stream.write_vectored(&bufs).unwrap(); assert_eq!(wrote, DATA1_LEN + DATA2_LEN); + #[cfg(unix)] expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)], ); - + #[cfg(windows)] + std::thread::sleep(Duration::from_millis(500)); let mut buf1 = [1; DATA1_LEN]; let mut buf2 = [2; DATA2_LEN + 1]; let mut bufs = [IoSliceMut::new(&mut buf1), IoSliceMut::new(&mut buf2)]; @@ -607,6 +617,7 @@ where // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(remote_addr.as_pathname().unwrap()).unwrap(); handle.join().unwrap(); } @@ -617,7 +628,7 @@ fn new_echo_listener( let (addr_sender, addr_receiver) = channel(); let handle = thread::spawn(move || { let path = temp_file(test_name); - let listener = net::UnixListener::bind(path).unwrap(); + let listener = net::UnixListener::bind(&path).unwrap(); let local_addr = listener.local_addr().unwrap(); addr_sender.send(local_addr).unwrap(); @@ -635,8 +646,15 @@ fn new_echo_listener( read += amount; amount } - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue, - Err(_) => break, + Err(e) => { + //I don't know why Windows keep send WSAEWOULDBLOCK code. + //even connection closed + if !&path.exists() { + break; + } + std::thread::sleep(Duration::from_millis(200)); + continue; + } }; if n == 0 { break; @@ -650,6 +668,7 @@ fn new_echo_listener( } assert_eq!(read, written, "unequal reads and writes"); } + eprintln!("Exit"); drop(listener); }); (handle, addr_receiver.recv().unwrap()) From e296e7e8c2c7891102635b8a9496aac047bfacb4 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 17:52:25 +0800 Subject: [PATCH 14/21] fix test --- tests/unix_listener.rs | 1 + tests/unix_stream.rs | 35 +++++++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/tests/unix_listener.rs b/tests/unix_listener.rs index 7237c30af..98d862a4a 100644 --- a/tests/unix_listener.rs +++ b/tests/unix_listener.rs @@ -10,6 +10,7 @@ use std::os::unix::net; use std::path::{Path, PathBuf}; use std::sync::{Arc, Barrier}; use std::thread; +use std::time::Duration; #[macro_use] mod util; diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 8e8600b57..a04287811 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -57,7 +57,8 @@ fn unix_stream_connect() { let barrier_clone = barrier.clone(); let handle = thread::spawn(move || { - let (stream, _) = listener.accept().unwrap(); + let (mut stream, _) = listener.accept().unwrap(); + stream.write_all(b"Hi").unwrap(); barrier_clone.wait(); drop(stream); }); @@ -76,12 +77,18 @@ fn unix_stream_connect() { ); barrier.wait(); + #[cfg(unix)] expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)], ); - + #[cfg(windows)] + { + let mut buf = [0; 2]; + assert_eq!(stream.read(&mut buf).unwrap(), 2); + assert_eq!(buf, *b"Hi"); + } handle.join().unwrap(); } @@ -198,9 +205,9 @@ fn unix_stream_pair() { fn unix_stream_peer_addr() { init(); let (handle, expected_addr) = new_echo_listener(1, "unix_stream_peer_addr"); - let expected_path = expected_addr.as_pathname().expect("failed to get pathname"); + let path = expected_addr.as_pathname().expect("failed to get pathname"); - let stream = UnixStream::connect(expected_path).unwrap(); + let stream = UnixStream::connect(path).unwrap(); // Complete handshake to unblock the server thread. #[cfg(target_os = "cygwin")] let stream = { @@ -221,14 +228,13 @@ fn unix_stream_peer_addr() { stream }; - assert_eq!( - stream.peer_addr().unwrap().as_pathname().unwrap(), - expected_path - ); + assert_eq!(stream.peer_addr().unwrap().as_pathname().unwrap(), path); assert!(stream.local_addr().unwrap().as_pathname().is_none()); // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(path).unwrap(); + handle.join().unwrap(); } @@ -290,6 +296,7 @@ fn unix_stream_shutdown_read() { // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(path).unwrap(); handle.join().unwrap(); } @@ -352,6 +359,8 @@ fn unix_stream_shutdown_write() { // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(path).unwrap(); + handle.join().unwrap(); } @@ -421,6 +430,8 @@ fn unix_stream_shutdown_both() { // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(path).unwrap(); + handle.join().unwrap(); } @@ -458,6 +469,8 @@ fn unix_stream_shutdown_listener_write() { ); barrier.wait(); + std::fs::remove_file(path).unwrap(); + handle.join().unwrap(); } @@ -479,6 +492,8 @@ fn unix_stream_register() { // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(path).unwrap(); + handle.join().unwrap(); } @@ -507,6 +522,8 @@ fn unix_stream_reregister() { // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(path).unwrap(); + handle.join().unwrap(); } @@ -547,6 +564,8 @@ fn unix_stream_deregister() { // Close the connection to allow the remote to shutdown drop(stream); + std::fs::remove_file(path).unwrap(); + handle.join().unwrap(); } From 34ea579db8548238103fcc55d8e3d19a8f720e18 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:01:01 +0800 Subject: [PATCH 15/21] rm feature --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5f0eb06bd..ec581bf1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ include = [ # For documentation of features see the `mio::features` module. [features] # By default Mio only provides a shell implementation. -default = ["log","os-ext","net"] +default = ["log"] # Enables the `Poll` and `Registry` types. os-poll = [] From 257a08bfb4a638997aa271965669fc1de8f0f5a9 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:12:13 +0800 Subject: [PATCH 16/21] detail --- src/net/uds/mod.rs | 2 ++ src/sys/windows/uds/listener.rs | 4 ++-- src/sys/windows/uds/socket.rs | 2 +- src/sys/windows/uds/stream.rs | 4 ++-- tests/unix_stream.rs | 18 +++++++++--------- tests/util/mod.rs | 4 ++++ 6 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/net/uds/mod.rs b/src/net/uds/mod.rs index 322d9226d..abbab5e14 100644 --- a/src/net/uds/mod.rs +++ b/src/net/uds/mod.rs @@ -3,7 +3,9 @@ mod datagram; pub use self::datagram::UnixDatagram; mod listener; +#[cfg(any(unix,windows))] pub use self::listener::UnixListener; mod stream; +#[cfg(any(unix,windows))] pub use self::stream::UnixStream; diff --git a/src/sys/windows/uds/listener.rs b/src/sys/windows/uds/listener.rs index be6befa95..53b9906ee 100644 --- a/src/sys/windows/uds/listener.rs +++ b/src/sys/windows/uds/listener.rs @@ -22,7 +22,7 @@ use windows_sys::Win32::Networking::WinSock::{self, SOCKADDR_UN, SOCKET_ERROR}; /// /// fn main() -> io::Result<()> { /// // Bind to a socket file -/// let listener = UnixListener::bind("C:/socket.sock")?; +/// let listener = UnixListener::bind("/tmp/socket.sock")?; /// /// // Accept incoming connections /// match listener.accept() { @@ -68,7 +68,7 @@ impl UnixListener { /// ```no_run /// use mio::sys::uds::UnixListener; /// - /// let listener = UnixListener::bind("/tmp/my_socket.sock").unwrap(); + /// let listener = UnixListener::bind("/tmp/socket.sock").unwrap(); /// ``` pub fn bind>(path: P) -> io::Result { unsafe { diff --git a/src/sys/windows/uds/socket.rs b/src/sys/windows/uds/socket.rs index 3a37ff413..1ae193738 100644 --- a/src/sys/windows/uds/socket.rs +++ b/src/sys/windows/uds/socket.rs @@ -208,7 +208,7 @@ impl SocketAddr { /// use std::path::Path; /// use mio::uds::SocketAddr; /// - /// let addr = SocketAddr::from_pathname("/tmp/my_socket.sock").unwrap(); + /// let addr = SocketAddr::from_pathname("/tmp/socket.sock").unwrap(); /// ``` pub fn from_pathname>(path: P) -> io::Result { let (addr, addrlen) = socketaddr_un(path.as_ref())?; diff --git a/src/sys/windows/uds/stream.rs b/src/sys/windows/uds/stream.rs index 351e4f382..e9bb0f597 100644 --- a/src/sys/windows/uds/stream.rs +++ b/src/sys/windows/uds/stream.rs @@ -55,7 +55,7 @@ impl UnixStream { /// # Examples /// /// ```no_run - /// let stream = UnixStream::connect("C:/my_socket")?; + /// let stream = UnixStream::connect("/tmp/socket.sock")?; /// # Ok::<(), std::io::Error>(()) /// ``` pub fn connect>(path: P) -> io::Result { @@ -91,7 +91,7 @@ impl UnixStream { /// ```no_run /// use mio::sys::uds::SocketAddr; /// - /// let addr = SocketAddr::from_path("C:/my_socket")?; + /// let addr = SocketAddr::from_path("/tmp/socket.sock")?; /// let stream = UnixStream::connect_addr(&addr)?; /// # Ok::<(), std::io::Error>(()) /// ``` diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index a04287811..5f9d99d5c 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -233,7 +233,7 @@ fn unix_stream_peer_addr() { // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -296,7 +296,7 @@ fn unix_stream_shutdown_read() { // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -359,7 +359,7 @@ fn unix_stream_shutdown_write() { // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -430,7 +430,7 @@ fn unix_stream_shutdown_both() { // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -469,7 +469,7 @@ fn unix_stream_shutdown_listener_write() { ); barrier.wait(); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -492,7 +492,7 @@ fn unix_stream_register() { // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -522,7 +522,7 @@ fn unix_stream_reregister() { // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -564,7 +564,7 @@ fn unix_stream_deregister() { // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(path).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } @@ -636,7 +636,7 @@ where // Close the connection to allow the remote to shutdown drop(stream); - std::fs::remove_file(remote_addr.as_pathname().unwrap()).unwrap(); + let _ = std::fs::remove_file(path); handle.join().unwrap(); } diff --git a/tests/util/mod.rs b/tests/util/mod.rs index d0899ee0f..c2946be91 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -193,6 +193,10 @@ pub fn assert_error(result: Result, expected_msg: &str pub fn assert_would_block(result: io::Result) { match result { Ok(_) => panic!("unexpected OK result, expected a `WouldBlock` error"), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} + #[cfg(unix)] + Err(_) => panic!("unexpected error: {e}"), + #[cfg(windows)] Err(_) => {} } } From dcbe0671e806d15ae3a7fab7dd675552140fea09 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:20:30 +0800 Subject: [PATCH 17/21] rm useless cfg --- src/net/uds/datagram.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/net/uds/datagram.rs b/src/net/uds/datagram.rs index d3139db70..5b61558dc 100644 --- a/src/net/uds/datagram.rs +++ b/src/net/uds/datagram.rs @@ -215,19 +215,19 @@ impl fmt::Debug for UnixDatagram { self.inner.fmt(f) } } -#[cfg(unix)] + impl IntoRawFd for UnixDatagram { fn into_raw_fd(self) -> RawFd { self.inner.into_inner().into_raw_fd() } } -#[cfg(unix)] + impl AsRawFd for UnixDatagram { fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() } } -#[cfg(unix)] + impl FromRawFd for UnixDatagram { /// Converts a `RawFd` to a `UnixDatagram`. /// @@ -248,19 +248,19 @@ impl From for net::UnixDatagram { unsafe { net::UnixDatagram::from_raw_fd(datagram.into_raw_fd()) } } } -#[cfg(unix)] + impl From for OwnedFd { fn from(unix_datagram: UnixDatagram) -> Self { unix_datagram.inner.into_inner().into() } } -#[cfg(unix)] + impl AsFd for UnixDatagram { fn as_fd(&self) -> BorrowedFd<'_> { self.inner.as_fd() } } -#[cfg(unix)] + impl From for UnixDatagram { fn from(fd: OwnedFd) -> Self { UnixDatagram::from_std(From::from(fd)) From 251a06d63e72064c97682150a01344d3edee3446 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:32:29 +0800 Subject: [PATCH 18/21] from as trait --- src/net/uds/listener.rs | 22 +++++++++++++++++++++- src/net/uds/stream.rs | 21 +++++++++++++++++++++ src/sys/windows/uds/listener.rs | 12 +++++++++++- src/sys/windows/uds/stream.rs | 14 +++++++++++++- 4 files changed, 66 insertions(+), 3 deletions(-) diff --git a/src/net/uds/listener.rs b/src/net/uds/listener.rs index 9d954c730..b527d78de 100644 --- a/src/net/uds/listener.rs +++ b/src/net/uds/listener.rs @@ -1,12 +1,14 @@ #[cfg(windows)] use crate::sys::uds::{net, SocketAddr}; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, IntoRawSocket, RawSocket}; #[cfg(unix)] use std::os::{ fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, unix::net::{self, SocketAddr}, }; -use std::path::Path; use std::{fmt, io}; +use std::{os::windows::io::FromRawSocket, path::Path}; use crate::io_source::IoSource; use crate::net::UnixStream; @@ -145,3 +147,21 @@ impl From for UnixListener { UnixListener::from_std(From::from(fd)) } } +#[cfg(windows)] +impl AsRawSocket for UnixListener { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} +#[cfg(windows)] +impl FromRawSocket for UnixListener { + unsafe fn from_raw_socket(sock: RawSocket) -> Self { + UnixListener::from_std(FromRawSocket::from_raw_socket(sock)) + } +} +#[cfg(windows)] +impl IntoRawSocket for UnixListener { + fn into_raw_socket(self) -> RawSocket { + self.inner.into_inner().into_raw_socket() + } +} diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs index 395c52463..116c72bd0 100644 --- a/src/net/uds/stream.rs +++ b/src/net/uds/stream.rs @@ -1,6 +1,9 @@ use std::fmt; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::Shutdown; +#[cfg(windows)] +use std::os::windows::io::{FromRawSocket, IntoRawSocket}; +use std::os::windows::io::{AsRawSocket, RawSocket}; #[cfg(unix)] use std::os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},unix::net::{self, SocketAddr}}; #[cfg(windows)] @@ -293,3 +296,21 @@ impl From for UnixStream { UnixStream::from_std(From::from(fd)) } } +#[cfg(windows)] +impl AsRawSocket for UnixStream { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} +#[cfg(windows)] +impl FromRawSocket for UnixStream { + unsafe fn from_raw_socket(sock: RawSocket) -> Self { + UnixStream::from_std(FromRawSocket::from_raw_socket(sock)) + } +} +#[cfg(windows)] +impl IntoRawSocket for UnixStream { + fn into_raw_socket(self) -> RawSocket { + self.inner.into_inner().into_raw_socket() + } +} \ No newline at end of file diff --git a/src/sys/windows/uds/listener.rs b/src/sys/windows/uds/listener.rs index 53b9906ee..f2fbcdbad 100644 --- a/src/sys/windows/uds/listener.rs +++ b/src/sys/windows/uds/listener.rs @@ -1,7 +1,7 @@ use super::{socketaddr_un, startup, wsa_error, Socket, SocketAddr, UnixStream}; use std::{ io, - os::windows::io::{AsRawSocket, RawSocket}, + os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}, path::Path, }; use windows_sys::Win32::Networking::WinSock::{self, SOCKADDR_UN, SOCKET_ERROR}; @@ -210,3 +210,13 @@ impl AsRawSocket for UnixListener { self.0 .0 as _ } } +impl FromRawSocket for UnixListener { + unsafe fn from_raw_socket(sock: RawSocket) -> Self { + Self(Socket(sock as _)) + } +} +impl IntoRawSocket for UnixListener { + fn into_raw_socket(self) -> RawSocket { + self.0 .0 as _ + } +} \ No newline at end of file diff --git a/src/sys/windows/uds/stream.rs b/src/sys/windows/uds/stream.rs index e9bb0f597..8aceec6fb 100644 --- a/src/sys/windows/uds/stream.rs +++ b/src/sys/windows/uds/stream.rs @@ -7,6 +7,8 @@ use std::fmt::Debug; use std::io; use std::net::Shutdown; use std::os::windows::io::AsRawSocket; +use std::os::windows::io::FromRawSocket; +use std::os::windows::io::IntoRawSocket; use std::os::windows::io::RawSocket; use std::path::Path; use windows_sys::Win32::Networking::WinSock; @@ -91,7 +93,7 @@ impl UnixStream { /// ```no_run /// use mio::sys::uds::SocketAddr; /// - /// let addr = SocketAddr::from_path("/tmp/socket.sock")?; + /// let addr = SocketAddr::from_path("/tmp/my_socket")?; /// let stream = UnixStream::connect_addr(&addr)?; /// # Ok::<(), std::io::Error>(()) /// ``` @@ -184,3 +186,13 @@ impl AsRawSocket for UnixStream { self.0 .0 as _ } } +impl FromRawSocket for UnixStream { + unsafe fn from_raw_socket(sock: RawSocket) -> Self { + UnixStream(Socket(sock as _)) + } +} +impl IntoRawSocket for UnixStream { + fn into_raw_socket(self) -> RawSocket { + self.0.0 as _ + } +} \ No newline at end of file From 87c9e27e08da9ca2acd420e9f399dfc71aaef5ff Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:42:03 +0800 Subject: [PATCH 19/21] add cfg --- src/net/uds/listener.rs | 4 ++-- src/net/uds/stream.rs | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/net/uds/listener.rs b/src/net/uds/listener.rs index b527d78de..bffc63cb3 100644 --- a/src/net/uds/listener.rs +++ b/src/net/uds/listener.rs @@ -1,14 +1,14 @@ #[cfg(windows)] use crate::sys::uds::{net, SocketAddr}; #[cfg(windows)] -use std::os::windows::io::{AsRawSocket, IntoRawSocket, RawSocket}; +use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; #[cfg(unix)] use std::os::{ fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, unix::net::{self, SocketAddr}, }; +use std::path::Path; use std::{fmt, io}; -use std::{os::windows::io::FromRawSocket, path::Path}; use crate::io_source::IoSource; use crate::net::UnixStream; diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs index 116c72bd0..256c4af91 100644 --- a/src/net/uds/stream.rs +++ b/src/net/uds/stream.rs @@ -1,13 +1,15 @@ +#[cfg(windows)] +use crate::sys::uds::{net, SocketAddr}; use std::fmt; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::Shutdown; #[cfg(windows)] -use std::os::windows::io::{FromRawSocket, IntoRawSocket}; -use std::os::windows::io::{AsRawSocket, RawSocket}; +use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; #[cfg(unix)] -use std::os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},unix::net::{self, SocketAddr}}; -#[cfg(windows)] -use crate::sys::{uds::{SocketAddr,net}}; +use std::os::{ + fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + unix::net::{self, SocketAddr}, +}; use std::path::Path; @@ -313,4 +315,4 @@ impl IntoRawSocket for UnixStream { fn into_raw_socket(self) -> RawSocket { self.inner.into_inner().into_raw_socket() } -} \ No newline at end of file +} From 0d9a43d2f007d97b694f848eba1ef33f9721cf86 Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:44:29 +0800 Subject: [PATCH 20/21] add e --- tests/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/util/mod.rs b/tests/util/mod.rs index c2946be91..0d2e300d4 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -195,7 +195,7 @@ pub fn assert_would_block(result: io::Result) { Ok(_) => panic!("unexpected OK result, expected a `WouldBlock` error"), Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} #[cfg(unix)] - Err(_) => panic!("unexpected error: {e}"), + Err(e) => panic!("unexpected error: {e}"), #[cfg(windows)] Err(_) => {} } From 6dfa396a8cee393eebf48c84b7e8e28cf368700c Mon Sep 17 00:00:00 2001 From: kouhe <25522053+kouhe3@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:50:36 +0800 Subject: [PATCH 21/21] add cfg --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 8a40ce5a1..7bf157ffd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ mod macros; mod interest; mod poll; mod sys; -#[cfg(windows)] +#[cfg(all(windows, feature = "net"))] pub use sys::uds; mod token; #[cfg(not(target_os = "wasi"))]