Skip to content

Commit 2c0755b

Browse files
authored
Merge pull request #853 from fzakaria/faridzakaria/bounded_queue
Introduce ThreadPoolExecutor without a Queue
2 parents 048c2db + 16f15a6 commit 2c0755b

File tree

6 files changed

+86
-3
lines changed

6 files changed

+86
-3
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## Current
22

3+
concurrent-ruby:
4+
5+
* (#853) Introduce ThreadPoolExecutor without a Queue
6+
37
## Release v1.1.6, edge v0.6.0 (10 Feb 2020)
48

59
concurrent-ruby:

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ module Concurrent
1616
# Default maximum number of seconds a thread in the pool may remain idle
1717
# before being reclaimed.
1818

19+
# @!macro thread_pool_executor_constant_default_synchronous
20+
# Default value of the :synchronous option.
21+
1922
# @!macro thread_pool_executor_attr_reader_max_length
2023
# The maximum number of threads that may be created in the pool.
2124
# @return [Integer] The maximum number of threads that may be created in the pool.
@@ -40,6 +43,10 @@ module Concurrent
4043
# The number of seconds that a thread may be idle before being reclaimed.
4144
# @return [Integer] The number of seconds that a thread may be idle before being reclaimed.
4245

46+
# @!macro thread_pool_executor_attr_reader_synchronous
47+
# Whether or not a value of 0 for :max_queue option means the queue must perform direct hand-off or rather unbounded queue.
48+
# @return [true, false]
49+
4350
# @!macro thread_pool_executor_attr_reader_max_queue
4451
# The maximum number of tasks that may be waiting in the work queue at any one time.
4552
# When the queue size reaches `max_queue` subsequent tasks will be rejected in

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,18 @@ class JavaThreadPoolExecutor < JavaExecutorService
2121
# @!macro thread_pool_executor_constant_default_thread_timeout
2222
DEFAULT_THREAD_IDLETIMEOUT = 60
2323

24+
# @!macro thread_pool_executor_constant_default_synchronous
25+
DEFAULT_SYNCHRONOUS = false
26+
2427
# @!macro thread_pool_executor_attr_reader_max_length
2528
attr_reader :max_length
2629

2730
# @!macro thread_pool_executor_attr_reader_max_queue
2831
attr_reader :max_queue
2932

33+
# @!macro thread_pool_executor_attr_reader_synchronous
34+
attr_reader :synchronous
35+
3036
# @!macro thread_pool_executor_method_initialize
3137
def initialize(opts = {})
3238
super(opts)
@@ -94,16 +100,22 @@ def ns_initialize(opts)
94100
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
95101
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
96102
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
103+
@synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
97104
@fallback_policy = opts.fetch(:fallback_policy, :abort)
98105

106+
raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
99107
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
100108
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
101109
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
102110
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
103111
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
104112

105113
if @max_queue == 0
106-
queue = java.util.concurrent.LinkedBlockingQueue.new
114+
if @synchronous
115+
queue = java.util.concurrent.SynchronousQueue.new
116+
else
117+
queue = java.util.concurrent.LinkedBlockingQueue.new
118+
end
107119
else
108120
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
109121
end

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ class RubyThreadPoolExecutor < RubyExecutorService
2323
# @!macro thread_pool_executor_constant_default_thread_timeout
2424
DEFAULT_THREAD_IDLETIMEOUT = 60
2525

26+
# @!macro thread_pool_executor_constant_default_synchronous
27+
DEFAULT_SYNCHRONOUS = false
28+
2629
# @!macro thread_pool_executor_attr_reader_max_length
2730
attr_reader :max_length
2831

@@ -35,6 +38,9 @@ class RubyThreadPoolExecutor < RubyExecutorService
3538
# @!macro thread_pool_executor_attr_reader_max_queue
3639
attr_reader :max_queue
3740

41+
# @!macro thread_pool_executor_attr_reader_synchronous
42+
attr_reader :synchronous
43+
3844
# @!macro thread_pool_executor_method_initialize
3945
def initialize(opts = {})
4046
super(opts)
@@ -114,9 +120,11 @@ def ns_initialize(opts)
114120
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
115121
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
116122
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
123+
@synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
117124
@fallback_policy = opts.fetch(:fallback_policy, :abort)
118-
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
119125

126+
raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
127+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
120128
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
121129
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
122130
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
@@ -201,6 +209,8 @@ def ns_assign_worker(*args, &task)
201209
#
202210
# @!visibility private
203211
def ns_enqueue(*args, &task)
212+
return false if @synchronous
213+
204214
if !ns_limited_queue? || @queue.size < @max_queue
205215
@queue << [task, args]
206216
true

lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
7373
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
7474
# tasks that are received when the queue size has reached
7575
# `max_queue` or the executor has shut down
76-
#
76+
# @option opts [Boolean] :synchronous (DEFAULT_SYNCHRONOUS) whether or not a value of 0
77+
# for :max_queue means the queue must perform direct hand-off rather than unbounded.
7778
# @raise [ArgumentError] if `:max_threads` is less than one
7879
# @raise [ArgumentError] if `:min_threads` is less than zero
7980
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,55 @@
137137
end
138138
end
139139

140+
context '#synchronous' do
141+
142+
subject do
143+
described_class.new(
144+
min_threads: 1,
145+
max_threads: 2,
146+
max_queue: 0,
147+
synchronous: true,
148+
fallback_policy: :abort
149+
)
150+
end
151+
152+
it 'cannot be set unless `max_queue` is zero' do
153+
expect {
154+
described_class.new(
155+
min_threads: 2,
156+
max_threads: 5,
157+
max_queue: 1,
158+
fallback_policy: :discard,
159+
synchronous: true
160+
)
161+
}.to raise_error(ArgumentError)
162+
end
163+
164+
it 'executes fallback policy once max_threads has been reached' do
165+
latch = Concurrent::CountDownLatch.new(1)
166+
(subject.max_length).times do
167+
subject.post {
168+
latch.wait
169+
}
170+
end
171+
172+
expect(subject.queue_length).to eq 0
173+
174+
# verify nothing happening
175+
20.times {
176+
expect {
177+
subject.post {
178+
sleep
179+
}
180+
}.to raise_error(Concurrent::RejectedExecutionError)
181+
}
182+
183+
# release
184+
latch.count_down
185+
end
186+
187+
end
188+
140189
context '#queue_length', :truffle_bug => true do # only actually fails for RubyThreadPoolExecutor
141190

142191
let!(:expected_max){ 10 }

0 commit comments

Comments
 (0)