Skip to content

Commit 12324fe

Browse files
authored
feat(net): zerocopy API (#756)
* feat(net): zerocopy API * feat: tests
1 parent 0b322a3 commit 12324fe

File tree

8 files changed

+320
-11
lines changed

8 files changed

+320
-11
lines changed

compio-net/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ cfg-if = { workspace = true }
2424
either = "1.9.0"
2525
once_cell = { workspace = true }
2626
socket2 = { workspace = true }
27+
futures-util = { workspace = true }
2728

2829
[target.'cfg(windows)'.dependencies]
2930
widestring = { workspace = true }
@@ -40,7 +41,6 @@ libc = { workspace = true }
4041
# Shared dev dependencies for all platforms
4142
[dev-dependencies]
4243
compio-macros = { workspace = true }
43-
futures-util = { workspace = true }
4444
tempfile = { workspace = true }
4545

4646
[features]

compio-net/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub type CMsgBuilder<'a> = compio_io::ancillary::AncillaryBuilder<'a>;
4848
pub type PollFd<T> = compio_runtime::fd::PollFd<T>;
4949
pub use opts::SocketOpts;
5050
pub use resolve::ToSocketAddrsAsync;
51-
pub(crate) use resolve::{each_addr, first_addr_buf};
51+
pub(crate) use resolve::{each_addr, first_addr_buf, first_addr_buf_zerocopy};
5252
pub(crate) use socket::*;
5353
pub use split::*;
5454
pub use tcp::*;

compio-net/src/resolve/mod.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ cfg_if::cfg_if! {
99
}
1010

1111
use std::{
12-
future::Future,
12+
future::{Future, Ready, ready},
1313
io,
1414
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
1515
};
@@ -169,3 +169,34 @@ pub async fn first_addr_buf<T, B, F: Future<Output = BufResult<T, B>>>(
169169
)
170170
}
171171
}
172+
173+
pub async fn first_addr_buf_zerocopy<B, F1, F2>(
174+
addr: impl ToSocketAddrsAsync,
175+
buffer: B,
176+
f: impl FnOnce(SocketAddr, B) -> F1,
177+
) -> BufResult<usize, Either<Ready<B>, F2>>
178+
where
179+
F1: Future<Output = BufResult<usize, F2>>,
180+
F2: Future<Output = B>,
181+
{
182+
fn ret<T, F>(fut: T) -> Either<Ready<T>, F> {
183+
Either::Left(ready(fut))
184+
}
185+
186+
let mut addrs = match addr.to_socket_addrs_async().await {
187+
Ok(addrs) => addrs,
188+
Err(e) => return BufResult(Err(e), ret(buffer)),
189+
};
190+
if let Some(addr) = addrs.next() {
191+
let BufResult(res, fut) = f(addr, buffer).await;
192+
BufResult(res, Either::Right(fut))
193+
} else {
194+
BufResult(
195+
Err(io::Error::new(
196+
io::ErrorKind::InvalidInput,
197+
"could not operate on first address",
198+
)),
199+
ret(buffer),
200+
)
201+
}
202+
}

compio-net/src/socket.rs

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectore
88
#[cfg(unix)]
99
use compio_driver::op::CreateSocket;
1010
use compio_driver::{
11-
AsRawFd, ToSharedFd, impl_raw_fd,
11+
AsRawFd, OpCode, ToSharedFd, impl_raw_fd,
1212
op::{
1313
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromManaged,
1414
RecvFromVectored, RecvManaged, RecvMsg, RecvResultExt, RecvVectored, ResultTakeBuffer,
15-
Send, SendMsg, SendTo, SendToVectored, SendVectored, ShutdownSocket, VecBufResultExt,
15+
Send, SendMsg, SendMsgZc, SendTo, SendToVectored, SendToVectoredZc, SendToZc, SendVectored,
16+
SendVectoredZc, SendZc, ShutdownSocket, VecBufResultExt,
1617
},
1718
syscall,
1819
};
1920
use compio_runtime::{Attacher, BorrowedBuffer, BufferPool, fd::PollFd};
21+
use futures_util::StreamExt;
2022
use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type};
2123

2224
#[derive(Debug, Clone)]
@@ -208,6 +210,22 @@ impl Socket {
208210
compio_runtime::submit(op).await.into_inner()
209211
}
210212

213+
pub async fn send_zerocopy<T: IoBuf>(
214+
&self,
215+
buf: T,
216+
flags: i32,
217+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
218+
submit_zerocopy(SendZc::new(self.to_shared_fd(), buf, flags)).await
219+
}
220+
221+
pub async fn send_zerocopy_vectored<T: IoVectoredBuf>(
222+
&self,
223+
buf: T,
224+
flags: i32,
225+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
226+
submit_zerocopy(SendVectoredZc::new(self.to_shared_fd(), buf, flags)).await
227+
}
228+
211229
pub async fn recv_from<T: IoBufMut>(
212230
&self,
213231
buffer: T,
@@ -275,6 +293,26 @@ impl Socket {
275293
compio_runtime::submit(op).await.into_inner()
276294
}
277295

296+
pub async fn send_to_zerocopy<T: IoBuf>(
297+
&self,
298+
buffer: T,
299+
addr: &SockAddr,
300+
flags: i32,
301+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
302+
let op = SendToZc::new(self.to_shared_fd(), buffer, addr.clone(), flags);
303+
submit_zerocopy(op).await
304+
}
305+
306+
pub async fn send_to_zerocopy_vectored<T: IoVectoredBuf>(
307+
&self,
308+
buffer: T,
309+
addr: &SockAddr,
310+
flags: i32,
311+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
312+
let op = SendToVectoredZc::new(self.to_shared_fd(), buffer, addr.clone(), flags);
313+
submit_zerocopy(op).await
314+
}
315+
278316
pub async fn send_msg<T: IoBuf, C: IoBuf>(
279317
&self,
280318
buffer: T,
@@ -299,6 +337,33 @@ impl Socket {
299337
compio_runtime::submit(op).await.into_inner()
300338
}
301339

340+
pub async fn send_msg_zerocopy<T: IoBuf, C: IoBuf>(
341+
&self,
342+
buffer: T,
343+
control: C,
344+
addr: Option<&SockAddr>,
345+
flags: i32,
346+
) -> BufResult<usize, impl Future<Output = (T, C)> + use<T, C>> {
347+
self.send_msg_zerocopy_vectored([buffer], control, addr, flags)
348+
.await
349+
.map_buffer(|fut| async move {
350+
let ([buffer], control) = fut.await;
351+
(buffer, control)
352+
})
353+
}
354+
355+
pub async fn send_msg_zerocopy_vectored<T: IoVectoredBuf, C: IoBuf>(
356+
&self,
357+
buffer: T,
358+
control: C,
359+
addr: Option<&SockAddr>,
360+
flags: i32,
361+
) -> BufResult<usize, impl Future<Output = (T, C)> + use<T, C>> {
362+
let fd = self.to_shared_fd();
363+
let op = SendMsgZc::new(fd, buffer, control, addr.cloned(), flags);
364+
submit_zerocopy(op).await
365+
}
366+
302367
#[cfg(unix)]
303368
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
304369
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
@@ -377,3 +442,27 @@ impl Socket {
377442
}
378443

379444
impl_raw_fd!(Socket, Socket2, socket, socket);
445+
446+
async fn submit_zerocopy<T: OpCode + IntoInner + 'static>(
447+
op: T,
448+
) -> BufResult<usize, impl Future<Output = T::Inner> + use<T>> {
449+
let mut stream = compio_runtime::submit_multi(op);
450+
let res = stream
451+
.next()
452+
.await
453+
.expect("SubmitMulti should yield at least one item")
454+
.0;
455+
456+
let fut = async move {
457+
// we don't need 2nd CQE's result
458+
_ = stream.next().await;
459+
460+
stream
461+
.try_take()
462+
.map_err(|_| ())
463+
.expect("Cannot retrieve buffer")
464+
.into_inner()
465+
};
466+
467+
BufResult(res, fut)
468+
}

compio-net/src/split.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,19 @@ where
3737
for<'a> &'a T: AsyncWrite,
3838
{
3939
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
40-
self.0.write(buf).await
40+
(self.0).write(buf).await
4141
}
4242

4343
async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
44-
self.0.write_vectored(buf).await
44+
(self.0).write_vectored(buf).await
4545
}
4646

4747
async fn flush(&mut self) -> io::Result<()> {
48-
self.0.flush().await
48+
(self.0).flush().await
4949
}
5050

5151
async fn shutdown(&mut self) -> io::Result<()> {
52-
self.0.shutdown().await
52+
(self.0).shutdown().await
5353
}
5454
}
5555

compio-net/src/tcp.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,30 @@ impl TcpStream {
328328

329329
self.inner.send(buf, MSG_OOB).await
330330
}
331+
332+
/// Sends data using [zero-copy send](https://man7.org/linux/man-pages/man3/io_uring_prep_send_zc.3.html).
333+
///
334+
/// If the underlying platform doesn't support zero-copy send, it will fall
335+
/// back to normal send.
336+
pub async fn send_zerocopy<T: IoBuf>(
337+
&self,
338+
buf: T,
339+
flags: i32,
340+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
341+
self.inner.send_zerocopy(buf, flags).await
342+
}
343+
344+
/// Sends vectorized data using [zero-copy send](https://man7.org/linux/man-pages/man3/io_uring_prep_send_zc.3.html).
345+
///
346+
/// If the underlying platform doesn't support zero-copy send, it will fall
347+
/// back to normal send.
348+
pub async fn send_zerocopy_vectored<T: IoVectoredBuf>(
349+
&self,
350+
buf: T,
351+
flags: i32,
352+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
353+
self.inner.send_zerocopy_vectored(buf, flags).await
354+
}
331355
}
332356

333357
impl AsyncRead for TcpStream {
@@ -405,12 +429,14 @@ impl AsyncWrite for TcpStream {
405429
impl AsyncWrite for &TcpStream {
406430
#[inline]
407431
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
408-
self.inner.send(buf, 0).await
432+
let BufResult(res, fut) = self.send_zerocopy(buf, 0).await;
433+
BufResult(res, fut.await)
409434
}
410435

411436
#[inline]
412437
async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
413-
self.inner.send_vectored(buf, 0).await
438+
let BufResult(res, fut) = self.send_zerocopy_vectored(buf, 0).await;
439+
BufResult(res, fut.await)
414440
}
415441

416442
#[inline]

compio-net/src/udp.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,76 @@ impl UdpSocket {
389389
.await
390390
}
391391

392+
/// Sends data on the socket to the given address with zero copy.
393+
///
394+
/// Returns the result of send and a future that resolves to the
395+
/// original buffer when the send is complete.
396+
pub async fn send_to_zerocopy<A: ToSocketAddrsAsync, T: IoBuf>(
397+
&self,
398+
buffer: T,
399+
addr: A,
400+
) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
401+
super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
402+
self.inner.send_to_zerocopy(buffer, &addr.into(), 0).await
403+
})
404+
.await
405+
}
406+
407+
/// Sends vectored data on the socket to the given address with zero copy.
408+
///
409+
/// Returns the result of send and a future that resolves to the
410+
/// original buffer when the send is complete.
411+
pub async fn send_to_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf>(
412+
&self,
413+
buffer: T,
414+
addr: A,
415+
) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
416+
super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
417+
self.inner
418+
.send_to_zerocopy_vectored(buffer, &addr.into(), 0)
419+
.await
420+
})
421+
.await
422+
}
423+
424+
/// Sends data with control message on the socket to the given address with
425+
/// zero copy.
426+
///
427+
/// Returns the result of send and a future that resolves to the
428+
/// original buffer when the send is complete.
429+
pub async fn send_msg_zerocopy<A: ToSocketAddrsAsync, T: IoBuf, C: IoBuf>(
430+
&self,
431+
buffer: T,
432+
control: C,
433+
addr: A,
434+
) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
435+
super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
436+
self.inner
437+
.send_msg_zerocopy(b, c, Some(&addr.into()), 0)
438+
.await
439+
})
440+
.await
441+
}
442+
443+
/// Sends vectored data with control message on the socket to the given
444+
/// address with zero copy.
445+
///
446+
/// Returns the result of send and a future that resolves to the
447+
/// original buffer when the send is complete.
448+
pub async fn send_msg_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf, C: IoBuf>(
449+
&self,
450+
buffer: T,
451+
control: C,
452+
addr: A,
453+
) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
454+
super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
455+
self.inner
456+
.send_msg_zerocopy_vectored(b, c, Some(&addr.into()), 0)
457+
.await
458+
})
459+
.await
460+
}
461+
392462
/// Gets a socket option.
393463
///
394464
/// # Safety

0 commit comments

Comments
 (0)