Skip to content

Commit bc264d1

Browse files
authored
fix(driver,unix): set_result for OpenFile & CreateSocket (#701)
* fix(driver,unix): close fd correctly fr OpenFile & CreateSocket * test: cancel OpenFile * fix(driver,poll): imports * fix(driver,poll): set accepted_fd on immediate success
1 parent 182497c commit bc264d1

File tree

9 files changed

+135
-105
lines changed

9 files changed

+135
-105
lines changed

compio-driver/src/sys/iour/op.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use io_uring::{
1212
types::{Fd, FsyncFlags},
1313
};
1414
use pin_project_lite::pin_project;
15-
use socket2::{SockAddr, SockAddrStorage, socklen_t};
15+
use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
1616

1717
use super::OpCode;
1818
pub use crate::sys::unix_op::*;
@@ -73,6 +73,12 @@ unsafe impl<S: AsFd> OpCode for OpenFile<S> {
7373
fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
7474
self.call()
7575
}
76+
77+
unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
78+
// SAFETY: fd is a valid fd returned from kernel
79+
let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
80+
*self.project().opened_fd = Some(fd);
81+
}
7682
}
7783

7884
unsafe impl OpCode for CloseFile {
@@ -470,6 +476,12 @@ unsafe impl OpCode for CreateSocket {
470476
self.protocol
471477
))? as _)
472478
}
479+
480+
unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
481+
// SAFETY: fd is a valid fd returned from kernel
482+
let fd = unsafe { Socket2::from_raw_fd(fd as _) };
483+
*self.project().opened_fd = Some(fd);
484+
}
473485
}
474486

475487
unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
@@ -511,7 +523,7 @@ unsafe impl<S: AsFd> OpCode for Accept<S> {
511523

512524
unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
513525
// SAFETY: fd is a valid fd returned from kernel
514-
let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
526+
let fd = unsafe { Socket2::from_raw_fd(fd as _) };
515527
*self.project().accepted_fd = Some(fd);
516528
}
517529
}

compio-driver/src/sys/poll/op.rs

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
ffi::CString,
55
io,
66
marker::PhantomPinned,
7-
os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
7+
os::fd::{AsRawFd, FromRawFd, OwnedFd},
88
pin::Pin,
99
task::Poll,
1010
};
@@ -69,8 +69,10 @@ unsafe impl<S: AsFd> OpCode for OpenFile<S> {
6969
Ok(Decision::Blocking)
7070
}
7171

72-
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
73-
Poll::Ready(self.call())
72+
fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
73+
let fd = self.as_mut().call()?;
74+
*self.project().opened_fd = Some(unsafe { OwnedFd::from_raw_fd(fd as _) });
75+
Poll::Ready(Ok(fd))
7476
}
7577
}
7678

@@ -638,7 +640,8 @@ impl CreateSocket {
638640
target_os = "cygwin",
639641
)))]
640642
socket.set_nonblocking(true)?;
641-
Ok(socket.into_raw_fd())
643+
*self.project().opened_fd = Some(socket);
644+
Ok(fd)
642645
}
643646
}
644647

@@ -673,40 +676,44 @@ unsafe impl OpCode for CloseSocket {
673676
}
674677

675678
impl<S: AsFd> Accept<S> {
679+
// If the first call succeeds, there won't be another call.
676680
unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
677681
let this = self.project();
678-
#[cfg(any(
679-
target_os = "android",
680-
target_os = "dragonfly",
681-
target_os = "freebsd",
682-
target_os = "fuchsia",
683-
target_os = "illumos",
684-
target_os = "linux",
685-
target_os = "netbsd",
686-
target_os = "openbsd",
687-
target_os = "cygwin",
688-
))]
689-
unsafe {
690-
libc::accept4(
691-
this.fd.as_fd().as_raw_fd(),
692-
this.buffer as *mut _ as *mut _,
693-
this.addr_len,
694-
libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
695-
)
696-
}
697-
#[cfg(not(any(
698-
target_os = "android",
699-
target_os = "dragonfly",
700-
target_os = "freebsd",
701-
target_os = "fuchsia",
702-
target_os = "illumos",
703-
target_os = "linux",
704-
target_os = "netbsd",
705-
target_os = "openbsd",
706-
target_os = "cygwin",
707-
)))]
708-
{
709-
|| -> io::Result<libc::c_int> {
682+
|| -> io::Result<libc::c_int> {
683+
#[cfg(any(
684+
target_os = "android",
685+
target_os = "dragonfly",
686+
target_os = "freebsd",
687+
target_os = "fuchsia",
688+
target_os = "illumos",
689+
target_os = "linux",
690+
target_os = "netbsd",
691+
target_os = "openbsd",
692+
target_os = "cygwin",
693+
))]
694+
{
695+
let fd = syscall!(libc::accept4(
696+
this.fd.as_fd().as_raw_fd(),
697+
this.buffer as *mut _ as *mut _,
698+
this.addr_len,
699+
libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
700+
))?;
701+
let socket = unsafe { Socket2::from_raw_fd(fd) };
702+
*this.accepted_fd = Some(socket);
703+
Ok(fd)
704+
}
705+
#[cfg(not(any(
706+
target_os = "android",
707+
target_os = "dragonfly",
708+
target_os = "freebsd",
709+
target_os = "fuchsia",
710+
target_os = "illumos",
711+
target_os = "linux",
712+
target_os = "netbsd",
713+
target_os = "openbsd",
714+
target_os = "cygwin",
715+
)))]
716+
{
710717
let fd = syscall!(libc::accept(
711718
this.fd.as_fd().as_raw_fd(),
712719
this.buffer as *mut _ as *mut _,
@@ -715,10 +722,11 @@ impl<S: AsFd> Accept<S> {
715722
let socket = unsafe { Socket2::from_raw_fd(fd) };
716723
socket.set_cloexec(true)?;
717724
socket.set_nonblocking(true)?;
718-
Ok(socket.into_raw_fd())
719-
}()
720-
.unwrap_or(-1)
721-
}
725+
*this.accepted_fd = Some(socket);
726+
Ok(fd)
727+
}
728+
}()
729+
.unwrap_or(-1)
722730
}
723731
}
724732

@@ -733,13 +741,7 @@ unsafe impl<S: AsFd> OpCode for Accept<S> {
733741
}
734742

735743
fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
736-
let res = syscall!(break self.as_mut().call());
737-
if let Poll::Ready(Ok(fd)) = res {
738-
// Safety: we own the fd returned by accept/accept4
739-
let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
740-
*self.project().accepted_fd = Some(fd);
741-
}
742-
res
744+
syscall!(break self.as_mut().call())
743745
}
744746
}
745747

compio-driver/src/sys/unix_op.rs

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use libc::{ftruncate as ftruncate64, off_t as off64_t};
2727
))]
2828
use libc::{ftruncate64, off64_t};
2929
use pin_project_lite::pin_project;
30-
use socket2::{SockAddr, SockAddrStorage, socklen_t};
30+
use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
3131

3232
use crate::{op::*, sys::aio::*, sys_slice::*, syscall};
3333

@@ -47,12 +47,15 @@ impl AsFd for CurrentDir {
4747
}
4848
}
4949

50-
/// Open or create a file with flags and mode.
51-
pub struct OpenFile<S: AsFd> {
52-
pub(crate) dirfd: S,
53-
pub(crate) path: CString,
54-
pub(crate) flags: i32,
55-
pub(crate) mode: libc::mode_t,
50+
pin_project! {
51+
/// Open or create a file with flags and mode.
52+
pub struct OpenFile<S: AsFd> {
53+
pub(crate) dirfd: S,
54+
pub(crate) path: CString,
55+
pub(crate) flags: i32,
56+
pub(crate) mode: libc::mode_t,
57+
pub(crate) opened_fd: Option<OwnedFd>,
58+
}
5659
}
5760

5861
impl<S: AsFd> OpenFile<S> {
@@ -63,6 +66,7 @@ impl<S: AsFd> OpenFile<S> {
6366
path,
6467
flags,
6568
mode,
69+
opened_fd: None,
6670
}
6771
}
6872

@@ -76,6 +80,14 @@ impl<S: AsFd> OpenFile<S> {
7680
}
7781
}
7882

83+
impl<S: AsFd> IntoInner for OpenFile<S> {
84+
type Inner = OwnedFd;
85+
86+
fn into_inner(self) -> Self::Inner {
87+
self.opened_fd.expect("file not opened")
88+
}
89+
}
90+
7991
impl CloseFile {
8092
pub(crate) fn call(self: Pin<&mut Self>) -> io::Result<usize> {
8193
Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _)
@@ -462,11 +474,14 @@ impl<S1: AsFd, S2: AsFd> HardLink<S1, S2> {
462474
}
463475
}
464476

465-
/// Create a socket.
466-
pub struct CreateSocket {
467-
pub(crate) domain: i32,
468-
pub(crate) socket_type: i32,
469-
pub(crate) protocol: i32,
477+
pin_project! {
478+
/// Create a socket.
479+
pub struct CreateSocket {
480+
pub(crate) domain: i32,
481+
pub(crate) socket_type: i32,
482+
pub(crate) protocol: i32,
483+
pub(crate) opened_fd: Option<Socket2>,
484+
}
470485
}
471486

472487
impl CreateSocket {
@@ -476,10 +491,19 @@ impl CreateSocket {
476491
domain,
477492
socket_type,
478493
protocol,
494+
opened_fd: None,
479495
}
480496
}
481497
}
482498

499+
impl IntoInner for CreateSocket {
500+
type Inner = Socket2;
501+
502+
fn into_inner(self) -> Self::Inner {
503+
self.opened_fd.expect("socket not created")
504+
}
505+
}
506+
483507
impl<S: AsFd> ShutdownSocket<S> {
484508
pub(crate) fn how(&self) -> i32 {
485509
match self.how {
@@ -506,7 +530,7 @@ pin_project! {
506530
pub(crate) fd: S,
507531
pub(crate) buffer: SockAddrStorage,
508532
pub(crate) addr_len: socklen_t,
509-
pub(crate) accepted_fd: Option<OwnedFd>,
533+
pub(crate) accepted_fd: Option<Socket2>,
510534
_p: PhantomPinned,
511535
}
512536
}
@@ -526,9 +550,9 @@ impl<S> Accept<S> {
526550
}
527551

528552
/// Get the remote address from the inner buffer.
529-
pub fn into_addr(mut self) -> SockAddr {
530-
std::mem::forget(self.accepted_fd.take());
531-
unsafe { SockAddr::new(self.buffer, self.addr_len) }
553+
pub fn into_addr(mut self) -> (Socket2, SockAddr) {
554+
let socket = self.accepted_fd.take().expect("socket not accepted");
555+
(socket, unsafe { SockAddr::new(self.buffer, self.addr_len) })
532556
}
533557
}
534558

compio-driver/tests/file.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ fn open_file(driver: &mut Proactor) -> OwnedFd {
5252

5353
#[cfg(unix)]
5454
fn open_file(driver: &mut Proactor) -> OwnedFd {
55-
use std::{ffi::CString, os::fd::FromRawFd};
55+
use std::ffi::CString;
5656

57+
use compio_buf::IntoInner;
5758
use compio_driver::op::{CurrentDir, OpenFile};
5859

5960
let op = OpenFile::new(
@@ -62,8 +63,8 @@ fn open_file(driver: &mut Proactor) -> OwnedFd {
6263
libc::O_CLOEXEC | libc::O_RDONLY,
6364
0o666,
6465
);
65-
let (fd, _) = push_and_wait(driver, op).unwrap();
66-
unsafe { OwnedFd::from_raw_fd(fd as _) }
66+
let (_, op) = push_and_wait(driver, op).unwrap();
67+
op.into_inner()
6768
}
6869

6970
fn push_and_wait_extra<O: OpCode + 'static>(

compio-driver/tests/personality.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![cfg(io_uring)]
22

3-
use compio_buf::BufResult;
3+
use compio_buf::{BufResult, IntoInner};
44
use compio_driver::{
55
op::{CurrentDir, ReadAt},
66
*,
@@ -26,7 +26,7 @@ fn push_and_wait<O: OpCode + 'static>(
2626
}
2727

2828
fn open_file(driver: &mut Proactor, personality: u16) -> OwnedFd {
29-
use std::{ffi::CString, os::fd::FromRawFd};
29+
use std::ffi::CString;
3030

3131
use compio_driver::op::OpenFile;
3232

@@ -36,8 +36,8 @@ fn open_file(driver: &mut Proactor, personality: u16) -> OwnedFd {
3636
libc::O_CLOEXEC | libc::O_RDONLY,
3737
0o666,
3838
);
39-
let (fd, _) = push_and_wait(driver, op, personality).unwrap();
40-
unsafe { OwnedFd::from_raw_fd(fd as _) }
39+
let (_, op) = push_and_wait(driver, op, personality).unwrap();
40+
op.into_inner()
4141
}
4242

4343
#[test]

compio-fs/src/open_options/unix.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
use std::{io, os::fd::FromRawFd, path::Path};
1+
use std::{io, path::Path};
22

3-
use compio_driver::{
4-
RawFd,
5-
op::{CurrentDir, OpenFile},
6-
};
3+
use compio_buf::{IntoInner, buf_try};
4+
use compio_driver::op::{CurrentDir, OpenFile};
75

86
use crate::{File, path_string};
97

@@ -89,7 +87,7 @@ impl OpenOptions {
8987
| (self.custom_flags as libc::c_int & !libc::O_ACCMODE);
9088
let p = path_string(p)?;
9189
let op = OpenFile::new(CurrentDir, p, flags, self.mode);
92-
let fd = compio_runtime::submit(op).await.0? as RawFd;
93-
File::from_std(unsafe { std::fs::File::from_raw_fd(fd) })
90+
let (_, op) = buf_try!(@try compio_runtime::submit(op).await);
91+
File::from_std(op.into_inner().into())
9492
}
9593
}

0 commit comments

Comments
 (0)