
Commit bda5216

io_uring: make CQ ring wakeups be more efficient

For batched IO, it's not uncommon for waiters to ask for more than 1 IO
to complete before being woken up. This is a problem with wait_event()
since tasks will get woken for every IO that completes, re-check the
condition, then go back to sleep. For batch counts on the order of what
you do for high IOPS, that can result in tens of extra wakeups for the
waiting task.

Add a private wake function that checks for the wakeup count criteria
being met before calling autoremove_wake_function(). Pavel reports that
one test case he has runs 40% faster with proper batching of wakeups.

Reported-by: Pavel Begunkov <[email protected]>
Tested-by: Pavel Begunkov <[email protected]>
Reviewed-by: Pavel Begunkov <[email protected]>
Signed-off-by: Jens Axboe <[email protected]>
1 parent daa5de5 commit bda5216
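The "waiters" the message refers to are tasks that enter the kernel asking for a batch of completions, e.g. via io_uring_enter()'s min_complete argument. As a rough illustration of such a caller (a sketch assuming liburing's io_uring_submit_and_wait() and CQE helpers; BATCH and wait_for_batch() are made-up names, not part of this commit):

#include <liburing.h>
#include <stdio.h>

#define BATCH 32        /* illustrative batch size */

/* Submit queued SQEs and sleep until at least BATCH completions exist. */
static int wait_for_batch(struct io_uring *ring)
{
        struct io_uring_cqe *cqe;
        unsigned head, seen = 0;
        int ret;

        ret = io_uring_submit_and_wait(ring, BATCH);
        if (ret < 0)
                return ret;

        /* Reap everything currently sitting in the CQ ring. */
        io_uring_for_each_cqe(ring, head, cqe) {
                if (cqe->res < 0)
                        fprintf(stderr, "IO failed: %d\n", cqe->res);
                seen++;
        }
        io_uring_cq_advance(ring, seen);
        return seen;
}

Before this commit, such a waiter could be woken once per completed IO only to find the event count still below min_complete and go back to sleep; the change below moves that check into the wake path itself.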

fs/io_uring.c

Lines changed: 56 additions & 10 deletions
@@ -2768,15 +2768,55 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
         return submit;
 }
 
+struct io_wait_queue {
+        struct wait_queue_entry wq;
+        struct io_ring_ctx *ctx;
+        unsigned to_wait;
+        unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
+{
+        struct io_ring_ctx *ctx = iowq->ctx;
+
+        /*
+         * Wake up if we have enough events, or if a timeout occured since we
+         * started waiting. For timeouts, we always want to return to userspace,
+         * regardless of event count.
+         */
+        return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+                        atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+                            int wake_flags, void *key)
+{
+        struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+                                                        wq);
+
+        if (!io_should_wake(iowq))
+                return -1;
+
+        return autoremove_wake_function(curr, mode, wake_flags, key);
+}
+
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
  */
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                           const sigset_t __user *sig, size_t sigsz)
 {
+        struct io_wait_queue iowq = {
+                .wq = {
+                        .private        = current,
+                        .func           = io_wake_function,
+                        .entry          = LIST_HEAD_INIT(iowq.wq.entry),
+                },
+                .ctx            = ctx,
+                .to_wait        = min_events,
+        };
         struct io_rings *rings = ctx->rings;
-        unsigned nr_timeouts;
         int ret;
 
         if (io_cqring_events(rings) >= min_events)
@@ -2795,15 +2835,21 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                 return ret;
         }
 
-        nr_timeouts = atomic_read(&ctx->cq_timeouts);
-        /*
-         * Return if we have enough events, or if a timeout occured since
-         * we started waiting. For timeouts, we always want to return to
-         * userspace.
-         */
-        ret = wait_event_interruptible(ctx->wait,
-                                io_cqring_events(rings) >= min_events ||
-                                atomic_read(&ctx->cq_timeouts) != nr_timeouts);
+        ret = 0;
+        iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+        do {
+                prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
+                                                TASK_INTERRUPTIBLE);
+                if (io_should_wake(&iowq))
+                        break;
+                schedule();
+                if (signal_pending(current)) {
+                        ret = -ERESTARTSYS;
+                        break;
+                }
+        } while (1);
+        finish_wait(&ctx->wait, &iowq.wq);
+
         restore_saved_sigmask_unless(ret == -ERESTARTSYS);
         if (ret == -ERESTARTSYS)
                 ret = -EINTR;
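Stripped of the io_uring specifics, the new code is the standard wait-queue idiom of an entry with a custom wake callback that filters wakeups below a threshold. A condensed restatement of that pattern follows; batch_wait, nr_events and batch_wake_function are illustrative names, not from the patch, and this is a sketch rather than buildable kernel code:

struct batch_wait {
        struct wait_queue_entry wq;     /* embedded wait queue entry */
        unsigned int to_wait;           /* completions the waiter asked for */
        unsigned int *nr_events;        /* illustrative completion counter */
};

/* Invoked for each wake_up() on the queue while the waiter is queued. */
static int batch_wake_function(struct wait_queue_entry *curr, unsigned int mode,
                               int wake_flags, void *key)
{
        struct batch_wait *bw = container_of(curr, struct batch_wait, wq);

        /* Below the threshold: leave the task queued and asleep. */
        if (READ_ONCE(*bw->nr_events) < bw->to_wait)
                return -1;

        /* Threshold met: wake the task and drop it from the queue. */
        return autoremove_wake_function(curr, mode, wake_flags, key);
}

The waiter pairs this with prepare_to_wait_exclusive()/schedule()/finish_wait(), exactly as the new loop in io_cqring_wait() does, so the task is only put back on the runqueue once io_should_wake() is finally true.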
