Skip to content

Commit 94a9ec6

Browse files
committed
Start adding stuff to support sendmsg_zc.
1 parent 9b654c1 commit 94a9ec6

File tree

4 files changed

+80
-0
lines changed

4 files changed

+80
-0
lines changed

src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ mod send_to;
2828

2929
mod send_zc;
3030

31+
mod sendmsg_zc;
32+
3133
mod shared_fd;
3234
pub(crate) use shared_fd::SharedFd;
3335

src/io/sendmsg_zc.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use crate::buf::BoundedBuf;
2+
use crate::io::SharedFd;
3+
use crate::runtime::driver::op::{Completable, CqeResult, Op};
4+
use crate::runtime::CONTEXT;
5+
use crate::BufResult;
6+
use socket2::SockAddr;
7+
use std::io::IoSlice;
8+
use std::{boxed::Box, io, net::SocketAddr};
9+
10+
pub(crate) struct SendMsgZc<T> {
11+
#[allow(dead_code)]
12+
fd: SharedFd,
13+
14+
/*pub(crate) buf: T,
15+
16+
#[allow(dead_code)]
17+
io_slices: Vec<IoSlice<'static>>,
18+
19+
#[allow(dead_code)]
20+
socket_addr: Box<SockAddr>,*/
21+
22+
pub(crate) msghdr: Box<libc::msghdr>,
23+
}
24+
25+
impl<T: BoundedBuf> Op<SendZc<T>, MultiCQEFuture> {
26+
pub(crate) fn sendmsg_zc(fd: &SharedFd, msghdr: Box<libc::msghdr>) -> io::Result<Self> {
27+
use io_uring::{opcode, types};
28+
29+
/*let io_slices = vec![IoSlice::new(unsafe {
30+
std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init())
31+
})];
32+
33+
let socket_addr = Box::new(SockAddr::from(socket_addr));
34+
35+
let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });
36+
msghdr.msg_iov = io_slices.as_ptr() as *mut _;
37+
msghdr.msg_iovlen = io_slices.len() as _;
38+
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
39+
msghdr.msg_namelen = socket_addr.len();*/
40+
41+
CONTEXT.with(|x| {
42+
x.handle().expect("Not in a runtime context").submit_op(
43+
SendMsgZc {
44+
fd: fd.clone(),
45+
/*buf,
46+
io_slices,
47+
socket_addr,*/
48+
msghdr: msghdr.clone(),
49+
},
50+
|sendmsg_zc| {
51+
opcode::SendMsgZc::new(types::Fd(sendmsg_zc.fd.raw_fd()), sendmsg_zc.msghdr.as_ref() as *const _).build()
52+
},
53+
)
54+
})
55+
}
56+
}
57+
58+
impl<T> Completable for SendMsgZc<T> {
59+
type Output = BufResult<usize, T>;
60+
61+
fn complete(self, cqe: CqeResult) -> Self::Output {
62+
// Convert the operation result to `usize`
63+
let res = cqe.result.map(|v| v as usize);
64+
// Recover the buffer
65+
let buf = self.buf;
66+
67+
(res, buf)
68+
}
69+
}

src/io/socket.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ impl Socket {
147147
op.await
148148
}
149149

150+
pub(crate) async fn sendmsg_zc<T: Box<libc::msghdr>(&self, msghdr: T) -> crate::BufResult<usize, T> {
151+
let op = Op::sendmsg_zc(&self.fd, msghdr).unwrap();
152+
op.await
153+
}
154+
150155
pub(crate) async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
151156
let op = Op::read_at(&self.fd, buf, 0).unwrap();
152157
op.await

src/net/udp.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ impl UdpSocket {
220220
self.inner.send_zc(buf).await
221221
}
222222

223+
pub async fn sendmsg_zc<T: Box<libc::msghdr>>(&self, msghdr: T) -> crate::BufResult<usize, T> {
224+
self.inner.sendmsg_zc(msghdr).await
225+
}
226+
223227
/// Receives a single datagram message on the socket. On success, returns
224228
/// the number of bytes read and the origin.
225229
pub async fn recv_from<T: BoundedBufMut>(

0 commit comments

Comments
 (0)