
Commit 0617bb5

io_uring/msg_ring: improve handling of target CQE posting
Use the exported helper for queueing task_work for message passing, rather than rolling our own.

Note that this is only done for strict data messages for now; file descriptor passing messages still rely on the kernel task_work, and could be converted at some point if that turns out to be performance critical.

This improves peak message passing performance by about 5x in basic testing, with 2 threads just sending messages to each other: before this change it was capped at around 700K messages/sec, with the change the same test runs at over 4M messages/sec.

Signed-off-by: Jens Axboe <[email protected]>
1 parent f33096a commit 0617bb5
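For context (not part of the commit), a minimal userspace sketch of the kind of two-ring data message passing this change speeds up, using liburing's io_uring_prep_msg_ring() helper. Setting the target ring up with IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN is an assumption about which configuration takes the remote CQE posting path touched below; ring names and values are illustrative only.

/*
 * Illustrative only: send a data message from ring "src" to ring "dst"
 * and reap the CQE the kernel posts on the target ring. The
 * DEFER_TASKRUN/SINGLE_ISSUER setup on dst is an assumption about which
 * configuration goes through the remote posting path changed below.
 */
#include <liburing.h>
#include <stdio.h>

int main(void)
{
        struct io_uring src, dst;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;

        if (io_uring_queue_init(8, &src, 0) < 0)
                return 1;
        if (io_uring_queue_init(8, &dst, IORING_SETUP_SINGLE_ISSUER |
                                         IORING_SETUP_DEFER_TASKRUN) < 0)
                return 1;

        /* Queue a message for dst: user_data 0x1234, "len" 42, no flags */
        sqe = io_uring_get_sqe(&src);
        io_uring_prep_msg_ring(sqe, dst.ring_fd, 42, 0x1234, 0);
        io_uring_submit(&src);

        /* The message surfaces as a CQE on the target ring */
        if (!io_uring_wait_cqe(&dst, &cqe)) {
                printf("target cqe: user_data=0x%llx res=%d\n",
                       (unsigned long long) cqe->user_data, cqe->res);
                io_uring_cqe_seen(&dst, cqe);
        }

        /* The sender also gets a completion for the MSG_RING SQE itself */
        if (!io_uring_wait_cqe(&src, &cqe))
                io_uring_cqe_seen(&src, cqe);

        io_uring_queue_exit(&src);
        io_uring_queue_exit(&dst);
        return 0;
}

The CQE that lands on the target ring carries the sender-chosen user_data and len, which is what the new io_msg_data_remote()/io_msg_tw_complete() pair in the diff below posts via io_add_aux_cqe().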


io_uring/msg_ring.c

Lines changed: 47 additions & 43 deletions
@@ -13,7 +13,6 @@
 #include "filetable.h"
 #include "msg_ring.h"
 
-
 /* All valid masks for MSG_RING */
 #define IORING_MSG_RING_MASK            (IORING_MSG_RING_CQE_SKIP | \
                                         IORING_MSG_RING_FLAGS_PASS)
@@ -71,54 +70,43 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
         return target_ctx->task_complete;
 }
 
-static int io_msg_exec_remote(struct io_kiocb *req, task_work_func_t func)
+static void io_msg_tw_complete(struct io_kiocb *req, struct io_tw_state *ts)
 {
-        struct io_ring_ctx *ctx = req->file->private_data;
-        struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
-        struct task_struct *task = READ_ONCE(ctx->submitter_task);
-
-        if (unlikely(!task))
-                return -EOWNERDEAD;
+        struct io_ring_ctx *ctx = req->ctx;
 
-        init_task_work(&msg->tw, func);
-        if (task_work_add(task, &msg->tw, TWA_SIGNAL))
-                return -EOWNERDEAD;
+        io_add_aux_cqe(ctx, req->cqe.user_data, req->cqe.res, req->cqe.flags);
+        kmem_cache_free(req_cachep, req);
+        percpu_ref_put(&ctx->refs);
+}
 
-        return IOU_ISSUE_SKIP_COMPLETE;
+static void io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                               int res, u32 cflags, u64 user_data)
+{
+        req->cqe.user_data = user_data;
+        io_req_set_res(req, res, cflags);
+        percpu_ref_get(&ctx->refs);
+        req->ctx = ctx;
+        req->task = READ_ONCE(ctx->submitter_task);
+        req->io_task_work.func = io_msg_tw_complete;
+        io_req_task_work_add_remote(req, ctx, IOU_F_TWQ_LAZY_WAKE);
 }
 
-static void io_msg_tw_complete(struct callback_head *head)
+static int io_msg_data_remote(struct io_kiocb *req)
 {
-        struct io_msg *msg = container_of(head, struct io_msg, tw);
-        struct io_kiocb *req = cmd_to_io_kiocb(msg);
         struct io_ring_ctx *target_ctx = req->file->private_data;
-        int ret = 0;
-
-        if (current->flags & PF_EXITING) {
-                ret = -EOWNERDEAD;
-        } else {
-                u32 flags = 0;
-
-                if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
-                        flags = msg->cqe_flags;
-
-                /*
-                 * If the target ring is using IOPOLL mode, then we need to be
-                 * holding the uring_lock for posting completions. Other ring
-                 * types rely on the regular completion locking, which is
-                 * handled while posting.
-                 */
-                if (target_ctx->flags & IORING_SETUP_IOPOLL)
-                        mutex_lock(&target_ctx->uring_lock);
-                if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags))
-                        ret = -EOVERFLOW;
-                if (target_ctx->flags & IORING_SETUP_IOPOLL)
-                        mutex_unlock(&target_ctx->uring_lock);
-        }
+        struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+        struct io_kiocb *target;
+        u32 flags = 0;
 
-        if (ret < 0)
-                req_set_fail(req);
-        io_req_queue_tw_complete(req, ret);
+        target = kmem_cache_alloc(req_cachep, GFP_KERNEL);
+        if (unlikely(!target))
+                return -ENOMEM;
+
+        if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
+                flags = msg->cqe_flags;
+
+        io_msg_remote_post(target_ctx, target, msg->len, flags, msg->user_data);
+        return 0;
 }
 
 static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
@@ -136,7 +124,7 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
                 return -EBADFD;
 
         if (io_msg_need_remote(target_ctx))
-                return io_msg_exec_remote(req, io_msg_tw_complete);
+                return io_msg_data_remote(req);
 
         if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
                 flags = msg->cqe_flags;
@@ -216,6 +204,22 @@ static void io_msg_tw_fd_complete(struct callback_head *head)
         io_req_queue_tw_complete(req, ret);
 }
 
+static int io_msg_fd_remote(struct io_kiocb *req)
+{
+        struct io_ring_ctx *ctx = req->file->private_data;
+        struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+        struct task_struct *task = READ_ONCE(ctx->submitter_task);
+
+        if (unlikely(!task))
+                return -EOWNERDEAD;
+
+        init_task_work(&msg->tw, io_msg_tw_fd_complete);
+        if (task_work_add(task, &msg->tw, TWA_SIGNAL))
+                return -EOWNERDEAD;
+
+        return IOU_ISSUE_SKIP_COMPLETE;
+}
+
 static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
 {
         struct io_ring_ctx *target_ctx = req->file->private_data;
@@ -238,7 +242,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
         }
 
         if (io_msg_need_remote(target_ctx))
-                return io_msg_exec_remote(req, io_msg_tw_fd_complete);
+                return io_msg_fd_remote(req);
         return io_msg_install_complete(req, issue_flags);
 }
 
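For contrast (not part of the commit), a hedged sketch of the descriptor passing path that, per the commit message, still goes through kernel task_work (io_msg_fd_remote()/io_msg_tw_fd_complete() above). It assumes liburing's io_uring_prep_msg_ring_fd() helper with a (sqe, fd, source_fd, target_fd, data, flags) signature, where source_fd is a fixed-file slot in the sending ring and target_fd a slot in the receiving ring; slot numbers and the user_data tag are illustrative.

/*
 * Illustrative only: pass a fixed file from src to dst with MSG_RING.
 * Assumes liburing's io_uring_prep_msg_ring_fd(); source_fd (0) is the
 * slot in src's registered file table, target_fd (1) the slot to
 * install into on dst.
 */
#include <liburing.h>
#include <unistd.h>

int main(void)
{
        struct io_uring src, dst;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        int pipefd[2];

        if (pipe(pipefd))
                return 1;
        if (io_uring_queue_init(8, &src, 0) < 0)
                return 1;
        if (io_uring_queue_init(8, &dst, IORING_SETUP_SINGLE_ISSUER |
                                         IORING_SETUP_DEFER_TASKRUN) < 0)
                return 1;

        /* src slot 0 holds the pipe read end */
        io_uring_register_files(&src, &pipefd[0], 1);
        /* dst needs a fixed file table with room to receive the descriptor */
        io_uring_register_files_sparse(&dst, 4);

        /* Send src slot 0 into dst slot 1, tagging the target CQE 0xfeed */
        sqe = io_uring_get_sqe(&src);
        io_uring_prep_msg_ring_fd(sqe, dst.ring_fd, 0, 1, 0xfeed, 0);
        io_uring_submit(&src);

        /* dst sees a CQE once the descriptor has been installed */
        if (!io_uring_wait_cqe(&dst, &cqe))
                io_uring_cqe_seen(&dst, cqe);

        io_uring_queue_exit(&src);
        io_uring_queue_exit(&dst);
        return 0;
}

Converting this descriptor passing path to the same remote posting scheme is left open by the commit message as a possible follow-up.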
