Skip to content

Commit ff1677d

Browse files
authored
Register execution context schedulers with the event loop (#16519)
Some event loops need to be aware of the execution context schedulers, to allocate and cleanup resources as schedulers are started and stopped. For example the io_uring evloop will need to keep a ring per thread, yet can reuse the main ring for the first scheduler (index 0), and also needs to keep a list of all the active rings. - Add `Crystal::EventLoop#initialize(parallelism)`. - Add `Crystal::EventLoop#register(scheduler, index)`. - Add `Crystal::EventLoop#unregister(scheduler)`.
1 parent e53f032 commit ff1677d

File tree

9 files changed

+48
-13
lines changed

9 files changed

+48
-13
lines changed

src/crystal/event_loop.cr

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ abstract class Crystal::EventLoop
2020
{% end %}
2121
end
2222

23-
# Creates an event loop instance
24-
def self.create : self
25-
backend_class.new
23+
# Creates an event loop instance.
24+
#
25+
# The *parallelism* arg is informational. It reports how many schedulers are
26+
# expected to register with the event loop instance. Because schedulers are
27+
# dynamically started and execution contexts can be resized, more or less
28+
# schedulers may really register in practice.
29+
def self.create(parallelism : Int32 = 1) : self
30+
backend_class.new(parallelism)
2631
end
2732

2833
def self.default_file_blocking? : Bool
@@ -67,6 +72,14 @@ abstract class Crystal::EventLoop
6772
# enqueueing in parallel, so the caller is responsible and in control for
6873
# when and how the fibers will be enqueued.
6974
abstract def run(queue : Fiber::List*, blocking : Bool) : Nil
75+
76+
# Called once before *scheduler* is started. Optional hook.
77+
def register(scheduler : Fiber::ExecutionContext::Scheduler, index : Int32) : Nil
78+
end
79+
80+
# Called once before *scheduler* is shut down. Optional hook.
81+
def unregister(scheduler : Fiber::ExecutionContext::Scheduler) : Nil
82+
end
7083
{% end %}
7184

7285
# Tells a blocking run loop to no longer wait for events to activate. It may

src/crystal/event_loop/epoll.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ require "../system/unix/eventfd"
44
require "../system/unix/timerfd"
55

66
class Crystal::EventLoop::Epoll < Crystal::EventLoop::Polling
7-
def initialize
7+
def initialize(parallelism : Int32)
88
# the epoll instance
99
@epoll = System::Epoll.new
1010

src/crystal/event_loop/iocp.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
2626
@timer_packet = LibC::HANDLE.null
2727
@timer_key : System::IOCP::CompletionKey?
2828

29-
def initialize
29+
def initialize(parallelism : Int32)
3030
@mutex = Thread::Mutex.new
3131
@timers = Timers(Timer).new
3232

src/crystal/event_loop/kqueue.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class Crystal::EventLoop::Kqueue < Crystal::EventLoop::Polling
1010
@pipe = uninitialized LibC::Int[2]
1111
{% end %}
1212

13-
def initialize
13+
def initialize(parallelism : Int32)
1414
# the kqueue instance
1515
@kqueue = System::Kqueue.new
1616

src/crystal/event_loop/libevent.cr

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
1212

1313
private getter(event_base) { Crystal::EventLoop::LibEvent::Event::Base.new }
1414

15+
def initialize(parallelism : Int32)
16+
end
17+
1518
{% unless flag?(:preview_mt) %}
1619
# Reinitializes the event loop after a fork.
1720
def after_fork : Nil

src/crystal/event_loop/wasi.cr

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
88
false
99
end
1010

11+
def initialize(parallelism : Int32)
12+
end
13+
1114
# Runs the event loop.
1215
def run(blocking : Bool) : Bool
1316
raise NotImplementedError.new("Crystal::Wasi::EventLoop.run")

src/fiber/execution_context/isolated.cr

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ module Fiber::ExecutionContext
4848
@main_fiber : Fiber
4949

5050
# :nodoc:
51-
getter(event_loop : Crystal::EventLoop) { Crystal::EventLoop.create }
51+
getter(event_loop : Crystal::EventLoop) do
52+
evloop = Crystal::EventLoop.create(parallelism: 1)
53+
evloop.register(self, index: 0)
54+
evloop
55+
end
5256

5357
getter? running : Bool = true
5458
@enqueued = false
@@ -207,7 +211,12 @@ module Fiber::ExecutionContext
207211
@running = false
208212
@wait_list.consume_each(&.value.enqueue)
209213
end
210-
ExecutionContext.execution_contexts.delete(self)
214+
215+
begin
216+
@event_loop.try(&.unregister(self))
217+
ensure
218+
ExecutionContext.execution_contexts.delete(self)
219+
end
211220
end
212221

213222
# Blocks the calling fiber until the isolated context fiber terminates.

src/fiber/execution_context/parallel.cr

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ module Fiber::ExecutionContext
6868
getter stack_pool : Fiber::StackPool = Fiber::StackPool.new
6969

7070
# :nodoc:
71-
getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create
71+
getter event_loop : Crystal::EventLoop
7272
@event_loop_lock = Atomic(Bool).new(false)
7373

7474
@parked = Atomic(Int32).new(0)
@@ -107,6 +107,7 @@ module Fiber::ExecutionContext
107107
@global_queue = GlobalQueue.new(@mutex)
108108
@schedulers = Array(Scheduler).new(capacity)
109109
@threads = Array(Thread).new(capacity)
110+
@event_loop = Crystal::EventLoop.create(capacity)
110111

111112
@rng = Random::PCG32.new
112113

@@ -144,9 +145,13 @@ module Fiber::ExecutionContext
144145
# use an atomic/fence to make sure that @size can only be incremented
145146
# *after* the value has been written to @buffer.
146147
private def start_schedulers(capacity)
147-
capacity.times do |index|
148-
@schedulers << Scheduler.new(self, "#{@name}-#{index}")
149-
end
148+
capacity.times { |index| @schedulers << start_scheduler(index) }
149+
end
150+
151+
private def start_scheduler(index)
152+
scheduler = Scheduler.new(self, "#{@name}-#{index}")
153+
@event_loop.register(scheduler, index)
154+
scheduler
150155
end
151156

152157
# Attaches *scheduler* to the current `Thread`, usually the process' main
@@ -204,7 +209,7 @@ module Fiber::ExecutionContext
204209

205210
if new_capacity > old_capacity
206211
@schedulers = Array(Scheduler).new(new_capacity) do |index|
207-
old_schedulers[index]? || Scheduler.new(self, "#{@name}-#{index}")
212+
old_schedulers[index]? || start_scheduler(index)
208213
end
209214
threads = Array(Thread).new(new_capacity)
210215
old_threads.each { |thread| threads << thread }

src/fiber/execution_context/parallel/scheduler.cr

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ module Fiber::ExecutionContext
152152
Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed",
153153
self.class.name, @name, exception: exception)
154154
end
155+
ensure
156+
@event_loop.unregister(self)
155157
end
156158

157159
private def find_next_runnable : Fiber?

0 commit comments

Comments
 (0)