Skip to content

Commit 58377ec

Browse files
committed
WIP: add MT support to Crystal::EventLoop::IoUring - part 2 [skip ci]
Now with timers and a bunch of fixes to make it compile.
1 parent 60d7778 commit 58377ec

File tree

8 files changed

+247
-120
lines changed

8 files changed

+247
-120
lines changed

src/crystal/event_loop/io_uring.cr

Lines changed: 200 additions & 103 deletions
Large diffs are not rendered by default.

src/crystal/event_loop/io_uring/event.cr

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require "crystal/pointer_pairing_heap"
2+
13
struct Crystal::EventLoop::IoUring::Event
24
enum Type
35
Async
@@ -12,6 +14,10 @@ struct Crystal::EventLoop::IoUring::Event
1214
# property! flags : UInt32
1315

1416
getter? timeout : Time::Span?
17+
property! wake_at : Time::Span?
18+
19+
# The event can be added to the `Timers` list.
20+
include PointerPairingHeap::Node
1521

1622
# When using SQPOLL (or when IORING_FEAT_SUBMIT_STABLE is missing) the pointer
1723
# to the timespec struct must be reachable until the SQE has been successfully
@@ -33,4 +39,8 @@ struct Crystal::EventLoop::IoUring::Event
3339
protected def timespec : Pointer(LibC::Timespec)
3440
pointerof(@timespec)
3541
end
42+
43+
def heap_compare(other : Pointer(self)) : Bool
44+
wake_at < other.value.wake_at
45+
end
3646
end

src/crystal/event_loop/io_uring/fiber_event.cr

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ class Crystal::EventLoop::IoUring::FiberEvent
77
end
88

99
def add(timeout : Time::Span) : Nil
10-
@event.timeout = timeout
10+
s, ns = System::Time.monotonic
11+
@event.wake_at = Time::Span.new(seconds: s, nanoseconds: ns) + timeout
1112
EventLoop.current.as(IoUring).add_timer(pointerof(@event))
1213
end
1314

1415
# select timeout has been cancelled
1516
def delete : Nil
16-
return unless @event.timeout?
17+
return unless @event.wake_at?
1718
EventLoop.current.as(IoUring).delete_timer(pointerof(@event))
1819
clear
1920
end
@@ -25,6 +26,6 @@ class Crystal::EventLoop::IoUring::FiberEvent
2526

2627
# the timer triggered (already dequeued from eventloop)
2728
def clear : Nil
28-
@event.timeout = nil
29+
@event.wake_at = nil
2930
end
3031
end

src/crystal/event_loop/io_uring/ring.cr

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,24 @@
11
require "../../system/unix/io_uring"
22

33
class Crystal::EventLoop::IoUring < Crystal::EventLoop
4+
# Extends the system abstraction with additional data and helpers tailored
5+
# for the event loop implementation.
46
class Ring < System::IoUring
5-
@tick = 0_u32
7+
# TODO: not needed after <https://github.com/crystal-lang/crystal/issues/16157>
68
@rng = Random::PCG32.new
9+
710
@sq_lock = Thread::Mutex.new
811
@cq_lock = Thread::Mutex.new
9-
1012
getter? waiting : Bool = false
1113

12-
def once_in_a_while? : Bool
13-
(@tick &+= 1) == 53
14-
end
15-
1614
def waiting(&)
1715
@waiting = true
1816
yield
1917
ensure
2018
@waiting = false
2119
end
2220

21+
# Acquires the SQ lock for the duration of the block.
2322
def sq_lock(&)
2423
{% if flag?(:execution_context) %}
2524
@sq_lock.synchronize { yield }
@@ -28,6 +27,7 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
2827
{% end %}
2928
end
3029

30+
# Acquires the CQ lock for the duration of the block.
3131
def cq_lock(&)
3232
{% if flag?(:execution_context) %}
3333
@cq_lock.synchronize { yield }
@@ -36,6 +36,8 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
3636
{% end %}
3737
end
3838

39+
# Tries to acquire the CQ lock for the duration of the block. Returns
40+
# immediately if the CQ lock couldn't be acquired.
3941
def cq_trylock?(&)
4042
{% if flag?(:execution_context) %}
4143
if @cq_lock.try_lock
@@ -50,6 +52,7 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
5052
{% end %}
5153
end
5254

55+
# Locks the SQ ring, reserves exactly one SQE and submits before returning.
5356
def submit(&)
5457
sq_lock do
5558
sqe = next_sqe
@@ -60,6 +63,8 @@ class Crystal::EventLoop::IoUring < Crystal::EventLoop
6063
end
6164
end
6265

66+
# Locks the SQ ring, reserves as many SQE as needed to fill *sqes* and
67+
# submits before returning.
6368
def submit(sqes, &)
6469
sq_lock do
6570
reserve(sqes.size)

src/crystal/event_loop/timers.cr

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,20 @@ struct Crystal::EventLoop::Timers(T)
1818
@heap.empty?
1919
end
2020

21-
# Returns the time of the next ready timer (if any).
21+
# Returns the absolute time of the next ready timer (if any).
2222
def next_ready? : Time::Span?
2323
if event = @heap.first?
2424
event.value.wake_at
2525
end
2626
end
2727

28+
# Returns the relative time until the next ready timer (if any).
29+
def next_ready_as_timeout? : Time::Span?
30+
if abstime = next_ready?
31+
abstime - now
32+
end
33+
end
34+
2835
# Dequeues and yields each ready timer (their `#wake_at` is lower than
2936
# `System::Time.monotonic`) from the oldest to the most recent (i.e. time
3037
# ascending).

src/crystal/system/unix/io_uring.cr

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class Crystal::System::IoUring
5959
(@@opcodes.not_nil![opcode].flags & LibC::IO_URING_OP_SUPPORTED) == LibC::IO_URING_OP_SUPPORTED
6060
end
6161

62-
@fd : Int32
62+
getter fd : Int32
6363
@flags : UInt32
6464
@sq_size : UInt32
6565
@cq_size : UInt32
@@ -90,6 +90,10 @@ class Crystal::System::IoUring
9090
params.flags |= LibC::IORING_SETUP_ATTACH_WQ
9191
end
9292

93+
# The following is boilerplate code to map the rings from kernel land to
94+
# user land, with some slight differences based on the running kernel
95+
# supported features (for example single mmap, no SQ array).
96+
9397
Crystal.trace :evloop, "io_uring_setup"
9498
@fd = Syscall.io_uring_setup(sq_entries.to_u32, pointerof(params))
9599
raise RuntimeError.from_os_error("io_uring_setup", Errno.new(-@fd)) if @fd < 0
@@ -278,17 +282,15 @@ class Crystal::System::IoUring
278282
# Call `io_uring_enter` syscall. Panics on EBADR (can't recover from lost
279283
# CQE), returns -EINTR or -EBUSY, and raises on other errnos, otherwise
280284
# returns the int returned by the syscall.
281-
def enter(to_submit : UInt32 = 0, min_complete : UInt32 = 0, flags : UInt32 = 0, timeout : Time::Span?) : Int32
285+
def enter(to_submit : Int = 0, min_complete : Int = 0, flags : UInt32 = 0, timeout : ::Time::Span? = nil) : Int32
282286
if timeout
283287
flags |= LibC::IORING_ENTER_EXT_ARG
284288

285289
ts = uninitialized LibC::Timespec
286290
ts.tv_sec = typeof(ts.tv_sec).new(timeout.@seconds)
287291
ts.tv_nsec = typeof(ts.tv_nsec).new(timeout.@nanoseconds)
288292

289-
args = LibC::IoUringGetEventsArg.new(
290-
ts : pointerof(ts).address.to_u64!
291-
)
293+
args = LibC::IoUringGetEventsArg.new(ts: pointerof(ts).address.to_u64!)
292294
arg = pointerof(args).as(Void*)
293295
argsz = LibC::SizeT.new(sizeof(LibC::IoUringGetEventsArg))
294296
else
@@ -303,7 +305,7 @@ class Crystal::System::IoUring
303305
timeout: timeout,
304306
flags: ENTERS.new(flags).to_s
305307

306-
ret = Syscall.io_uring_enter(@fd, to_submit, min_complete, flags, arg, argsz)
308+
ret = Syscall.io_uring_enter(@fd, to_submit.to_u32, min_complete.to_u32, flags, arg, argsz)
307309
return ret if ret >= 0
308310

309311
case ret

src/fiber/execution_context/parallel.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ module Fiber::ExecutionContext
145145
# use an atomic/fence to make sure that @size can only be incremented
146146
# *after* the value has been written to @buffer.
147147
private def start_schedulers(capacity)
148-
capacity.times { |index| @schedulers << start_scheduler }
148+
capacity.times { |index| @schedulers << start_scheduler(index) }
149149
end
150150

151151
private def start_scheduler(index)

src/fiber/execution_context/scheduler.cr

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ module Fiber::ExecutionContext
66
Thread.current.scheduler
77
end
88

9+
@[AlwaysInline]
10+
def self.current? : Scheduler?
11+
Thread.current.scheduler?
12+
end
13+
914
protected abstract def thread : Thread
1015
protected abstract def execution_context : ExecutionContext
1116

0 commit comments

Comments
 (0)