Skip to content

Commit 0cd7262

Browse files
committed
WIP: add MT support to Crystal::EventLoop::IoUring - part 3 [skip ci]
1 parent 58377ec commit 0cd7262

File tree

5 files changed

+133
-38
lines changed

5 files changed

+133
-38
lines changed

src/crystal/event_loop/io_uring.cr

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
4444
# we require are only available in Linux 5.18. The event loop is thus
4545
# incompatible with:
4646
#
47+
# FIXME: while IORING_OP_ASYNC_CANCEL was introduced in Linux 5.5, the
48+
# IORING_ASYNC_CANCEL_FD flag was only introduced in Linux 5.19.
49+
#
4750
# - Linux 5.4 LTS (EOL Dec 2025)
4851
# - Linux 5.10 SLTS (EOL Jan 2031)
4952
# - Linux 5.15 LTS (EOL Dec 2026)
@@ -70,10 +73,9 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
7073
System::IoUring.supports_opcode?(LibC::IORING_OP_MSG_RING)
7174
end
7275

73-
DEFAULT_SQ_ENTRIES = 16
74-
DEFAULT_CQ_ENTRIES = 128
75-
# DEFAULT_SQ_THREAD_IDLE = 2000
76-
DEFAULT_SQ_THREAD_IDLE = nil
76+
DEFAULT_SQ_ENTRIES = 16
77+
DEFAULT_CQ_ENTRIES = 128
78+
DEFAULT_SQ_THREAD_IDLE = nil # 200 (in milliseconds)
7779

7880
CLOSE_RING_EVENT = GC.malloc(sizeof(Event)).as(Event*)
7981

@@ -377,13 +379,23 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
377379
# search a waiting ring to wakeup
378380
return unless waiting_ring = @rings.find(&.try(&.waiting?))
379381

380-
# try to use the local ring and fallback to the actual ring (we might
381-
# interrupt from a raw thread)
382-
ring = ring? || waiting_ring
382+
# try to notify the waiting ring through the local ring (every scheduler
383+
# should have one), fallback to a syscall (Linux 6.13) or to a cross ring
384+
# submit for older kernels
385+
ring = ring?
386+
ring = waiting_ring if ring.nil? && !System::IoUring.supports_register_send_msg_ring?
383387

384-
ring.submit do |sqe|
385-
sqe.value.opcode = LibC::IORING_OP_MSG_RING
386-
sqe.value.fd = waiting_ring.fd
388+
if ring
389+
ring.submit do |sqe|
390+
sqe.value.opcode = LibC::IORING_OP_MSG_RING
391+
sqe.value.fd = waiting_ring.fd
392+
end
393+
else
394+
sqe = LibC::IoUringSqe.new
395+
sqe.opcode = LibC::IORING_OP_MSG_RING
396+
sqe.fd = waiting_ring.fd
397+
ret = System::Syscall.io_uring_register(-1, LibC::IORING_REGISTER_SEND_MSG_RING, pointerof(sqe).as(Void*), 1)
398+
raise RuntimeError.from_os_error("io_uring_register(IORING_REGISTER_SEND_MSG_RING)", Errno.new(-ret)) if ret < 0
387399
end
388400
end
389401

@@ -487,9 +499,19 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
487499
return unless fd = file_descriptor.close_volatile_fd?
488500

489501
# FIXME: we must submit an IORING_OP_ASYNC_CANCEL to every ring (across
490-
# execution contexts) that has a pending READ/WRITE/POLL operation on the
491-
# IO::FileDescriptor; we can't just link CANCEL to CLOSE on the local
492-
# ring...
502+
# execution contexts) that has a pending IORING_OP_READ or IORING_OP_WRITE
503+
# operation on the IO::FileDescriptor; we can't just link CANCEL to CLOSE on
504+
# the local ring (requires SQ lock so any thread can submit to the ring).
505+
#
506+
# NOTE: IORING_OP_POLL_ADD operations don't need to be canceled (POLLERR
507+
# and/or POLLHUP / POLLRDHUP are enough).
508+
#
509+
# TODO: consider IORING_REGISTER_SYNC_CANCEL (Linux 6.0) that is synchronous
510+
# (bad) but allows to target any ring safely and to skip the SQ lock for
511+
# _every_ operation (good).
512+
#
513+
# OPTIMIZE: if the pending ops are on this ring (lucky) we can keep
514+
# submitting an IORING_OP_ASYNC_CANCEL linked to an IORING_OP_CLOSE.
493515

494516
async_close(fd) do |sqe|
495517
# one thread closing a fd won't interrupt reads or writes happening in

src/crystal/event_loop/io_uring/ring.cr

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,43 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
44
# Extends the system abstraction with additional data and helpers tailored
55
# for the event loop implementation.
66
class Ring < System::IoUring
7-
# TODO: not needed after <https://github.com/crystal-lang/crystal/issues/16157>
8-
@rng = Random::PCG32.new
9-
10-
@sq_lock = Thread::Mutex.new
11-
@cq_lock = Thread::Mutex.new
127
getter? waiting : Bool = false
138

9+
def initialize
10+
# TODO: not needed after <https://github.com/crystal-lang/crystal/issues/16157>
11+
@rng = Random::PCG32.new
12+
13+
{% if flag?(:preview_mt) %}
14+
# unless IORING_REGISTER_SYNC_CANCEL (Linux 6.0) and
15+
# IORING_REGISTER_SEND_MSG_RING (Linux 6.13) are supported we may have
16+
# multiple threads trying to submit to the ring (on evloop interrupt and
17+
# before closing an IO::FileDescriptor)
18+
unless System::IoUring.supports_register_sync_cancel? && System::IoUring.supports_register_send_msg_ring?
19+
@sq_lock = Thread::Mutex.new
20+
end
21+
{% end %}
22+
23+
{% if flag?(:execution_context) %}
24+
@cq_lock = Thread::Mutex.new
25+
{% end %}
26+
end
27+
1428
def waiting(&)
1529
@waiting = true
1630
yield
1731
ensure
1832
@waiting = false
1933
end
2034

21-
# Acquires the SQ lock for the duration of the block.
35+
# Acquires the SQ lock for the duration of the block if required.
2236
def sq_lock(&)
23-
{% if flag?(:execution_context) %}
24-
@sq_lock.synchronize { yield }
25-
{% else %}
26-
yield
37+
{% if flag?(:preview_mt) %}
38+
if sq_lock = @sq_lock
39+
return sq_lock.synchronize { yield }
40+
end
2741
{% end %}
42+
43+
yield
2844
end
2945

3046
# Acquires the CQ lock for the duration of the block.

src/crystal/system/unix/io_uring.cr

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
require "c/linux/io_uring"
22
require "./syscall"
3-
require "./eventfd"
43

54
# WARNING: while the syscalls are thread safe, the rings and the overall
65
# abstraction are not: accesses to the SQ and CQ rings aren't synchronized!
@@ -14,6 +13,8 @@ class Crystal::System::IoUring
1413
@@opcodes : Slice(LibC::IoUringProbeOp)?
1514

1615
class_getter?(supported : Bool) { check_kernel_support? }
16+
class_getter? supports_register_send_msg_ring : Bool = false
17+
class_getter? supports_register_sync_cancel : Bool = false
1718

1819
def self.check_kernel_support? : Bool
1920
# try with "no sqarray" flag first (available since linux 6.6)
@@ -32,20 +33,35 @@ class Crystal::System::IoUring
3233
@@no_sqarray = false
3334
end
3435

36+
# record supported features
3537
@@features = params.features
3638

3739
begin
40+
# probe supported opcodes
3841
probe_sz = sizeof(LibC::IoUringProbe) + LibC::IORING_OP_LAST * sizeof(LibC::IoUringProbeOp)
39-
probe = GC.malloc(probe_sz).as(LibC::IoUringProbe*)
42+
probe = GC.malloc_atomic(probe_sz).as(LibC::IoUringProbe*)
4043
raise RuntimeError.from_errno("malloc") unless probe
4144
LibIntrinsics.memset(probe, 0_u8, probe_sz, false)
4245

4346
ret = Syscall.io_uring_register(fd, LibC::IORING_REGISTER_PROBE, probe.as(Void*), LibC::IORING_OP_LAST)
4447
raise RuntimeError.from_os_error("io_uring_register", Errno.new(-ret)) if ret < 0
4548

4649
@@opcodes = Slice(LibC::IoUringProbeOp).new(probe.value.ops.to_unsafe, LibC::IORING_OP_LAST)
50+
51+
# probe supported register opcodes (must test one by one)
52+
sqe = LibC::IoUringSqe.new
53+
sqe.opcode = LibC::IORING_OP_MSG_RING
54+
sqe.fd = fd
55+
ret = System::Syscall.io_uring_register(-1, LibC::IORING_REGISTER_SEND_MSG_RING, pointerof(sqe).as(Void*), 1)
56+
@@supports_register_send_msg_ring = ret == 0
57+
58+
reg = LibC::IoUringSyncCancelReg.new
59+
reg.flags = LibC::IORING_ASYNC_CANCEL_FD
60+
reg.fd = 1
61+
ret = System::Syscall.io_uring_register(fd, LibC::IORING_REGISTER_SYNC_CANCEL, pointerof(reg).as(Void*), 1)
62+
@@supports_register_sync_cancel = ret != -LibC::ENOENT
4763
ensure
48-
LibC.close(fd) # if fd > 0
64+
LibC.close(fd)
4965
end
5066

5167
true
@@ -55,6 +71,13 @@ class Crystal::System::IoUring
5571
(@@features.not_nil! & feature) == feature
5672
end
5773

74+
def self.supports_register_send_msg_ring? : Bool
75+
sqe = LibC::IoUringSqe.new
76+
sqe.opcode = LibC::IORING_OP_MSG_RING
77+
sqe.fd = waiting_ring.fd
78+
ret = System::Syscall.io_uring_register(-1, LibC::IORING_REGISTER_SEND_MSG_RING, pointerof(sqe).as(Void*), 1)
79+
end
80+
5881
def self.supports_opcode?(opcode : UInt32) : Bool
5982
(@@opcodes.not_nil![opcode].flags & LibC::IO_URING_OP_SUPPORTED) == LibC::IO_URING_OP_SUPPORTED
6083
end
@@ -192,7 +215,6 @@ class Crystal::System::IoUring
192215
LibC.munmap(@sqes, @sqes_size)
193216

194217
LibC.close(fd)
195-
@eventfd.try(&.close)
196218
end
197219

198220
def sq_poll? : Bool
@@ -204,17 +226,18 @@ class Crystal::System::IoUring
204226
(sq_flags & LibC::IORING_SQ_NEED_WAKEUP) == LibC::IORING_SQ_NEED_WAKEUP
205227
end
206228

207-
# Call `io_uring_register` syscall, and raises on errno.
229+
# Call `io_uring_register` syscall. Raises on error.
208230
def register(opcode : UInt32, arg : Pointer | Nil = nil, arg_sz = 0) : Nil
209-
argp = arg ? arg.as(Void*) : Pointer(Void).null
210-
err = Syscall.io_uring_register(@fd, opcode, argp, arg_sz.to_u32)
211-
raise RuntimeError.from_os_error("io_uring_register", Errno.new(-err)) if err < 0
231+
if errno = register?(opcode, arg, arg_sz)
232+
raise RuntimeError.from_os_error("io_uring_register", errno)
233+
end
212234
end
213235

214-
# Register an `EventFD`.
215-
def register(@eventfd : EventFD) : Nil
216-
efd = eventfd.fd
217-
register(LibC::IORING_REGISTER_EVENTFD, pointerof(efd), 1)
236+
# Call `io_uring_register` syscall. Returns Errno on error.
237+
def register?(opcode : UInt32, arg : Pointer | Nil = nil, arg_sz = 0) : Errno?
238+
argp = arg ? arg.as(Void*) : Pointer(Void).null
239+
err = Syscall.io_uring_register(@fd, opcode, argp, arg_sz.to_u32)
240+
Errno.new(-err) if err < 0
218241
end
219242

220243
# Makes sure there is at least *count* SQE available in the SQ ring so we can

src/lib_c/x86_64-linux-gnu/c/linux/io_uring.cr

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require "../time"
2+
13
lib LibC
24
# IORING_FILE_INDEX_ALLOC = ~0_u32
35

@@ -25,7 +27,8 @@ lib LibC
2527
# IORING_SETUP_DEFER_TASKRUN = 1_u32 << 13
2628
# IORING_SETUP_NO_MMAP = 1_u32 << 14
2729
# IORING_SETUP_REGISTERED_FD_ONLY = 1_u32 << 15
28-
IORING_SETUP_NO_SQARRAY = 1_u32 << 16
30+
IORING_SETUP_NO_SQARRAY = 1_u32 << 16
31+
IORING_SETUP_HYBRID_IOPOLL = 1_u32 << 17
2932

3033
# IORING_TIMEOUT_ABS = 1_u32 << 0
3134
# IORING_TIMEOUT_UPDATE = 1_u32 << 1
@@ -86,6 +89,7 @@ lib LibC
8689
futex_flags : UInt32
8790
install_fd_flags : UInt32
8891
nop_flags : UInt32
92+
pipe_flags : UInt32
8993
end
9094

9195
struct IoUringSqe
@@ -155,6 +159,16 @@ lib LibC
155159
ts : UInt64
156160
end
157161

162+
struct IoUringSyncCancelReg
163+
addr : UInt64
164+
fd : Int32
165+
flags : UInt32
166+
timeout : KernelTimespec
167+
opcode : UInt8
168+
pad : UInt8[7]
169+
pad2 : UInt64[3]
170+
end
171+
158172
IORING_FEAT_SINGLE_MMAP = 1_u32 << 0
159173
IORING_FEAT_NODROP = 1_u32 << 1
160174
IORING_FEAT_SUBMIT_STABLE = 1_u32 << 2
@@ -169,6 +183,10 @@ lib LibC
169183
IORING_FEAT_CQE_SKIP = 1_u32 << 11
170184
IORING_FEAT_LINKED_FILE = 1_u32 << 12
171185
IORING_FEAT_REG_REG_RING = 1_u32 << 13
186+
IORING_FEAT_RECVSEND_BUNDLE = 1_u32 << 14
187+
IORING_FEAT_MIN_TIMEOUT = 1_u32 << 15
188+
IORING_FEAT_RW_ATTR = 1_u32 << 16
189+
IORING_FEAT_NO_IOWAIT = 1_u32 << 17
172190

173191
IORING_OP_NOP = 0_u32
174192
IORING_OP_READV = 1_u32
@@ -228,7 +246,12 @@ lib LibC
228246
IORING_OP_FTRUNCATE = 55_u32
229247
IORING_OP_BIND = 56_u32
230248
IORING_OP_LISTEN = 57_u32
231-
IORING_OP_LAST = 58_u32
249+
IORING_OP_RECV_ZC = 59_u32
250+
IORING_OP_EPOLL_WAIT = 60_u32
251+
IORING_OP_READV_FIXED = 61_u32
252+
IORING_OP_WRITEV_FIXED = 62_u32
253+
IORING_OP_PIPE = 63_u32
254+
IORING_OP_LAST = 64_u32
232255

233256
IORING_OFF_SQ_RING = 0_u64
234257
IORING_OFF_CQ_RING = 0x8000000_u64
@@ -266,7 +289,13 @@ lib LibC
266289
IORING_REGISTER_PBUF_STATUS = 26_u32
267290
IORING_REGISTER_NAPI = 27_u32
268291
IORING_UNREGISTER_NAPI = 28_u32
269-
IORING_REGISTER_LAST = 29_u32
292+
IORING_REGISTER_CLOCK = 29_u32
293+
IORING_REGISTER_CLONE_BUFFERS = 30_u32
294+
IORING_REGISTER_SEND_MSG_RING = 31_u32
295+
IORING_REGISTER_ZCRX_IFQ = 32_u32
296+
IORING_REGISTER_RESIZE_RINGS = 33_u32
297+
IORING_REGISTER_MEM_REGION = 34_u32
298+
IORING_REGISTER_LAST = 35_u32
270299
IORING_REGISTER_USE_REGISTERED_RING = 1_u32 << 31
271300

272301
IO_URING_OP_SUPPORTED = 1_u32

src/lib_c/x86_64-linux-gnu/c/time.cr

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ lib LibC
1818
tm_zone : Char*
1919
end
2020

21+
struct KernelTimespec
22+
tv_sec : LongLong
23+
tv_nsec : LongLong
24+
end
25+
2126
struct Timespec
2227
tv_sec : TimeT
2328
tv_nsec : Long

0 commit comments

Comments
 (0)