Skip to content

Commit 6427110

Browse files
Drop custom implementation of Fiber::ExecutionContext::Concurrent (#16135)
The `Concurrent` implementation mostly duplicates the `Parallel` one, and the parallel schedulers now take a few shortcuts when parallelism is set to 1 (skip stealing, quick evloop check) and `Concurrent` doesn't exhibit any noticeable performance improvement over `Parallel`. Having a single implementation instead of two, almost identical, will ease maintenance. I kept the type and didn't deprecate it yet. Its only value is that you can't mistakenly resize the context, and an exception is raised when you try to (related to #15956). Co-authored-by: Johannes Müller <[email protected]>
1 parent ca23c0b commit 6427110

File tree

2 files changed

+8
-260
lines changed

2 files changed

+8
-260
lines changed

spec/std/thread_spec.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pending_interpreted describe: Thread do
4646

4747
it "names the thread" do
4848
{% if flag?(:execution_context) %}
49-
Thread.current.name.should eq("DEFAULT")
49+
Thread.current.name.should eq("DEFAULT-0")
5050
{% else %}
5151
Thread.current.name.should be_nil
5252
{% end %}

src/fiber/execution_context/concurrent.cr

Lines changed: 7 additions & 259 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
require "./global_queue"
2-
require "./runnables"
3-
require "./scheduler"
1+
require "./parallel"
42

53
module Fiber::ExecutionContext
64
# Concurrent-only execution context.
@@ -55,267 +53,17 @@ module Fiber::ExecutionContext
5553
# In practice, we still recommended to always protect shared accesses to a
5654
# variable, for example using `Atomic#add` to increment *result* or a `Mutex`
5755
# for more complex operations.
58-
class Concurrent
59-
include ExecutionContext
60-
include ExecutionContext::Scheduler
61-
62-
getter name : String
63-
64-
protected getter thread : Thread
65-
@main_fiber : Fiber
66-
67-
@mutex : Thread::Mutex
68-
@global_queue : GlobalQueue
69-
@runnables : Runnables(256)
70-
71-
# :nodoc:
72-
getter stack_pool : Fiber::StackPool = Fiber::StackPool.new
73-
74-
# :nodoc:
75-
getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create
76-
77-
@waiting = Atomic(Bool).new(false)
78-
@parked = Atomic(Bool).new(false)
79-
@spinning = Atomic(Bool).new(false)
80-
@tick : Int32 = 0
81-
82-
# :nodoc:
83-
protected def self.default : self
84-
new("DEFAULT", hijack: true)
56+
class Concurrent < Parallel
57+
def self.default : self
58+
new("DEFAULT", capacity: 1, hijack: true)
8559
end
8660

8761
def self.new(name : String) : self
88-
new(name, hijack: false)
89-
end
90-
91-
protected def initialize(@name : String, hijack : Bool)
92-
@mutex = Thread::Mutex.new
93-
@global_queue = GlobalQueue.new(@mutex)
94-
@runnables = Runnables(256).new(@global_queue)
95-
96-
@thread = uninitialized Thread
97-
@main_fiber = uninitialized Fiber
98-
@thread = hijack ? hijack_current_thread : start_thread
99-
100-
ExecutionContext.execution_contexts.push(self)
101-
end
102-
103-
# :nodoc:
104-
def execution_context : self
105-
self
106-
end
107-
108-
# :nodoc:
109-
def stack_pool? : Fiber::StackPool?
110-
@stack_pool
111-
end
112-
113-
# Initializes the scheduler on the current thread (usually the process'
114-
# main thread).
115-
private def hijack_current_thread : Thread
116-
thread = Thread.current
117-
thread.internal_name = @name
118-
thread.execution_context = self
119-
thread.scheduler = self
120-
@main_fiber = Fiber.new("#{@name}:loop", self) { run_loop }
121-
thread
122-
end
123-
124-
# Creates a new thread to initialize the scheduler.
125-
private def start_thread : Thread
126-
Thread.new(name: @name) do |thread|
127-
thread.execution_context = self
128-
thread.scheduler = self
129-
@main_fiber = thread.main_fiber
130-
@main_fiber.name = "#{@name}:loop"
131-
run_loop
132-
end
133-
end
134-
135-
# :nodoc:
136-
def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
137-
# whatever the value of same_thread: the fibers will always run on the
138-
# same thread
139-
self.spawn(name: name, &block)
140-
end
141-
142-
# :nodoc:
143-
def enqueue(fiber : Fiber) : Nil
144-
if ExecutionContext.current? == self
145-
# local enqueue
146-
Crystal.trace :sched, "enqueue", fiber: fiber
147-
@runnables.push(fiber)
148-
else
149-
# cross context enqueue
150-
Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self
151-
@global_queue.push(fiber)
152-
wake_scheduler
153-
end
154-
end
155-
156-
protected def reschedule : Nil
157-
Crystal.trace :sched, "reschedule"
158-
if fiber = quick_dequeue?
159-
resume fiber unless fiber == @thread.current_fiber
160-
else
161-
# nothing to do: switch back to the main loop to spin/park
162-
resume @main_fiber
163-
end
164-
end
165-
166-
protected def resume(fiber : Fiber) : Nil
167-
unless fiber.resumable?
168-
if fiber.dead?
169-
raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})"
170-
else
171-
raise "BUG: can't resume running fiber #{fiber} (#{inspect})"
172-
end
173-
end
174-
swapcontext(fiber)
175-
end
176-
177-
private def quick_dequeue? : Fiber?
178-
# every once in a while: dequeue from global queue to avoid two fibers
179-
# constantly respawing each other to completely occupy the local queue
180-
if (@tick &+= 1) % 61 == 0
181-
if fiber = @global_queue.pop?
182-
return fiber
183-
end
184-
end
185-
186-
# try local queue
187-
if fiber = @runnables.shift?
188-
return fiber
189-
end
190-
191-
# try to refill local queue
192-
if fiber = @global_queue.grab?(@runnables, divisor: 1)
193-
return fiber
194-
end
195-
196-
# run the event loop to see if any event is activable
197-
list = Fiber::List.new
198-
@event_loop.run(pointerof(list), blocking: false)
199-
return enqueue_many(pointerof(list))
200-
end
201-
202-
private def run_loop : Nil
203-
Crystal.trace :sched, "started"
204-
205-
loop do
206-
if fiber = find_next_runnable
207-
spin_stop if @spinning.get(:relaxed)
208-
resume fiber
209-
else
210-
# the event loop enqueued a fiber (or was interrupted) or the
211-
# scheduler was unparked: go for the next iteration
212-
end
213-
rescue exception
214-
Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed",
215-
self.class.name, @name, exception: exception)
216-
end
217-
end
218-
219-
private def find_next_runnable : Fiber?
220-
find_next_runnable do |fiber|
221-
return fiber if fiber
222-
end
223-
end
224-
225-
private def find_next_runnable(&) : Nil
226-
list = Fiber::List.new
227-
228-
# nothing to do: start spinning
229-
spinning do
230-
yield @global_queue.grab?(@runnables, divisor: 1)
231-
232-
@event_loop.run(pointerof(list), blocking: false)
233-
yield enqueue_many(pointerof(list))
234-
end
235-
236-
# block on the event loop, waiting for pending event(s) to activate
237-
waiting do
238-
# there is a time window between stop spinning and start waiting during
239-
# which another context can enqueue a fiber, check again before waiting
240-
# on the event loop to avoid missing a runnable fiber
241-
yield @global_queue.grab?(@runnables, divisor: 1)
242-
243-
@event_loop.run(pointerof(list), blocking: true)
244-
yield enqueue_many(pointerof(list))
245-
246-
# the event loop was interrupted: restart the run loop
247-
return
248-
end
249-
end
250-
251-
private def enqueue_many(list : Fiber::List*) : Fiber?
252-
if fiber = list.value.pop?
253-
@runnables.bulk_push(list) unless list.value.empty?
254-
fiber
255-
end
256-
end
257-
258-
private def spinning(&)
259-
spin_start
260-
261-
4.times do |attempt|
262-
Thread.yield unless attempt == 0
263-
yield
264-
end
265-
266-
spin_stop
267-
end
268-
269-
private def spin_start : Nil
270-
@spinning.set(true, :release)
271-
end
272-
273-
private def spin_stop : Nil
274-
@spinning.set(false, :release)
275-
end
276-
277-
private def waiting(&)
278-
@waiting.set(true, :release)
279-
begin
280-
yield
281-
ensure
282-
@waiting.set(false, :release)
283-
end
284-
end
285-
286-
# This method runs in parallel to the rest of the ST scheduler!
287-
#
288-
# This is called from another context _after_ enqueueing into the global
289-
# queue to try and wakeup the ST thread running in parallel that may be
290-
# running, spinning or waiting on the event loop.
291-
private def wake_scheduler : Nil
292-
if @spinning.get(:acquire)
293-
return
294-
end
295-
296-
if @waiting.get(:acquire)
297-
@event_loop.interrupt
298-
end
299-
end
300-
301-
def inspect(io : IO) : Nil
302-
to_s(io)
303-
end
304-
305-
def to_s(io : IO) : Nil
306-
io << "#<" << self.class.name << ":0x"
307-
object_id.to_s(io, 16)
308-
io << ' ' << name << '>'
62+
new(name, capacity: 1, hijack: false)
30963
end
31064

311-
def status : String
312-
if @spinning.get(:relaxed)
313-
"spinning"
314-
elsif @waiting.get(:relaxed)
315-
"event-loop"
316-
else
317-
"running"
318-
end
65+
def resize(maximum : Int32) : Nil
66+
raise ArgumentError.new("Can't resize a concurrent context")
31967
end
32068
end
32169

0 commit comments

Comments
 (0)