Skip to content

Commit 0b322a3

Browse files
authored
feat(driver)!: accept multi (#747)
* test(driver): rewrite push_and_wait_multi * feat(driver): accept multi * fix(driver,stub): add accept multi * fix(driver): use returned op correctly * fix(driver,stub): typo * fix(driver,unix): don't return addr in AcceptMulti
1 parent 9f20a09 commit 0b322a3

File tree

8 files changed

+274
-44
lines changed

8 files changed

+274
-44
lines changed

compio-driver/src/op.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ pub use crate::sys::op::{
1616
Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendMsgZc,
1717
SendTo, SendToVectored, SendToVectoredZc, SendToZc, SendVectored, SendVectoredZc, SendZc,
1818
};
19-
#[cfg(windows)]
20-
pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
2119
#[cfg(unix)]
2220
pub use crate::sys::op::{
23-
CreateDir, CreateSocket, CurrentDir, FileStat, HardLink, Interest, OpenFile, PathStat,
24-
PollOnce, ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink,
21+
AcceptMulti, CreateDir, CreateSocket, CurrentDir, FileStat, HardLink, Interest, OpenFile,
22+
PathStat, PollOnce, ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink,
2523
WriteVectored, WriteVectoredAt,
2624
};
25+
#[cfg(windows)]
26+
pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
2727
#[cfg(io_uring)]
2828
pub use crate::sys::op::{
2929
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
@@ -665,7 +665,9 @@ pub(crate) mod managed {
665665
}
666666

667667
#[cfg(not(io_uring))]
668-
pub use managed::*;
668+
pub use managed::{
669+
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
670+
};
669671

670672
bitflags::bitflags! {
671673
/// Flags for operations.

compio-driver/src/sys/fusion/op.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ mod iour { pub use crate::sys::iour::{op::*, OpCode}; }
126126
#[rustfmt::skip]
127127
mod poll { pub use crate::sys::poll::{op::*, OpCode}; }
128128

129+
op!(<S: AsFd> AcceptMulti(fd: S));
129130
op!(<T: IoBufMut, S: AsFd> RecvFrom(fd: S, buffer: T, flags: i32));
130131
op!(<T: IoBuf, S: AsFd> SendTo(fd: S, buffer: T, addr: SockAddr, flags: i32));
131132
op!(<T: IoVectoredBufMut, S: AsFd> RecvFromVectored(fd: S, buffer: T, flags: i32));

compio-driver/src/sys/iour/op.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use std::{
2+
collections::VecDeque,
23
ffi::CString,
34
io,
45
marker::PhantomPinned,
5-
os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6+
os::fd::{AsFd, AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
67
pin::Pin,
78
};
89

@@ -557,6 +558,82 @@ unsafe impl<S: AsFd> OpCode for Accept<S> {
557558
}
558559
}
559560

561+
struct AcceptMultishotResult {
562+
res: io::Result<Socket2>,
563+
extra: crate::Extra,
564+
}
565+
566+
impl AcceptMultishotResult {
567+
pub unsafe fn new(res: io::Result<usize>, extra: crate::Extra) -> Self {
568+
Self {
569+
res: res.map(|fd| unsafe { Socket2::from_raw_fd(fd as _) }),
570+
extra,
571+
}
572+
}
573+
574+
pub fn into_result(self) -> BufResult<usize, crate::Extra> {
575+
BufResult(self.res.map(|fd| fd.into_raw_fd() as _), self.extra)
576+
}
577+
}
578+
579+
pin_project! {
580+
/// Accept multiple connections.
581+
pub struct AcceptMulti<S> {
582+
#[pin]
583+
pub(crate) op: Accept<S>,
584+
multishots: VecDeque<AcceptMultishotResult>
585+
}
586+
}
587+
588+
impl<S> AcceptMulti<S> {
589+
/// Create [`AcceptMulti`].
590+
pub fn new(fd: S) -> Self {
591+
Self {
592+
op: Accept::new(fd),
593+
multishots: VecDeque::new(),
594+
}
595+
}
596+
}
597+
598+
unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
599+
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
600+
let this = self.project();
601+
opcode::AcceptMulti::new(Fd(this.op.fd.as_fd().as_raw_fd()))
602+
.flags(libc::SOCK_CLOEXEC)
603+
.build()
604+
.into()
605+
}
606+
607+
fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
608+
self.project().op.create_entry()
609+
}
610+
611+
unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
612+
unsafe { self.project().op.set_result(res, extra) }
613+
}
614+
615+
unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
616+
self.project()
617+
.multishots
618+
.push_back(unsafe { AcceptMultishotResult::new(res, extra) });
619+
}
620+
621+
fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
622+
self.project()
623+
.multishots
624+
.pop_front()
625+
.map(|res| res.into_result())
626+
}
627+
}
628+
629+
impl<S> IntoInner for AcceptMulti<S> {
630+
type Inner = Socket2;
631+
632+
fn into_inner(self) -> Self::Inner {
633+
self.op.into_inner().0
634+
}
635+
}
636+
560637
unsafe impl<S: AsFd> OpCode for Connect<S> {
561638
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
562639
opcode::Connect::new(

compio-driver/src/sys/poll/op.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,44 @@ unsafe impl<S: AsFd> OpCode for Accept<S> {
772772
}
773773
}
774774

775+
pin_project! {
776+
/// Accept multiple connections.
777+
pub struct AcceptMulti<S> {
778+
#[pin]
779+
pub(crate) op: Accept<S>,
780+
}
781+
}
782+
783+
impl<S> AcceptMulti<S> {
784+
/// Create [`AcceptMulti`].
785+
pub fn new(fd: S) -> Self {
786+
Self {
787+
op: Accept::new(fd),
788+
}
789+
}
790+
}
791+
792+
unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
793+
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
794+
self.project().op.pre_submit()
795+
}
796+
797+
fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
798+
self.project().op.op_type()
799+
}
800+
801+
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
802+
self.project().op.operate()
803+
}
804+
}
805+
806+
impl<S> IntoInner for AcceptMulti<S> {
807+
type Inner = Socket2;
808+
809+
fn into_inner(self) -> Self::Inner {
810+
self.op.into_inner().0
811+
}
812+
}
775813
unsafe impl<S: AsFd> OpCode for Connect<S> {
776814
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
777815
syscall!(

compio-driver/src/sys/stub/op.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::ffi::CString;
44

55
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
6-
use socket2::{SockAddr, SockAddrStorage, socklen_t};
6+
use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
77

88
pub use self::{
99
Send as SendZc, SendMsg as SendMsgZc, SendTo as SendToZc, SendToVectored as SendToVectoredZc,
@@ -129,6 +129,28 @@ impl OpCode for CloseSocket {}
129129

130130
impl<S: AsFd> OpCode for Accept<S> {}
131131

132+
/// Accept multiple connections.
133+
pub struct AcceptMulti<S> {
134+
fd: S,
135+
}
136+
137+
impl<S> AcceptMulti<S> {
138+
/// Create [`AcceptMulti`].
139+
pub fn new(fd: S) -> Self {
140+
Self { fd }
141+
}
142+
}
143+
144+
impl<S> IntoInner for AcceptMulti<S> {
145+
type Inner = Socket2;
146+
147+
fn into_inner(self) -> Self::Inner {
148+
stub_unimpl()
149+
}
150+
}
151+
152+
impl<S: AsFd> OpCode for AcceptMulti<S> {}
153+
132154
impl<S: AsFd> OpCode for Connect<S> {}
133155

134156
impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {}

compio-driver/src/sys/unix_op.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,9 +548,12 @@ impl<S> Accept<S> {
548548
_p: PhantomPinned,
549549
}
550550
}
551+
}
552+
553+
impl<S> IntoInner for Accept<S> {
554+
type Inner = (Socket2, SockAddr);
551555

552-
/// Get the remote address from the inner buffer.
553-
pub fn into_addr(mut self) -> (Socket2, SockAddr) {
556+
fn into_inner(mut self) -> Self::Inner {
554557
let socket = self.accepted_fd.take().expect("socket not accepted");
555558
(socket, unsafe { SockAddr::new(self.buffer, self.addr_len) })
556559
}

0 commit comments

Comments
 (0)