Skip to content

Commit 845e1d6

Browse files
committed
caller_runs execution should not block the work queue
caller_runs execution was being done while the executor lock was held. This caused the entire executor’s work queue to be blocked, with worker threads not able to take new work. We’ve restructured so that the execution is done as deferred work, outside of the lock.
1 parent 4cfc0a1 commit 845e1d6

File tree

5 files changed

+70
-17
lines changed

5 files changed

+70
-17
lines changed

lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,28 +75,31 @@ def auto_terminate=(value)
7575

7676
private
7777

78-
# Handler which executes the `fallback_policy` once the queue size
79-
# reaches `max_queue`.
78+
# Returns an action which executes the `fallback_policy` once the queue
79+
# size reaches `max_queue`. The reason for the indirection of an action
80+
# is so that the work can be deferred outside of synchronization.
8081
#
8182
# @param [Array] args the arguments to the task which is being handled.
8283
#
8384
# @!visibility private
84-
def handle_fallback(*args)
85+
def fallback_action(*args)
8586
case fallback_policy
8687
when :abort
87-
raise RejectedExecutionError
88+
lambda { raise RejectedExecutionError }
8889
when :discard
89-
false
90+
lambda { false }
9091
when :caller_runs
91-
begin
92-
yield(*args)
93-
rescue => ex
94-
# let it fail
95-
log DEBUG, ex
96-
end
97-
true
92+
lambda {
93+
begin
94+
yield(*args)
95+
rescue => ex
96+
# let it fail
97+
log DEBUG, ex
98+
end
99+
true
100+
}
98101
else
99-
fail "Unknown fallback policy #{fallback_policy}"
102+
lambda { fail "Unknown fallback policy #{fallback_policy}" }
100103
end
101104
end
102105

lib/concurrent-ruby/concurrent/executor/java_executor_service.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class JavaExecutorService < AbstractExecutorService
2020

2121
def post(*args, &task)
2222
raise ArgumentError.new('no block given') unless block_given?
23-
return handle_fallback(*args, &task) unless running?
23+
return fallback_action(*args, &task).call unless running?
2424
@executor.submit Job.new(args, task)
2525
true
2626
rescue Java::JavaUtilConcurrent::RejectedExecutionException

lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,19 @@ def initialize(*args, &block)
1616

1717
def post(*args, &task)
1818
raise ArgumentError.new('no block given') unless block_given?
19+
deferred_action = nil
1920
synchronize do
2021
# If the executor is shut down, reject this task
21-
return handle_fallback(*args, &task) unless running?
22-
ns_execute(*args, &task)
22+
unless running?
23+
deferred_action = fallback_action(*args, &task)
24+
break
25+
end
26+
deferred_action = ns_execute(*args, &task)
27+
end
28+
29+
if deferred_action
30+
deferred_action.call
31+
else
2332
true
2433
end
2534
end

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,11 @@ def ns_execute(*args, &task)
156156
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
157157
@scheduled_task_count += 1
158158
else
159-
handle_fallback(*args, &task)
159+
return fallback_action(*args, &task)
160160
end
161161

162162
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
163+
nil
163164
end
164165

165166
# @!visibility private

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,46 @@
616616
executor << proc { latch.count_down }
617617
latch.wait(0.1)
618618
end
619+
620+
specify '#post does not block other jobs running on the worker threads' do
621+
log = Queue.new
622+
623+
# Using a custom instance
624+
executor = described_class.new(
625+
min_threads: 1,
626+
max_threads: 1,
627+
max_queue: 1,
628+
fallback_policy: :caller_runs)
629+
630+
worker_unblocker = Concurrent::CountDownLatch.new(1)
631+
executor_unblocker = Concurrent::CountDownLatch.new(1)
632+
queue_done = Concurrent::CountDownLatch.new(1)
633+
634+
# Block the worker thread
635+
executor << proc { worker_unblocker.wait }
636+
637+
# Fill the queue
638+
executor << proc { log.push :queued; queue_done.count_down }
639+
640+
# Block in a caller_runs job
641+
caller_runs_thread = Thread.new {
642+
executor << proc { executor_unblocker.wait; log.push :unblocked }
643+
}
644+
645+
# Wait until the caller_runs job is blocked
646+
Thread.pass until caller_runs_thread.status == 'sleep'
647+
648+
# Now unblock the worker thread
649+
worker_unblocker.count_down
650+
queue_done.wait
651+
executor_unblocker.count_down
652+
653+
# Tidy up
654+
caller_runs_thread.join
655+
656+
# We will see the queued jobs run before the caller_runs job unblocks
657+
expect([log.pop, log.pop]).to eq [:queued, :unblocked]
658+
end
619659
end
620660
end
621661
end

0 commit comments

Comments
 (0)