feat: recv/recvmsg multishot support for io_uring driver #379
dshulyak wants to merge 4 commits into bytedance:master
Conversation
Extends the lifecycle state machine to handle multishot operations.
Unlike single-shot operations that produce one completion per submission,
multishot operations yield multiple completions until explicitly terminated via the IORING_CQE_F_MORE flag.
Example: single-shot recv
```rust
let (result, buf) = socket.recv(buf).await;
// ├─ submit() → Lifecycle::Submitted, returns Op<Recv>
// ├─ .await → poll() sees Submitted, stores waker → Lifecycle::Waiting(waker)
// │ returns Poll::Pending, task suspends
// │
// │ kernel receives packet, posts CQE
// │
// ├─ complete()→ Lifecycle::Completed(result, flags), waker.wake()
// ├─ .await → poll() sees Completed, removes from slab, returns Poll::Ready
// └─ done
```
Example: multishot recv
```rust
let ring = UserRingBuf::new(4, 1500, 0)?; // 4 buffers, 1500 bytes each
let mut stream = socket.recv_multishot(ring)?; // Lifecycle::Multishot { queue: [], terminated: false }
// ↳ submits RecvMulti SQE to io_uring
sender.send_to(b"hello", addr).await; // kernel receives, posts CQE with MORE=1
// ↳ complete(): queue.push(cqe), waker.wake()
let buf = stream.next().await; // poll_multishot(): queue.pop() → Ready(cqe)
assert_eq!(&*buf, b"hello"); // ↳ buf holds buffer #0
drop(buf); // ↳ buffer #0 returned to ring
sender.send_to(b"world", addr).await; // kernel posts CQE with MORE=1
let buf = stream.next().await; // poll_multishot() → Ready(cqe)
assert_eq!(&*buf, b"world");
drop(buf);
drop(stream); // user drops while kernel still working
// ↳ drop_op(): Lifecycle → Ignored(boxed_data)
// ↳ slab entry kept alive
// once kernel finishes and posts cqe
// ↳ complete(): if MORE=0, self.remove()
// ↳ slab entry finally removed
```
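The multishot states exercised above can be modeled with a small self-contained sketch. The names `queue`/`terminated` follow the PR description; the `Cqe` struct and `PollNext` enum here are simplified stand-ins for illustration, not the actual driver types:

```rust
use std::collections::VecDeque;

/// Simplified stand-in for a completion queue entry (result + MORE flag).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Cqe {
    result: i32,
    more: bool, // IORING_CQE_F_MORE: kernel will post further completions
}

/// Simplified multishot lifecycle: completions queue up until the
/// kernel posts a final CQE with MORE=0.
struct Multishot {
    queue: VecDeque<Cqe>,
    terminated: bool,
}

enum PollNext {
    Ready(Cqe),
    Pending,
    Done, // terminated and queue drained
}

impl Multishot {
    fn new() -> Self {
        Multishot { queue: VecDeque::new(), terminated: false }
    }

    /// Driver side: called once per CQE posted by the kernel.
    fn complete(&mut self, cqe: Cqe) {
        if !cqe.more {
            self.terminated = true;
        }
        self.queue.push_back(cqe);
    }

    /// User side: roughly what stream.next() would poll.
    fn poll_next(&mut self) -> PollNext {
        match self.queue.pop_front() {
            Some(cqe) => PollNext::Ready(cqe),
            None if self.terminated => PollNext::Done,
            None => PollNext::Pending,
        }
    }
}

fn main() {
    let mut op = Multishot::new();
    op.complete(Cqe { result: 5, more: true });
    op.complete(Cqe { result: 5, more: false }); // final CQE, MORE=0
    assert!(matches!(op.poll_next(), PollNext::Ready(_)));
    assert!(matches!(op.poll_next(), PollNext::Ready(_)));
    assert!(matches!(op.poll_next(), PollNext::Done));
    println!("multishot drained and terminated");
}
```

Note how `drop(stream)` before termination maps onto the `Ignored` path in the walkthrough: the slab entry stays alive until a CQE with MORE=0 flips `terminated`.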
Implements io_uring provided [buffer rings](https://man7.org/linux/man-pages/man3/io_uring_register_buf_ring.3.html). The kernel picks buffers directly from a shared ring.

Buffer State Example: ring entries (slots) and buffer memory are decoupled. When returning a buffer, we write its address to slot[tail & 3] (the next available slot), not its "original" slot.

```rust
let ring = UserRingBuf::new(4, 1500, 0)?;
let mut stream = socket.recv_multishot(ring)?;
// INITIAL                  tail=4
// ring slots: [slot0→buf0, slot1→buf1, slot2→buf2, slot3→buf3]
// user holds: (none)

sender.send_to(b"aaa", addr).await;
let a = stream.next().await?; // kernel picks slot0→buf0, gives to user
// STEP 1                   tail=4
// ring slots: [slot0:empty, slot1→buf1, slot2→buf2, slot3→buf3]
// user holds: a=buf0

sender.send_to(b"bbb", addr).await;
let b = stream.next().await?; // kernel picks slot1→buf1, gives to user
// STEP 2                   tail=4
// ring slots: [slot0:empty, slot1:empty, slot2→buf2, slot3→buf3]
// user holds: a=buf0, b=buf1

sender.send_to(b"ccc", addr).await;
let c = stream.next().await?; // kernel picks slot2→buf2
drop(c); // return_buffer(buf2):
         //   slot[4 & 3] = slot0 → buf2, tail=5
// STEP 3                   tail=5
// ring slots: [slot0→buf2, slot1:empty, slot2:empty, slot3→buf3]
//              ↑ buf2 now in slot0 (was slot2)
// user holds: a=buf0, b=buf1

sender.send_to(b"ddd", addr).await;
let d = stream.next().await?; // kernel picks slot3→buf3
drop(d); // return_buffer(buf3):
         //   slot[5 & 3] = slot1 → buf3, tail=6
// STEP 4                   tail=6
// ring slots: [slot0→buf2, slot1→buf3, slot2:empty, slot3:empty]
//                          ↑ buf3 now in slot1 (was slot3)
// user holds: a=buf0, b=buf1
//
// kernel cycles buf2↔buf3 via slots 0,1
// buf0,buf1 memory untouched - not in any slot

drop(b); // slot[6 & 3] = slot2 → buf1, tail=7
drop(a); // slot[7 & 3] = slot3 → buf0, tail=8
// FINAL                    tail=8
// ring slots: [slot0→buf2, slot1→buf3, slot2→buf1, slot3→buf0]
// user holds: (none)
```

For UserRecvMsgRingBuf, each buffer holds header + address + payload.
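The slot/tail arithmetic from the walkthrough can be checked with a tiny model. `RingModel` is a hypothetical stand-in for illustration; the real `UserRingBuf` writes buffer addresses into mmap'd `BufRingEntry` slots shared with the kernel:

```rust
/// Toy model of the buffer ring's slot/tail arithmetic: returning a
/// buffer writes its id into slot[tail & mask] and bumps the tail.
struct RingModel {
    slots: Vec<Option<u16>>, // Some(buf_id) = available to the kernel
    tail: u32,
    mask: u32, // entry count (power of two) minus one
}

impl RingModel {
    fn new(entries: u16) -> Self {
        assert!(entries.is_power_of_two());
        RingModel {
            // initially slot i holds buffer i, and tail == entries
            slots: (0..entries).map(Some).collect(),
            tail: entries as u32,
            mask: entries as u32 - 1,
        }
    }

    /// Kernel side: consume whatever buffer sits in `slot`.
    fn kernel_take(&mut self, slot: usize) -> u16 {
        self.slots[slot].take().expect("slot empty")
    }

    /// User side: dropping a buffer returns it to the *next* slot,
    /// not its original one.
    fn return_buffer(&mut self, buf_id: u16) {
        let slot = (self.tail & self.mask) as usize;
        self.slots[slot] = Some(buf_id);
        self.tail += 1;
    }
}

fn main() {
    let mut ring = RingModel::new(4);
    let _a = ring.kernel_take(0); // buf0 held by user
    let _b = ring.kernel_take(1); // buf1 held by user
    let c = ring.kernel_take(2);
    ring.return_buffer(c); // slot[4 & 3] = slot0 → buf2, tail=5
    assert_eq!(ring.slots[0], Some(2));
    assert_eq!(ring.tail, 5);
    let d = ring.kernel_take(3);
    ring.return_buffer(d); // slot[5 & 3] = slot1 → buf3, tail=6
    assert_eq!(ring.slots[1], Some(3));
    assert_eq!(ring.tail, 6);
    println!("buf2/buf3 cycle through slots 0 and 1, as in STEP 3-4");
}
```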
Parser type parameter determines sockaddr size at compile time.

```bash
┌────────────────┬─────────────────────┬──────────────────────┐
│ RecvMsgOut Hdr │ sockaddr_in[6]      │ Payload              │
│ (16 bytes)     │ (16-28 bytes)       │ (variable)           │
├────────────────┴─────────────────────┼──────────────────────┤
│ parsed by parse_recvmsg()            │ returned as RawBuffer│
└──────────────────────────────────────┴──────────────────────┘
```
Generic `MultishotOp<T>` wrapper for io_uring multishot operations. Submit once, receive multiple completions until IORING_CQE_F_MORE=0.

io_uring multishot operations:
- recv multishot - with provided buffers (implemented)
- recvmsg multishot - with provided buffers (implemented)
- accept multishot - IORING_ACCEPT_MULTISHOT (not implemented)
- poll multishot - IORING_POLL_ADD_MULTI (not implemented)
- timeout multishot - IORING_TIMEOUT_MULTISHOT, since kernel 6.4 (not implemented)
- read multishot - IORING_OP_READ_MULTISHOT, since kernel 6.4 (not implemented)

To add new multishot ops: implement OpAble with uring_op() returning the multishot SQE, then wrap it in `MultishotOp<T>`.
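For context, a completion's buffer id and the MORE flag both live in the CQE `flags` word. The sketch below decodes them using the kernel's uapi constants from `<linux/io_uring.h>` (`IORING_CQE_F_BUFFER`, `IORING_CQE_F_MORE`, `IORING_CQE_BUFFER_SHIFT`); it illustrates the extraction logic, not this crate's exact API:

```rust
// CQE flag constants from the <linux/io_uring.h> uapi header.
const IORING_CQE_F_BUFFER: u32 = 1 << 0; // upper 16 bits carry the buffer id
const IORING_CQE_F_MORE: u32 = 1 << 1;   // more completions will follow
const IORING_CQE_BUFFER_SHIFT: u32 = 16;

/// Extract the provided-buffer id, if the kernel picked one for this CQE.
fn buffer_id(flags: u32) -> Option<u16> {
    if flags & IORING_CQE_F_BUFFER != 0 {
        Some((flags >> IORING_CQE_BUFFER_SHIFT) as u16)
    } else {
        None
    }
}

/// The multishot operation stays armed while MORE is set;
/// a CQE with MORE clear is the final completion.
fn is_more(flags: u32) -> bool {
    flags & IORING_CQE_F_MORE != 0
}

fn main() {
    // flags for: buffer id 3 selected, MORE set
    let flags = (3u32 << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER | IORING_CQE_F_MORE;
    assert_eq!(buffer_id(flags), Some(3));
    assert!(is_more(flags));
    // final CQE: MORE clear → operation terminated
    assert!(!is_more(IORING_CQE_F_BUFFER));
    println!("buffer id and MORE decoded from CQE flags");
}
```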
```rust
Ready(MultishotCqe),
Terminated(MultishotCqe),
Pending,
Done,
```
should be possible to remove this
Uses [recv_multishot](https://man7.org/linux/man-pages/man3/io_uring_prep_recv_multishot.3.html) and [recvmsg_multishot](https://man7.org/linux/man-pages/man3/io_uring_prep_recvmsg_multishot.3.html).

```rust
pub struct RecvMultishot<R: RingBuf> {
    ring: R,                          // owns the buffer ring
    op: MultishotOp<RecvMultishotOp>, // owns the io_uring operation
    cancellation_guard: Option<AssociateGuard>,
}

pub struct RecvMultishotStream<'a, R: RingBuf> {
    ring: &'a R,                              // borrows ring
    op: &'a mut MultishotOp<RecvMultishotOp>, // borrows op mutably
    cancellation_guard: &'a Option<AssociateGuard>,
}
```

This separation enforces a single mutable poller (the stream holds `&mut op`), while allowing multiple buffers to be held simultaneously without borrow conflicts (buffers only need `&ring`).

```rust
let mut multishot = socket.recv_multishot(ring)?;
let mut stream = multishot.stream();
let buf1 = stream.next().await?; // Buffer<'a>
let buf2 = stream.next().await?; // Buffer<'a>, buf1 still valid
let buf3 = stream.next().await?; // all three coexist
process(&buf1, &buf2, &buf3);
drop(buf3);
drop(buf2);
drop(buf1); // returned to ring in any order
```

The ring is reclaimable only after the multishot operation is fully terminated:
1. The kernel posts the final CQE with IORING_CQE_F_MORE=0 (no more completions coming)
2. The user drains all queued completions until stream.next() returns None

Termination happens naturally on error (e.g. ENOBUFS) or can be triggered via cancellation:

```rust
canceller.cancel(); // sends AsyncCancel SQE to io_uring;
                    // kernel posts final CQE with MORE=0
while stream.next().await.is_some() {} // drain queue until None
                                       // None means: terminated AND queue empty
drop(stream); // release borrow
let ring = multishot.try_into_ring()?; // Ok only if terminated, Err(self) otherwise
ring.unregister()?; // safe now: no in-flight buffers
```
Force-pushed from e0613ed to 17721c0.
Looks good, but there's some format issue. Thanks a lot. I'll review it asap.
Pull request overview
This PR adds io_uring multishot recv/recvmsg support with provided buffer rings, enabling efficient zero-copy UDP packet reception. The implementation extends the lifecycle state machine to handle multishot operations that produce multiple completions per submission, implements buffer ring management for io_uring provided buffers, and adds cancelable multishot recv/recvmsg operations for UDP sockets.
Key changes:
- Extended lifecycle state machine with a `Multishot` variant that queues completions until terminated (IORING_CQE_F_MORE=0)
- Implemented `UserRingBuf` and `UserRecvMsgRingBuf` for managing io_uring provided buffer rings with automatic registration/unregistration
- Added `MultishotOp` wrapper and `RecvMultishotOp`/`RecvMsgMultishotOp` operations with stream-based API
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| monoio/src/driver/uring/lifecycle.rs | Added Multishot lifecycle variant with completion queue, termination tracking, and poll_multishot method |
| monoio/src/buf/ring_buf.rs | Implemented UserRingBuf with mmap-based ring entries, buffer return logic, and recvmsg parsing support with IPv4/IPv6/Any parsers |
| monoio/src/driver/op.rs | Added MultishotOp wrapper with poll_next/is_terminated methods and buffer_id extraction from completion flags |
| monoio/src/driver/op/recv_multishot.rs | Implemented RecvMultishotOp and RecvMsgMultishotOp with io_uring opcode integration |
| monoio/src/net/udp.rs | Added recv_multishot/recvmsg_multishot APIs with stream-based interface and cancellation support |
| monoio/src/driver/uring/mod.rs | Added submit_multishot_with, poll_multishot_op, register/unregister_buf_ring methods; changed insert to use type parameter |
| monoio/src/driver/mod.rs | Added driver layer wrappers for multishot and buffer ring operations |
| monoio/src/io/util/cancel.rs | Added is_operation_canceled helper and changed to use libc::ECANCELED constant |
| monoio/src/io/util/mod.rs | Exported AssociateGuard and is_operation_canceled for internal use |
| monoio/src/io/mod.rs | Updated exports to include internal cancellation utilities |
| monoio/src/buf/mod.rs | Exported ring buffer types and parsers |
| monoio/tests/udp.rs | Added comprehensive tests for multishot recv/recvmsg covering buffer reuse, exhaustion/recovery, cancellation, and held buffers |
```rust
impl<P: RecvMsgParser> UserRecvMsgRingBuf<P> {
    /// Create new recvmsg buffer ring.
    pub fn new(buf_count: u16, payload_size: usize, group_id: u16) -> io::Result<Self> {
        let total_size = P::min_buffer_size() + payload_size;
```
The calculation P::min_buffer_size() + payload_size could overflow usize, leading to a smaller-than-expected buffer allocation and potential memory corruption. Consider using checked_add() and returning an error if overflow occurs.
Suggested change:

```rust
let total_size = P::min_buffer_size()
    .checked_add(payload_size)
    .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "buffer size overflow"))?;
```
```rust
let parsed = io_uring::types::RecvMsgOut::parse(raw, msghdr)
    .map_err(|()| io::Error::new(io::ErrorKind::InvalidData, "failed to parse recvmsg"))?;
let addr = P::parse_address(parsed.name_data())?;
let payload_len = parsed.payload_data().len();
```
The calculation len - payload_len could underflow if payload_len > len, resulting in a panic or incorrect value. While RecvMsgOut::parse should guarantee this doesn't happen, consider adding defensive validation or documenting this invariant more explicitly.
Suggested change:

```rust
let payload_len = parsed.payload_data().len();
if payload_len > len {
    return Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "recvmsg payload length exceeds buffer length",
    ));
}
```
```rust
canceller.cancel();
while stream.stream().next().await.is_some() {}
stream.try_into_ring().ok().unwrap().unregister().unwrap();
```
Using .ok().unwrap() is redundant and provides a less helpful panic message than calling .unwrap() or .expect() directly on the Result. The Result variant would include information about why try_into_ring failed (operation not terminated), while the Option variant just says "called Option::unwrap() on a None value". Consider using .expect("operation should be terminated") instead.
```rust
fn return_buffer(&self, buf_id: u16) {
    let ptr = self.buf_ring_ptr.expect("UserRingBuf already unmapped");
    let tail = unsafe { self.shared_tail.as_ref() };
    let local_tail = tail.load(Ordering::Relaxed);
```
The atomic load on line 362 uses Relaxed ordering, but the store on line 375 uses Release ordering. This appears to be intentional for performance (Relaxed load is cheaper), and should be safe since we're only reading a local copy to calculate the ring index. However, consider documenting why Relaxed is safe here, as the asymmetry might confuse readers. The Release store ensures the buffer entry updates are visible before the tail is updated.
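A minimal sketch of the asymmetric ordering pattern described in this comment, assuming a single producer owns the tail (the function and types here are illustrative, not the driver's):

```rust
use std::sync::atomic::{AtomicU16, Ordering};

/// Single-producer tail update: the Relaxed load only re-reads our own
/// previous store (no other thread writes the tail), so it needs no
/// ordering. The Release store then publishes the entry write: the
/// consumer observing the new tail is guaranteed to see the entry.
fn publish_entry(entries: &mut [u16], shared_tail: &AtomicU16, buf_id: u16) {
    let tail = shared_tail.load(Ordering::Relaxed); // our own last value
    let mask = entries.len() as u16 - 1;            // entry count is a power of two
    entries[(tail & mask) as usize] = buf_id;       // write the entry first...
    shared_tail.store(tail.wrapping_add(1), Ordering::Release); // ...then publish
}

fn main() {
    let mut entries = [0u16; 4];
    let tail = AtomicU16::new(4); // ring starts full, tail == entry count
    publish_entry(&mut entries, &tail, 7); // writes slot 4 & 3 = 0
    assert_eq!(entries[0], 7);
    assert_eq!(tail.load(Ordering::Acquire), 5);
    println!("entry published before tail became visible");
}
```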
```rust
}
Lifecycle::Completed(..) => {
    self.remove();
    *waker = Some(cx.waker().clone());
```
The waker is being unconditionally replaced without checking if the new waker will wake the same task. This is inconsistent with the single-shot poll_op implementation (line 143) which uses will_wake() to avoid unnecessary cloning. Consider checking will_wake() here to avoid unnecessary waker clones when the same task polls multiple times.
Suggested change:

```rust
match waker {
    Some(existing) if existing.will_wake(cx.waker()) => {}
    _ => {
        *waker = Some(cx.waker().clone());
    }
}
```
```rust
    return Err(io::Error::last_os_error());
}

let buf_ring_ptr = NonNull::new(ring_mem as *mut BufRingEntry).expect("mmap returned null");
```
The expect message "mmap returned null" is misleading. Since MAP_FAILED is checked on line 258 and MAP_FAILED is defined as (void*)-1 (not null), the NonNull::new should never return None here. Consider using unwrap() or changing the message to something like "unexpected: mmap returned valid address but NonNull creation failed".
Suggested change:

```rust
let buf_ring_ptr = NonNull::new(ring_mem as *mut BufRingEntry)
    .expect("unexpected: mmap returned valid address but NonNull creation failed");
```
```rust
let buf_ring_ptr = NonNull::new(ring_mem as *mut BufRingEntry).expect("mmap returned null");

let buf_len = buf_size as usize;
let mut data = vec![0u8; buf_count as usize * buf_len].into_boxed_slice();
```
The buffer allocation on line 265 could panic or allocate excessive memory due to integer overflow. The multiplication buf_count as usize * buf_len is not checked. For example, buf_count=32768 (max power of 2 for u16) with buf_size=4MB would attempt to allocate 128GB. Consider adding validation to check for overflow using checked_mul() and/or enforce reasonable maximum values for total buffer size.
Suggested change:

```rust
let total_len = (buf_count as usize)
    .checked_mul(buf_len)
    .ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "buffer ring total size overflows usize",
        )
    })?;
let mut data = vec![0u8; total_len].into_boxed_slice();
```
```rust
}
let ring_entries = buf_count;
let entry_size = std::mem::size_of::<BufRingEntry>();
let ring_mmap_size = entry_size * ring_entries as usize;
```
The ring size calculation entry_size * ring_entries as usize could overflow. While less likely than the buffer allocation issue since BufRingEntry is smaller, consider using checked_mul() for defensive programming.
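A quick check of the suggested guard: `checked_mul` returns `None` on overflow, so the error path fires before any allocation is attempted (the function name and values below are illustrative):

```rust
use std::io;

/// Compute the ring mmap size, rejecting overflow up front instead of
/// letting `entry_size * ring_entries` wrap or panic.
fn ring_mmap_size(entry_size: usize, ring_entries: u16) -> io::Result<usize> {
    entry_size
        .checked_mul(ring_entries as usize)
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "ring size overflows usize"))
}

fn main() {
    // normal case: 16-byte entries, 4 slots
    assert_eq!(ring_mmap_size(16, 4).unwrap(), 64);
    // overflow case: cannot fit two usize::MAX-sized entries
    assert!(ring_mmap_size(usize::MAX, 2).is_err());
    println!("overflow rejected before allocation");
}
```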
If you have time to review and are interested in the changes, I can also split this PR into 4 separate ones; I kept it this way to better demonstrate the scope of changes. Otherwise, I would appreciate any comments on the implementation.
1. feat: add multishot lifecycle support
Extends the lifecycle state machine to handle multishot operations.
Unlike single-shot operations that produce one completion per submission,
multishot operations yield multiple completions until explicitly terminated via the IORING_CQE_F_MORE flag.
Example: single-shot recv
Example: multishot recv
2. feat: add ringbuf implementation
Implements io_uring provided buffer rings. The kernel picks buffers directly from a shared ring.
Buffer State Example:
Ring entries (slots) and buffer memory are decoupled. When returning a buffer, we write its address to slot[tail & 3] (next available slot), not its "original" slot.
For UserRecvMsgRingBuf, each buffer holds header + address + payload. Parser type parameter determines sockaddr size at compile time.
```bash
┌────────────────┬─────────────────────┬──────────────────────┐
│ RecvMsgOut Hdr │ sockaddr_in[6]      │ Payload              │
│ (16 bytes)     │ (16-28 bytes)       │ (variable)           │
├────────────────┴─────────────────────┼──────────────────────┤
│ parsed by parse_recvmsg()            │ returned as RawBuffer│
└──────────────────────────────────────┴──────────────────────┘
```

3. feat: add multishot op

Generic `MultishotOp<T>` wrapper for io_uring multishot operations.

To add new multishot ops: implement OpAble with uring_op() returning the multishot SQE, then wrap it in `MultishotOp<T>`.

4. feat: add recv/recvmsg multishot with tests

Uses recv_multishot and recvmsg_multishot.

This separation enforces a single mutable poller (the stream holds `&mut op`), while allowing multiple buffers to be held simultaneously without borrow conflicts (buffers only need `&ring`).

Ring is reclaimable only after the multishot operation is fully terminated.
Termination happens naturally on error (e.g. ENOBUFS) or can be triggered via cancellation: