Skip to content

Commit ae98dbf

Browse files
committed
io_uring/kbuf: add support for incremental buffer consumption
By default, any recv/read operation that uses provided buffers will consume at least 1 buffer fully (and maybe more, in case of bundles). This adds support for incremental consumption, meaning that an application may add large buffers, and each read/recv will just consume the part of the buffer that it needs. For example, let's say an application registers 1MB buffers in a provided buffer ring, for streaming receives. If it gets a short recv, then the full 1MB buffer will be consumed and passed back to the application. With incremental consumption, only the part that was actually used is consumed, and the buffer remains the current one. This means that both the application and the kernel needs to keep track of what the current receive point is. Each recv will still pass back a buffer ID and the size consumed, the only difference is that before the next receive would always be the next buffer in the ring. Now the same buffer ID may return multiple receives, each at an offset into that buffer from where the previous receive left off. Example: Application registers a provided buffer ring, and adds two 32K buffers to the ring. Buffer1 address: 0x1000000 (buffer ID 0) Buffer2 address: 0x2000000 (buffer ID 1) A recv completion is received with the following values: cqe->res 0x1000 (4k bytes received) cqe->flags 0x11 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 0) and the application now knows that 4096b of data is available at 0x1000000, the start of that buffer, and that more data from this buffer will be coming. Now the next receive comes in: cqe->res 0x2010 (8k bytes received) cqe->flags 0x11 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 0) which tells the application that 8k is available where the last completion left off, at 0x1001000. Next completion is: cqe->res 0x5000 (20k bytes received) cqe->flags 0x1 (CQE_F_BUFFER set, buffer ID 0) and the application now knows that 20k of data is available at 0x1003000, which is where the previous receive ended. CQE_F_BUF_MORE isn't set, as no more data is available in this buffer ID. The next completion is then: cqe->res 0x1000 (4k bytes received) cqe->flags 0x10001 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 1) which tells the application that buffer ID 1 is now the current one, hence there's 4k of valid data at 0x2000000. 0x2001000 will be the next receive point for this buffer ID. When a buffer will be reused by future CQE completions, IORING_CQE_BUF_MORE will be set in cqe->flags. This tells the application that the kernel isn't done with the buffer yet, and that it should expect more completions for this buffer ID. Will only be set by provided buffer rings setup with IOU_PBUF_RING INC, as that's the only type of buffer that will see multiple consecutive completions for the same buffer ID. For any other provided buffer type, any completion that passes back a buffer to the application is final. Once a buffer has been fully consumed, the buffer ring head is incremented and the next receive will indicate the next buffer ID in the CQE cflags. On the send side, the application can manage how much data is sent from an existing buffer by setting sqe->len to the desired send length. An application can request incremental consumption by setting IOU_PBUF_RING_INC in the provided buffer ring registration. Outside of that, any provided buffer ring setup and buffer additions is done like before, no changes there. The only change is in how an application may see multiple completions for the same buffer ID, hence needing to know where the next receive will happen. Note that like existing provided buffer rings, this should not be used with IOSQE_ASYNC, as both really require the ring to remain locked over the duration of the buffer selection and the operation completion. It will consume a buffer otherwise regardless of the size of the IO done. Signed-off-by: Jens Axboe <[email protected]>
1 parent 6733e67 commit ae98dbf

File tree

3 files changed

+82
-20
lines changed

3 files changed

+82
-20
lines changed

include/uapi/linux/io_uring.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,11 +440,21 @@ struct io_uring_cqe {
440440
* IORING_CQE_F_SOCK_NONEMPTY If set, more data to read after socket recv
441441
* IORING_CQE_F_NOTIF Set for notification CQEs. Can be used to distinct
442442
* them from sends.
443+
* IORING_CQE_F_BUF_MORE If set, the buffer ID set in the completion will get
444+
* more completions. In other words, the buffer is being
445+
* partially consumed, and will be used by the kernel for
446+
* more completions. This is only set for buffers used via
447+
* the incremental buffer consumption, as provided by
448+
* a ring buffer setup with IOU_PBUF_RING_INC. For any
449+
* other provided buffer type, all completions with a
450+
* buffer passed back is automatically returned to the
451+
* application.
443452
*/
444453
#define IORING_CQE_F_BUFFER (1U << 0)
445454
#define IORING_CQE_F_MORE (1U << 1)
446455
#define IORING_CQE_F_SOCK_NONEMPTY (1U << 2)
447456
#define IORING_CQE_F_NOTIF (1U << 3)
457+
#define IORING_CQE_F_BUF_MORE (1U << 4)
448458

449459
#define IORING_CQE_BUFFER_SHIFT 16
450460

@@ -716,9 +726,17 @@ struct io_uring_buf_ring {
716726
* mmap(2) with the offset set as:
717727
* IORING_OFF_PBUF_RING | (bgid << IORING_OFF_PBUF_SHIFT)
718728
* to get a virtual mapping for the ring.
729+
* IOU_PBUF_RING_INC: If set, buffers consumed from this buffer ring can be
730+
* consumed incrementally. Normally one (or more) buffers
731+
* are fully consumed. With incremental consumptions, it's
732+
* feasible to register big ranges of buffers, and each
733+
* use of it will consume only as much as it needs. This
734+
* requires that both the kernel and application keep
735+
* track of where the current read/recv index is at.
719736
*/
720737
enum io_uring_register_pbuf_ring_flags {
721738
IOU_PBUF_RING_MMAP = 1,
739+
IOU_PBUF_RING_INC = 2,
722740
};
723741

724742
/* argument for IORING_(UN)REGISTER_PBUF_RING */

io_uring/kbuf.c

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,25 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg,
212212
buf = io_ring_head_to_buf(br, head, bl->mask);
213213
if (arg->max_len) {
214214
u32 len = READ_ONCE(buf->len);
215-
size_t needed;
216215

217216
if (unlikely(!len))
218217
return -ENOBUFS;
219-
needed = (arg->max_len + len - 1) / len;
220-
needed = min_not_zero(needed, (size_t) PEEK_MAX_IMPORT);
221-
if (nr_avail > needed)
222-
nr_avail = needed;
218+
/*
219+
* Limit incremental buffers to 1 segment. No point trying
220+
* to peek ahead and map more than we need, when the buffers
221+
* themselves should be large when setup with
222+
* IOU_PBUF_RING_INC.
223+
*/
224+
if (bl->flags & IOBL_INC) {
225+
nr_avail = 1;
226+
} else {
227+
size_t needed;
228+
229+
needed = (arg->max_len + len - 1) / len;
230+
needed = min_not_zero(needed, (size_t) PEEK_MAX_IMPORT);
231+
if (nr_avail > needed)
232+
nr_avail = needed;
233+
}
223234
}
224235

225236
/*
@@ -244,16 +255,21 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg,
244255

245256
req->buf_index = buf->bid;
246257
do {
247-
/* truncate end piece, if needed */
248-
if (buf->len > arg->max_len)
249-
buf->len = arg->max_len;
258+
u32 len = buf->len;
259+
260+
/* truncate end piece, if needed, for non partial buffers */
261+
if (len > arg->max_len) {
262+
len = arg->max_len;
263+
if (!(bl->flags & IOBL_INC))
264+
buf->len = len;
265+
}
250266

251267
iov->iov_base = u64_to_user_ptr(buf->addr);
252-
iov->iov_len = buf->len;
268+
iov->iov_len = len;
253269
iov++;
254270

255-
arg->out_len += buf->len;
256-
arg->max_len -= buf->len;
271+
arg->out_len += len;
272+
arg->max_len -= len;
257273
if (!arg->max_len)
258274
break;
259275

@@ -675,7 +691,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
675691

676692
if (reg.resv[0] || reg.resv[1] || reg.resv[2])
677693
return -EINVAL;
678-
if (reg.flags & ~IOU_PBUF_RING_MMAP)
694+
if (reg.flags & ~(IOU_PBUF_RING_MMAP | IOU_PBUF_RING_INC))
679695
return -EINVAL;
680696
if (!(reg.flags & IOU_PBUF_RING_MMAP)) {
681697
if (!reg.ring_addr)
@@ -713,6 +729,8 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
713729
if (!ret) {
714730
bl->nr_entries = reg.ring_entries;
715731
bl->mask = reg.ring_entries - 1;
732+
if (reg.flags & IOU_PBUF_RING_INC)
733+
bl->flags |= IOBL_INC;
716734

717735
io_buffer_add_list(ctx, bl, reg.bgid);
718736
return 0;

io_uring/kbuf.h

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ enum {
99
IOBL_BUF_RING = 1,
1010
/* ring mapped provided buffers, but mmap'ed by application */
1111
IOBL_MMAP = 2,
12+
/* buffers are consumed incrementally rather than always fully */
13+
IOBL_INC = 4,
14+
1215
};
1316

1417
struct io_buffer_list {
@@ -124,24 +127,45 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
124127
/* Mapped buffer ring, return io_uring_buf from head */
125128
#define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)]
126129

127-
static inline void io_kbuf_commit(struct io_kiocb *req,
130+
static inline bool io_kbuf_commit(struct io_kiocb *req,
128131
struct io_buffer_list *bl, int len, int nr)
129132
{
130133
if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT)))
131-
return;
132-
bl->head += nr;
134+
return true;
135+
133136
req->flags &= ~REQ_F_BUFFERS_COMMIT;
137+
138+
if (unlikely(len < 0))
139+
return true;
140+
141+
if (bl->flags & IOBL_INC) {
142+
struct io_uring_buf *buf;
143+
144+
buf = io_ring_head_to_buf(bl->buf_ring, bl->head, bl->mask);
145+
if (WARN_ON_ONCE(len > buf->len))
146+
len = buf->len;
147+
buf->len -= len;
148+
if (buf->len) {
149+
buf->addr += len;
150+
return false;
151+
}
152+
}
153+
154+
bl->head += nr;
155+
return true;
134156
}
135157

136-
static inline void __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr)
158+
static inline bool __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr)
137159
{
138160
struct io_buffer_list *bl = req->buf_list;
161+
bool ret = true;
139162

140163
if (bl) {
141-
io_kbuf_commit(req, bl, len, nr);
164+
ret = io_kbuf_commit(req, bl, len, nr);
142165
req->buf_index = bl->bgid;
143166
}
144167
req->flags &= ~REQ_F_BUFFER_RING;
168+
return ret;
145169
}
146170

147171
static inline void __io_put_kbuf_list(struct io_kiocb *req, int len,
@@ -176,10 +200,12 @@ static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int len,
176200
return 0;
177201

178202
ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT);
179-
if (req->flags & REQ_F_BUFFER_RING)
180-
__io_put_kbuf_ring(req, len, nbufs);
181-
else
203+
if (req->flags & REQ_F_BUFFER_RING) {
204+
if (!__io_put_kbuf_ring(req, len, nbufs))
205+
ret |= IORING_CQE_F_BUF_MORE;
206+
} else {
182207
__io_put_kbuf(req, len, issue_flags);
208+
}
183209
return ret;
184210
}
185211

0 commit comments

Comments
 (0)