Skip to content

Commit 276483c

Browse files
committed
Merge pull request #41 from jdantonio/thread-pool-overload
Implemented thread pool overload policies.
2 parents a80e1ff + 375312e commit 276483c

18 files changed

+625
-169
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ matrix:
1414
allow_failures:
1515
- rvm: ruby-head
1616
- rvm: jruby-head
17+
- rvm: 1.9.3

lib/concurrent/java_cached_thread_pool.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,28 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1414
# of threads which may be created in the pool
1515
# @option opts [Integer] :idletime (+DEFAULT_THREAD_IDLETIMEOUT+) maximum
1616
# number of seconds a thread may be idle before it is reclaimed
17+
# @option opts [Symbol] :overflow_policy (+:abort+) the overflow policy
1718
#
1819
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
1920
# @raise [ArgumentError] if +idletime+ is less than or equal to zero
21+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
2022
#
2123
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
2224
def initialize(opts = {})
2325
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
2426
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
27+
@overflow_policy = opts.fetch(:overflow_policy, :abort)
28+
@max_queue = 0
2529

2630
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
2731
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
32+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
2833

2934
@executor = java.util.concurrent.ThreadPoolExecutor.new(
30-
0, max_length,
35+
@max_queue, max_length,
3136
idletime, java.util.concurrent.TimeUnit::SECONDS,
3237
java.util.concurrent.SynchronousQueue.new,
33-
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
38+
OVERFLOW_POLICIES[@overflow_policy].new)
3439
end
3540
end
3641
end

lib/concurrent/java_fixed_thread_pool.rb

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
if defined? java.util
22

33
require 'concurrent/java_thread_pool_executor'
4-
require 'concurrent/utilities'
54

65
module Concurrent
76

@@ -10,15 +9,25 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor
109

1110
# Create a new thread pool.
1211
#
12+
# @param [Hash] opts the options defining pool behavior.
13+
# @option opts [Symbol] :overflow_policy (+:abort+) the overflow policy
14+
#
15+
# @raise [ArgumentError] if +num_threads+ is less than or equal to zero
16+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
17+
#
1318
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
14-
def initialize(num_threads = Concurrent::processor_count)
19+
def initialize(num_threads, opts = {})
20+
@overflow_policy = opts.fetch(:overflow_policy, :abort)
21+
@max_queue = 0
22+
1523
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
24+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
1625

1726
@executor = java.util.concurrent.ThreadPoolExecutor.new(
1827
num_threads, num_threads,
19-
0, java.util.concurrent.TimeUnit::SECONDS,
28+
@max_queue, java.util.concurrent.TimeUnit::SECONDS,
2029
java.util.concurrent.LinkedBlockingQueue.new,
21-
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
30+
OVERFLOW_POLICIES[@overflow_policy].new)
2231
end
2332
end
2433
end

lib/concurrent/java_thread_pool_executor.rb

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module Concurrent
44

5+
RejectedExecutionError = Class.new(StandardError) unless defined? RejectedExecutionError
6+
57
# @!macro thread_pool_executor
68
class JavaThreadPoolExecutor
79

@@ -30,31 +32,34 @@ class JavaThreadPoolExecutor
3032

3133
attr_reader :max_queue
3234

35+
attr_reader :overflow_policy
36+
3337
# Create a new thread pool.
3438
#
3539
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
3640
def initialize(opts = {})
3741
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
38-
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
42+
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
3943
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
4044
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
41-
overflow_policy = opts.fetch(:overflow_policy, :abort)
45+
@overflow_policy = opts.fetch(:overflow_policy, :abort)
4246

43-
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
44-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(overflow_policy)
47+
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
48+
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
49+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
4550

46-
if min_length == 0 && max_queue == 0
51+
if min_length == 0 && @max_queue == 0
4752
queue = java.util.concurrent.SynchronousQueue.new
48-
elsif max_queue == 0
53+
elsif @max_queue == 0
4954
queue = java.util.concurrent.LinkedBlockingQueue.new
5055
else
51-
queue = java.util.concurrent.LinkedBlockingQueue.new(max_queue)
56+
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
5257
end
5358

5459
@executor = java.util.concurrent.ThreadPoolExecutor.new(
55-
min_length, @max_length,
60+
min_length, max_length,
5661
idletime, java.util.concurrent.TimeUnit::SECONDS,
57-
queue, OVERFLOW_POLICIES[overflow_policy].new)
62+
queue, OVERFLOW_POLICIES[@overflow_policy].new)
5863
end
5964

6065
def min_length
@@ -91,14 +96,14 @@ def queue_length
9196
end
9297

9398
def remaining_capacity
94-
@executor.getQueue.remainingCapacity
99+
@max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
95100
end
96101

97102
# Is the thread pool running?
98103
#
99104
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
100105
def running?
101-
! (shutdown? || terminated?)
106+
! (@executor.isShutdown || @executor.isTerminated || @executor.isTerminating)
102107
end
103108

104109
# Is the thread pool shutdown?
@@ -133,10 +138,14 @@ def wait_for_termination(timeout)
133138
# @raise [ArgumentError] if no task is given
134139
def post(*args)
135140
raise ArgumentError.new('no block given') unless block_given?
136-
@executor.submit{ yield(*args) }
137-
return true
141+
if running?
142+
@executor.submit{ yield(*args) }
143+
true
144+
else
145+
false
146+
end
138147
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
139-
return false
148+
raise RejectedExecutionError
140149
end
141150

142151
# Submit a task to the thread pool for asynchronous processing.
@@ -147,16 +156,15 @@ def post(*args)
147156
def <<(task)
148157
@executor.submit(&task)
149158
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
150-
# do nothing
151-
ensure
152-
return self
159+
raise RejectedExecutionError
153160
end
154161

155162
# Begin an orderly shutdown. Tasks already in the queue will be executed,
156163
# but no new tasks will be accepted. Has no additional effect if the
157164
# thread pool is not running.
158165
def shutdown
159166
@executor.shutdown
167+
@executor.getQueue.clear
160168
return nil
161169
end
162170

@@ -166,17 +174,9 @@ def shutdown
166174
# not running.
167175
def kill
168176
@executor.shutdownNow
177+
@executor.getQueue.clear
169178
return nil
170179
end
171-
172-
protected
173-
174-
# Were all tasks completed before shutdown?
175-
#
176-
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
177-
def terminated?
178-
@executor.isTerminated
179-
end
180180
end
181181
end
182182
end

lib/concurrent/ruby_cached_thread_pool.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
1515
#
1616
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
1717
# @raise [ArgumentError] if +idletime+ is less than or equal to zero
18+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
1819
def initialize(opts = {})
1920
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
2021
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
22+
overflow_policy = opts.fetch(:overflow_policy, :abort)
2123

2224
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
2325
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
26+
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
2427

2528
opts = opts.merge(
2629
min_threads: 0,
2730
max_threads: max_length,
31+
num_threads: overflow_policy,
2832
idletime: idletime
2933
)
3034
super(opts)

lib/concurrent/ruby_fixed_thread_pool.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,21 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor
88
# Create a new thread pool.
99
#
1010
# @param [Integer] num_threads the number of threads to allocate
11+
# @param [Hash] opts the options defining pool behavior.
12+
# @option opts [Symbol] :overflow_policy (+:abort+) the overflow policy
1113
#
1214
# @raise [ArgumentError] if +num_threads+ is less than or equal to zero
15+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
1316
def initialize(num_threads, opts = {})
17+
overflow_policy = opts.fetch(:overflow_policy, :abort)
18+
1419
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
20+
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
21+
1522
opts = opts.merge(
1623
min_threads: num_threads,
1724
max_threads: num_threads,
25+
num_threads: overflow_policy,
1826
idletime: 0
1927
)
2028
super(opts)

lib/concurrent/ruby_thread_pool_executor.rb

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
module Concurrent
77

8+
RejectedExecutionError = Class.new(StandardError) unless defined? RejectedExecutionError
9+
810
# @!macro thread_pool_executor
911
class RubyThreadPoolExecutor
1012

@@ -37,6 +39,8 @@ class RubyThreadPoolExecutor
3739

3840
attr_reader :max_queue
3941

42+
attr_reader :overflow_policy
43+
4044
# Create a new thread pool.
4145
#
4246
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
@@ -48,6 +52,7 @@ def initialize(opts = {})
4852
@overflow_policy = opts.fetch(:overflow_policy, :abort)
4953

5054
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
55+
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
5156
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
5257

5358
@state = :running
@@ -59,7 +64,6 @@ def initialize(opts = {})
5964
@completed_task_count = 0
6065
@largest_length = 0
6166

62-
#@busy = 0
6367
@gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
6468
@last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
6569
end
@@ -72,9 +76,11 @@ def length
7276
alias_method :current_length, :length
7377

7478
def queue_length
79+
@queue.length
7580
end
7681

7782
def remaining_capacity
83+
@mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
7884
end
7985

8086
# Is the thread pool running?
@@ -118,6 +124,7 @@ def post(*args, &task)
118124
raise ArgumentError.new('no block given') unless block_given?
119125
@mutex.synchronize do
120126
break false unless @state == :running
127+
return handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
121128
@scheduled_task_count += 1
122129
@queue << [args, task]
123130
if Time.now.to_f - @gc_interval >= @last_gc_time
@@ -145,6 +152,7 @@ def <<(task)
145152
def shutdown
146153
@mutex.synchronize do
147154
break unless @state == :running
155+
@queue.clear
148156
if @pool.empty?
149157
@state = :shutdown
150158
@terminator.set
@@ -162,24 +170,16 @@ def shutdown
162170
def kill
163171
@mutex.synchronize do
164172
break if @state == :shutdown
165-
@state = :shutdown
166173
@queue.clear
174+
@state = :shutdown
167175
drain_pool
168176
@terminator.set
169177
end
170178
end
171179

172-
## @!visibility private
173-
#def on_start_task # :nodoc:
174-
#@mutex.synchronize do
175-
#@busy += 1
176-
#end
177-
#end
178-
179180
# @!visibility private
180181
def on_end_task # :nodoc:
181182
@mutex.synchronize do
182-
#@busy -= 1
183183
@completed_task_count += 1 #if success
184184
break unless @state == :running
185185
end
@@ -198,6 +198,23 @@ def on_worker_exit(worker) # :nodoc:
198198

199199
protected
200200

201+
# @!visibility private
202+
def handle_overflow(*args) # :nodoc:
203+
case @overflow_policy
204+
when :abort
205+
raise RejectedExecutionError
206+
when :discard
207+
false
208+
when :caller_runs
209+
begin
210+
yield(*args)
211+
rescue
212+
# let it fail
213+
end
214+
true
215+
end
216+
end
217+
201218
# @!visibility private
202219
def prune_pool # :nodoc:
203220
@pool.delete_if do |worker|

lib/concurrent/ruby_thread_pool_worker.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ def run(thread = Thread.current) # :nodoc:
4949
end
5050

5151
begin
52-
#@parent.on_start_task
5352
task.last.call(*task.first)
5453
rescue => ex
5554
# let it fail

0 commit comments

Comments
 (0)