Skip to content

Commit 1362e67

Browse files
feat(net): add recv_from_managed (#710)
1 parent 43c6ef4 commit 1362e67

File tree

3 files changed

+52
-3
lines changed

3 files changed

+52
-3
lines changed

compio-net/src/socket.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ use compio_driver::op::CreateSocket;
1010
use compio_driver::{
1111
AsRawFd, ToSharedFd, impl_raw_fd,
1212
op::{
13-
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromVectored, RecvManaged,
14-
RecvMsg, RecvResultExt, RecvVectored, ResultTakeBuffer, Send, SendMsg, SendTo,
15-
SendToVectored, SendVectored, ShutdownSocket, VecBufResultExt,
13+
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromManaged,
14+
RecvFromVectored, RecvManaged, RecvMsg, RecvResultExt, RecvVectored, ResultTakeBuffer,
15+
Send, SendMsg, SendTo, SendToVectored, SendVectored, ShutdownSocket, VecBufResultExt,
1616
},
1717
syscall,
1818
};
@@ -177,6 +177,21 @@ impl Socket {
177177
.take_buffer(buffer_pool)
178178
}
179179

180+
pub async fn recv_from_managed<'a>(
181+
&self,
182+
buffer_pool: &'a BufferPool,
183+
len: usize,
184+
flags: i32,
185+
) -> io::Result<(BorrowedBuffer<'a>, SockAddr)> {
186+
let fd = self.to_shared_fd();
187+
let buffer_pool = buffer_pool.try_inner()?;
188+
let op = RecvFromManaged::new(fd, buffer_pool, len, flags)?;
189+
compio_runtime::submit(op)
190+
.with_extra()
191+
.await
192+
.take_buffer(buffer_pool)
193+
}
194+
180195
pub async fn send<T: IoBuf>(&self, buffer: T, flags: i32) -> BufResult<usize, T> {
181196
let fd = self.to_shared_fd();
182197
let op = Send::new(fd, buffer, flags);

compio-net/src/udp.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,22 @@ impl UdpSocket {
219219
self.inner.recv_managed(buffer_pool, len, 0).await
220220
}
221221

222+
/// Read some bytes from this source with [`BufferPool`] and return
223+
/// a [`BorrowedBuffer`] with the sender address.
224+
///
225+
/// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
226+
/// if `len` > 0, `min(len, inner buffer size)` will be the read max len
227+
pub async fn recv_from_managed<'a>(
228+
&self,
229+
buffer_pool: &'a BufferPool,
230+
len: usize,
231+
) -> io::Result<(BorrowedBuffer<'a>, SocketAddr)> {
232+
self.inner
233+
.recv_from_managed(buffer_pool, len, 0)
234+
.await
235+
.map(|(buffer, addr)| (buffer, addr.as_socket().expect("should be SocketAddr")))
236+
}
237+
222238
/// Sends some data to the socket from the buffer, returning the original
223239
/// buffer and quantity of data sent.
224240
pub async fn send<T: IoBuf>(&self, buffer: T) -> BufResult<usize, T> {

compio-net/tests/buffer_pool.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,24 @@ async fn test_udp_read_buffer_pool() {
5858
);
5959
}
6060

61+
#[compio_macros::test]
62+
async fn test_udp_recv_from_buffer_pool() {
63+
let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
64+
let listener_addr = listener.local_addr().unwrap();
65+
let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
66+
let connected_addr = connected.local_addr().unwrap();
67+
68+
compio_runtime::spawn(async move {
69+
connected.send_to(b"test", listener_addr).await.unwrap();
70+
})
71+
.detach();
72+
73+
let buffer_pool = BufferPool::new(1, 4).unwrap();
74+
let (buffer, addr) = listener.recv_from_managed(&buffer_pool, 0).await.unwrap();
75+
assert_eq!(buffer.as_ref(), b"test");
76+
assert_eq!(addr, connected_addr);
77+
}
78+
6179
#[compio_macros::test]
6280
async fn test_uds_recv_buffer_pool() {
6381
let dir = tempfile::Builder::new()

0 commit comments

Comments
 (0)