diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index 03015d26..bd8f740a 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -75,7 +75,7 @@ lru = { workspace = true, optional = true } [target.'cfg(unix)'.dependencies] daemonize = "0.5.0" -nix = "~0.24.3" +nix = { version = "~0.31.1", features = ["socket", "net", "fs", "uio"] } [target.'cfg(windows)'.dependencies] windows-sys = { version = "0.59.0", features = ["Win32_Networking_WinSock"] } diff --git a/pingora-core/src/protocols/l4/socket.rs b/pingora-core/src/protocols/l4/socket.rs index 46decd2f..fd098c34 100644 --- a/pingora-core/src/protocols/l4/socket.rs +++ b/pingora-core/src/protocols/l4/socket.rs @@ -66,7 +66,7 @@ impl SocketAddr { fn from_sockaddr_storage(sock: &SockaddrStorage) -> Option { if let Some(v4) = sock.as_sockaddr_in() { return Some(SocketAddr::Inet(StdSockAddr::V4( - std::net::SocketAddrV4::new(v4.ip().into(), v4.port()), + std::net::SocketAddrV4::new(v4.ip(), v4.port()), ))); } else if let Some(v6) = sock.as_sockaddr_in6() { return Some(SocketAddr::Inet(StdSockAddr::V6( diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index 59ec3f60..da7f5a26 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -177,7 +177,7 @@ impl RawStreamWrapper { #[cfg(target_os = "linux")] enable_rx_ts: false, #[cfg(target_os = "linux")] - reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec), + reusable_cmsg_space: nix::cmsg_space!(nix::sys::socket::Timestamps), } } @@ -240,7 +240,8 @@ impl AsyncRead for RawStreamWrapper { as *mut [u8]) }; let mut iov = [IoSliceMut::new(b)]; - rs_wrapper.reusable_cmsg_space.clear(); + + rs_wrapper.reusable_cmsg_space.fill(0); match s.try_io(Interest::READABLE, || { recvmsg::( @@ -253,7 +254,7 @@ impl AsyncRead for RawStreamWrapper { }) { Ok(r) => { if let Some(ControlMessageOwned::ScmTimestampsns(rtime)) = r - .cmsgs() + .cmsgs()? .find(|i| matches!(i, ControlMessageOwned::ScmTimestampsns(_))) { // The returned timestamp is a real (i.e. not monotonic) timestamp @@ -432,7 +433,7 @@ impl Stream { if let RawStream::Tcp(s) = &self.stream_mut().get_mut().stream { let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE | TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE; - setsockopt(s.as_raw_fd(), sockopt::Timestamping, ×tamp_options) + setsockopt(&s, sockopt::Timestamping, ×tamp_options) .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE")?; self.stream_mut().get_mut().enable_rx_ts(true); } diff --git a/pingora-core/src/server/transfer_fd/mod.rs b/pingora-core/src/server/transfer_fd/mod.rs index c851eb48..43734f2e 100644 --- a/pingora-core/src/server/transfer_fd/mod.rs +++ b/pingora-core/src/server/transfer_fd/mod.rs @@ -16,7 +16,7 @@ use log::{debug, error, warn}; use nix::errno::Errno; #[cfg(target_os = "linux")] -use nix::sys::socket::{self, AddressFamily, RecvMsg, SockFlag, SockType, UnixAddr}; +use nix::sys::socket::{self, AddressFamily, Backlog, RecvMsg, SockFlag, SockType, UnixAddr}; #[cfg(target_os = "linux")] use nix::sys::stat; use nix::{Error, NixPath}; @@ -24,6 +24,8 @@ use std::collections::HashMap; use std::io::Write; #[cfg(target_os = "linux")] use std::io::{IoSlice, IoSliceMut}; +#[cfg(target_os = "linux")] +use std::os::fd::{AsRawFd, BorrowedFd}; use std::os::unix::io::RawFd; #[cfg(target_os = "linux")] use std::{thread, time}; @@ -127,20 +129,20 @@ where // TODO: warn if exist but not able to unlink } }; - socket::bind(listen_fd, &unix_addr).unwrap(); + socket::bind(listen_fd.as_raw_fd(), &unix_addr).unwrap(); /* sock is created before we change user, need to give permission to all */ stat::fchmodat( - None, + unsafe { BorrowedFd::borrow_raw(libc::AT_FDCWD) }, path, stat::Mode::all(), stat::FchmodatFlags::FollowSymlink, ) .unwrap(); - socket::listen(listen_fd, 8).unwrap(); + socket::listen(&listen_fd, Backlog::new(8).unwrap()).unwrap(); - let fd = match accept_with_retry_timeout(listen_fd, max_retry) { + let fd = match accept_with_retry_timeout(listen_fd.as_raw_fd(), max_retry) { Ok(fd) => fd, Err(e) => { error!("Giving up reading socket from: {path}, error: {e:?}"); @@ -163,7 +165,7 @@ where .unwrap(); let mut fds: Vec = Vec::new(); - for cmsg in msg.cmsgs() { + for cmsg in msg.cmsgs()? { if let socket::ControlMessageOwned::ScmRights(mut vec_fds) = cmsg { fds.append(&mut vec_fds) } else { @@ -250,7 +252,7 @@ where let mut nonblocking_polls = 0; let conn_result: Result = loop { - match socket::connect(send_fd, &unix_addr) { + match socket::connect(send_fd.as_raw_fd(), &unix_addr) { Ok(_) => break Ok(0), Err(e) => match e { /* If the new process hasn't created the upgrade sock we'll get an ENOENT. @@ -295,7 +297,7 @@ where let cmsg = [scm; 1]; loop { match socket::sendmsg( - send_fd, + send_fd.as_raw_fd(), &io_vec, &cmsg, socket::MsgFlags::empty(), @@ -347,6 +349,8 @@ where #[cfg(test)] #[cfg(target_os = "linux")] mod tests { + use std::os::fd::AsRawFd; + use super::*; use log::{debug, error}; @@ -415,7 +419,7 @@ mod tests { assert_eq!(1, buf[31]); }); - let fds = vec![dumb_fd]; + let fds = vec![dumb_fd.as_raw_fd()]; let buf: [u8; 128] = [1; 128]; match send_fds_to(fds, &buf, "/tmp/pingora_fds_receive.sock", None) { Ok(sent) => { @@ -442,7 +446,7 @@ mod tests { None, ) .unwrap(); - fds.add(key1.clone(), dumb_fd1); + fds.add(key1.clone(), dumb_fd1.as_raw_fd()); let key2 = "1.1.1.1:443".to_string(); let dumb_fd2 = socket::socket( AddressFamily::Unix, @@ -451,7 +455,7 @@ mod tests { None, ) .unwrap(); - fds.add(key2.clone(), dumb_fd2); + fds.add(key2.clone(), dumb_fd2.as_raw_fd()); let child = thread::spawn(move || { let mut fds2 = Fds::new(); @@ -478,7 +482,7 @@ mod tests { ) .unwrap(); - let fds = vec![dumb_fd]; + let fds = vec![dumb_fd.as_raw_fd()]; let buf: [u8; 32] = [1; 32]; // Try to send with a custom max_retries of 2