Commit f66f734

isilence authored and axboe committed
io_uring: skip spinlocking for ->task_complete
->task_complete was added to serialise CQE posting by doing it from the
task context only (or the fallback wq when the task is dead), and now we
can use that to avoid taking ->completion_lock while filling CQ entries.
The patch skips spinlocking only in two spots,
__io_submit_flush_completions() and flushing in io_aux_cqe; it's safer
and covers all cases we care about. Extra care is taken to force taking
the lock while queueing overflow entries.

It fundamentally relies on SINGLE_ISSUER to have only one task posting
events. It also needs to take into account overflowed CQEs, flushing of
which happens in the cq wait path, and so this implementation also needs
DEFER_TASKRUN to limit waiters. For the same reason we disable it for
SQPOLL, and for IOPOLL as it won't benefit from it in any case. The
DEFER_TASKRUN, SQPOLL and IOPOLL requirements may be relaxed in the
future.

Signed-off-by: Pavel Begunkov <[email protected]>
Link: https://lore.kernel.org/r/2a8c91fd82cfcdcc1d2e5bac7051fe2c183bda73.1670384893.git.asml.silence@gmail.com
[axboe: modify to apply]
Signed-off-by: Jens Axboe <[email protected]>
1 parent 6d043ee commit f66f734
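
For context, a minimal sketch of how a ->task_complete style flag could be derived from the setup flags named in the commit message (DEFER_TASKRUN together with SINGLE_ISSUER, and neither SQPOLL nor IOPOLL). The helper name and the exact condition are illustrative assumptions, not code from this commit:

```c
/*
 * Illustrative sketch only: the helper name is hypothetical and the
 * in-tree check may differ. It derives a "CQEs are posted by a single
 * task" flag from the constraints stated in the commit message:
 * DEFER_TASKRUN (which already requires SINGLE_ISSUER at ring setup),
 * and neither SQPOLL nor IOPOLL.
 */
static void io_set_task_complete(struct io_ring_ctx *ctx)
{
	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
	    (ctx->flags & IORING_SETUP_SINGLE_ISSUER) &&
	    !(ctx->flags & IORING_SETUP_SQPOLL) &&
	    !(ctx->flags & IORING_SETUP_IOPOLL))
		ctx->task_complete = true;
}
```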

File tree

2 files changed: +61 -20 lines changed


io_uring/io_uring.c

Lines changed: 52 additions & 19 deletions
```diff
@@ -584,21 +584,36 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 		io_eventfd_flush_signal(ctx);
 }
 
+static inline void __io_cq_lock(struct io_ring_ctx *ctx)
+	__acquires(ctx->completion_lock)
+{
+	if (!ctx->task_complete)
+		spin_lock(&ctx->completion_lock);
+}
+
+static inline void __io_cq_unlock(struct io_ring_ctx *ctx)
+{
+	if (!ctx->task_complete)
+		spin_unlock(&ctx->completion_lock);
+}
+
 /* keep it inlined for io_submit_flush_completions() */
-static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx)
+static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
 	__releases(ctx->completion_lock)
 {
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-
+	__io_cq_unlock(ctx);
 	io_commit_cqring_flush(ctx);
 	io_cqring_wake(ctx);
 }
 
 void io_cq_unlock_post(struct io_ring_ctx *ctx)
 	__releases(ctx->completion_lock)
 {
-	io_cq_unlock_post_inline(ctx);
+	io_commit_cqring(ctx);
+	spin_unlock(&ctx->completion_lock);
+	io_commit_cqring_flush(ctx);
+	io_cqring_wake(ctx);
 }
 
 /* Returns true if there are no backlogged entries after the flush */
@@ -785,12 +800,13 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
 	return &rings->cqes[off];
 }
 
-static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
-			    bool allow_overflow)
+static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+			    u32 cflags)
 {
 	struct io_uring_cqe *cqe;
 
-	lockdep_assert_held(&ctx->completion_lock);
+	if (!ctx->task_complete)
+		lockdep_assert_held(&ctx->completion_lock);
 
 	ctx->cq_extra++;
 
@@ -813,10 +829,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32
 		}
 		return true;
 	}
-
-	if (allow_overflow)
-		return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
-
 	return false;
 }
 
@@ -830,7 +842,17 @@ static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
 	for (i = 0; i < state->cqes_count; i++) {
 		struct io_uring_cqe *cqe = &state->cqes[i];
 
-		io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags, true);
+		if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
+			if (ctx->task_complete) {
+				spin_lock(&ctx->completion_lock);
+				io_cqring_event_overflow(ctx, cqe->user_data,
+							cqe->res, cqe->flags, 0, 0);
+				spin_unlock(&ctx->completion_lock);
+			} else {
+				io_cqring_event_overflow(ctx, cqe->user_data,
+							cqe->res, cqe->flags, 0, 0);
+			}
+		}
 	}
 	state->cqes_count = 0;
 }
@@ -841,7 +863,10 @@ static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u
 	bool filled;
 
 	io_cq_lock(ctx);
-	filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow);
+	filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
+	if (!filled && allow_overflow)
+		filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
+
 	io_cq_unlock_post(ctx);
 	return filled;
 }
@@ -865,10 +890,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32
 	lockdep_assert_held(&ctx->uring_lock);
 
 	if (ctx->submit_state.cqes_count == length) {
-		io_cq_lock(ctx);
+		__io_cq_lock(ctx);
 		__io_flush_post_cqes(ctx);
 		/* no need to flush - flush is deferred */
-		io_cq_unlock(ctx);
+		__io_cq_unlock_post(ctx);
 	}
 
 	/* For defered completions this is not as strict as it is otherwise,
@@ -1403,18 +1428,26 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 	struct io_wq_work_node *node, *prev;
 	struct io_submit_state *state = &ctx->submit_state;
 
-	io_cq_lock(ctx);
+	__io_cq_lock(ctx);
 	/* must come first to preserve CQE ordering in failure cases */
 	if (state->cqes_count)
 		__io_flush_post_cqes(ctx);
 	wq_list_for_each(node, prev, &state->compl_reqs) {
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 					    comp_list);
 
-		if (!(req->flags & REQ_F_CQE_SKIP))
-			__io_fill_cqe_req(ctx, req);
+		if (!(req->flags & REQ_F_CQE_SKIP) &&
+		    unlikely(!__io_fill_cqe_req(ctx, req))) {
+			if (ctx->task_complete) {
+				spin_lock(&ctx->completion_lock);
+				io_req_cqe_overflow(req);
+				spin_unlock(&ctx->completion_lock);
+			} else {
+				io_req_cqe_overflow(req);
+			}
+		}
 	}
-	io_cq_unlock_post_inline(ctx);
+	__io_cq_unlock_post(ctx);
 
 	if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
 		io_free_batch_list(ctx, state->compl_reqs.first);
```

io_uring/io_uring.h

Lines changed: 9 additions & 1 deletion
```diff
@@ -133,7 +133,7 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
 	 */
 	cqe = io_get_cqe(ctx);
 	if (unlikely(!cqe))
-		return io_req_cqe_overflow(req);
+		return false;
 
 	trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
 				req->cqe.res, req->cqe.flags,
@@ -156,6 +156,14 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
 	return true;
 }
 
+static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
+				   struct io_kiocb *req)
+{
+	if (likely(__io_fill_cqe_req(ctx, req)))
+		return true;
+	return io_req_cqe_overflow(req);
+}
+
 static inline void req_set_fail(struct io_kiocb *req)
 {
 	req->flags |= REQ_F_FAIL;
```
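
For completeness, a hedged userspace sketch (liburing; not part of this commit) of a ring configured the way this optimisation expects: DEFER_TASKRUN plus SINGLE_ISSUER, without SQPOLL or IOPOLL, so completions are posted from the submitter task without taking ->completion_lock.

```c
#include <liburing.h>
#include <stdio.h>

int main(void)
{
	struct io_uring ring;
	struct io_uring_params p = { 0 };
	int ret;

	/* single submitter task, completions run via deferred task work */
	p.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN;

	ret = io_uring_queue_init_params(8, &ring, &p);
	if (ret < 0) {
		fprintf(stderr, "queue_init: %d\n", ret);
		return 1;
	}

	/* ... submit requests and reap CQEs from this same task ... */

	io_uring_queue_exit(&ring);
	return 0;
}
```

On kernels that predate these setup flags the init call simply fails, and an application can fall back to a ring without them; it just won't take the lockless completion path described above.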
