Skip to content

Commit c92d11e

Browse files
fzakariapitr-ch
authored andcommitted
Introduce ThreadPoolExecutor without a Queue
This adds the ability for a ThreadPoolExecutor to have a queue depth of 0. This is useful if you'd like to perform the rejection handler if no available threads are availabile. This is analogous to Java's use of SynchronousQueue in ThreadPoolExecutor. See https://stackoverflow.com/a/10186825/143733 for more details
1 parent 2b5fd71 commit c92d11e

File tree

4 files changed

+79
-5
lines changed

4 files changed

+79
-5
lines changed

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,18 @@ class JavaThreadPoolExecutor < JavaExecutorService
2121
# @!macro thread_pool_executor_constant_default_thread_timeout
2222
DEFAULT_THREAD_IDLETIMEOUT = 60
2323

24+
# @!macro thread_pool_executor_constant_default_synchronous
25+
DEFAULT_SYNCHRONOUS = false
26+
2427
# @!macro thread_pool_executor_attr_reader_max_length
2528
attr_reader :max_length
2629

2730
# @!macro thread_pool_executor_attr_reader_max_queue
2831
attr_reader :max_queue
2932

33+
# @!macro thread_pool_executor_attr_reader_synchronous
34+
attr_reader :synchronous
35+
3036
# @!macro thread_pool_executor_method_initialize
3137
def initialize(opts = {})
3238
super(opts)
@@ -94,16 +100,22 @@ def ns_initialize(opts)
94100
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
95101
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
96102
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
103+
@synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
97104
@fallback_policy = opts.fetch(:fallback_policy, :abort)
98105

106+
raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
99107
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
100108
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
101109
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
102110
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
103111
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
104112

105113
if @max_queue == 0
106-
queue = java.util.concurrent.LinkedBlockingQueue.new
114+
if @synchronous
115+
queue = java.util.concurrent.SynchronousQueue.new
116+
else
117+
queue = java.util.concurrent.LinkedBlockingQueue.new
118+
end
107119
else
108120
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
109121
end

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'concurrent/concern/logging'
44
require 'concurrent/executor/ruby_executor_service'
55
require 'concurrent/utility/monotonic_time'
6+
require 'concurrent/mvar'
67

78
module Concurrent
89

@@ -23,6 +24,9 @@ class RubyThreadPoolExecutor < RubyExecutorService
2324
# @!macro thread_pool_executor_constant_default_thread_timeout
2425
DEFAULT_THREAD_IDLETIMEOUT = 60
2526

27+
# @!macro thread_pool_executor_constant_default_synchronous
28+
DEFAULT_SYNCHRONOUS = false
29+
2630
# @!macro thread_pool_executor_attr_reader_max_length
2731
attr_reader :max_length
2832

@@ -35,6 +39,9 @@ class RubyThreadPoolExecutor < RubyExecutorService
3539
# @!macro thread_pool_executor_attr_reader_max_queue
3640
attr_reader :max_queue
3741

42+
# @!macro thread_pool_executor_attr_reader_synchronous
43+
attr_reader :synchronous
44+
3845
# @!macro thread_pool_executor_method_initialize
3946
def initialize(opts = {})
4047
super(opts)
@@ -114,9 +121,11 @@ def ns_initialize(opts)
114121
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
115122
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
116123
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
124+
@synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
117125
@fallback_policy = opts.fetch(:fallback_policy, :abort)
118-
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
119126

127+
raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
128+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
120129
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
121130
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
122131
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
@@ -145,8 +154,11 @@ def ns_limited_queue?
145154
def ns_execute(*args, &task)
146155
ns_reset_if_forked
147156

148-
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
157+
assigned_worker = ns_assign_worker(*args, &task)
158+
if assigned_worker
149159
@scheduled_task_count += 1
160+
elsif !@synchronous
161+
ns_enqueue(*args, &task)
150162
else
151163
handle_fallback(*args, &task)
152164
end
@@ -233,7 +245,7 @@ def ns_add_busy_worker
233245
#
234246
# @!visibility private
235247
def ns_ready_worker(worker, success = true)
236-
task_and_args = @queue.shift
248+
task_and_args = @queue.shift
237249
if task_and_args
238250
worker << task_and_args
239251
else

lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
7373
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
7474
# tasks that are received when the queue size has reached
7575
# `max_queue` or the executor has shut down
76-
#
76+
# @options opts [Boolean] :synchronous (DEFAULT_SYNCHRONOUS) whether or not a value of 0
77+
# for :max_queue means the queue must perform direct hand-off rather than unbounded.
7778
# @raise [ArgumentError] if `:max_threads` is less than one
7879
# @raise [ArgumentError] if `:min_threads` is less than zero
7980
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,55 @@
137137
end
138138
end
139139

140+
context '#synchronous' do
141+
142+
subject do
143+
described_class.new(
144+
min_threads: 1,
145+
max_threads: 2,
146+
max_queue: 0,
147+
synchronous: true,
148+
fallback_policy: :abort
149+
)
150+
end
151+
152+
it 'cannot be set unless `max_queue` is zero' do
153+
expect {
154+
described_class.new(
155+
min_threads: 2,
156+
max_threads: 5,
157+
max_queue: 1,
158+
fallback_policy: :discard,
159+
synchronous: true
160+
)
161+
}.to raise_error(ArgumentError)
162+
end
163+
164+
it 'executes fallback policy once max_threads has been reached' do
165+
latch = Concurrent::CountDownLatch.new(1)
166+
(subject.max_length).times do
167+
subject.post {
168+
latch.wait
169+
}
170+
end
171+
172+
expect(subject.queue_length).to eq 0
173+
174+
# verify nothing happening
175+
20.times {
176+
expect {
177+
subject.post {
178+
sleep
179+
}
180+
}.to raise_error(Concurrent::RejectedExecutionError)
181+
}
182+
183+
# release
184+
latch.count_down
185+
end
186+
187+
end
188+
140189
context '#queue_length', :truffle_bug => true do # only actually fails for RubyThreadPoolExecutor
141190

142191
let!(:expected_max){ 10 }

0 commit comments

Comments
 (0)