Skip to content

Commit 375312e

Browse files
committed
Implemented thread pool overload policies.
1 parent 863c63f commit 375312e

15 files changed

+421
-48
lines changed

lib/concurrent/java_cached_thread_pool.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,28 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1414
# of threads which may be created in the pool
1515
# @option opts [Integer] :idletime (+DEFAULT_THREAD_IDLETIMEOUT+) maximum
1616
# number of seconds a thread may be idle before it is reclaimed
17+
# @option opts [Symbol] :overflow_policy (+:abort+) the overflow policy
1718
#
1819
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
1920
# @raise [ArgumentError] if +idletime+ is less than or equal to zero
21+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
2022
#
2123
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
2224
def initialize(opts = {})
2325
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
2426
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
27+
@overflow_policy = opts.fetch(:overflow_policy, :abort)
28+
@max_queue = 0
2529

2630
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
2731
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
32+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
2833

2934
@executor = java.util.concurrent.ThreadPoolExecutor.new(
30-
0, max_length,
35+
@max_queue, max_length,
3136
idletime, java.util.concurrent.TimeUnit::SECONDS,
3237
java.util.concurrent.SynchronousQueue.new,
33-
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
38+
OVERFLOW_POLICIES[@overflow_policy].new)
3439
end
3540
end
3641
end

lib/concurrent/java_fixed_thread_pool.rb

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
if defined? java.util
22

33
require 'concurrent/java_thread_pool_executor'
4-
require 'concurrent/utilities'
54

65
module Concurrent
76

@@ -10,15 +9,25 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor
109

1110
# Create a new thread pool.
1211
#
12+
# @param [Hash] opts the options defining pool behavior.
13+
# @option opts [Symbol] :overflow_policy (+:abort+) the overflow policy
14+
#
15+
# @raise [ArgumentError] if +num_threads+ is less than or equal to zero
16+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
17+
#
1318
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
14-
def initialize(num_threads = Concurrent::processor_count)
19+
def initialize(num_threads, opts = {})
20+
@overflow_policy = opts.fetch(:overflow_policy, :abort)
21+
@max_queue = 0
22+
1523
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
24+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
1625

1726
@executor = java.util.concurrent.ThreadPoolExecutor.new(
1827
num_threads, num_threads,
19-
0, java.util.concurrent.TimeUnit::SECONDS,
28+
@max_queue, java.util.concurrent.TimeUnit::SECONDS,
2029
java.util.concurrent.LinkedBlockingQueue.new,
21-
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
30+
OVERFLOW_POLICIES[@overflow_policy].new)
2231
end
2332
end
2433
end

lib/concurrent/java_thread_pool_executor.rb

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module Concurrent
44

5+
RejectedExecutionError = Class.new(StandardError) unless defined? RejectedExecutionError
6+
57
# @!macro thread_pool_executor
68
class JavaThreadPoolExecutor
79

@@ -37,13 +39,14 @@ class JavaThreadPoolExecutor
3739
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
3840
def initialize(opts = {})
3941
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
40-
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
42+
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
4143
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
4244
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
4345
@overflow_policy = opts.fetch(:overflow_policy, :abort)
4446

45-
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
46-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(overflow_policy)
47+
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
48+
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
49+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
4750

4851
if min_length == 0 && @max_queue == 0
4952
queue = java.util.concurrent.SynchronousQueue.new
@@ -54,7 +57,7 @@ def initialize(opts = {})
5457
end
5558

5659
@executor = java.util.concurrent.ThreadPoolExecutor.new(
57-
min_length, @max_length,
60+
min_length, max_length,
5861
idletime, java.util.concurrent.TimeUnit::SECONDS,
5962
queue, OVERFLOW_POLICIES[@overflow_policy].new)
6063
end
@@ -100,7 +103,7 @@ def remaining_capacity
100103
#
101104
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
102105
def running?
103-
! (shutdown? || terminated?)
106+
! (@executor.isShutdown || @executor.isTerminated || @executor.isTerminating)
104107
end
105108

106109
# Is the thread pool shutdown?
@@ -135,10 +138,14 @@ def wait_for_termination(timeout)
135138
# @raise [ArgumentError] if no task is given
136139
def post(*args)
137140
raise ArgumentError.new('no block given') unless block_given?
138-
@executor.submit{ yield(*args) }
139-
return true
141+
if running?
142+
@executor.submit{ yield(*args) }
143+
true
144+
else
145+
false
146+
end
140147
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
141-
return false
148+
raise RejectedExecutionError
142149
end
143150

144151
# Submit a task to the thread pool for asynchronous processing.
@@ -149,17 +156,15 @@ def post(*args)
149156
def <<(task)
150157
@executor.submit(&task)
151158
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
152-
# do nothing
153-
ensure
154-
return self
159+
raise RejectedExecutionError
155160
end
156161

157162
# Begin an orderly shutdown. Tasks already in the queue will be executed,
158163
# but no new tasks will be accepted. Has no additional effect if the
159164
# thread pool is not running.
160165
def shutdown
161-
@executor.getQueue.clear
162166
@executor.shutdown
167+
@executor.getQueue.clear
163168
return nil
164169
end
165170

@@ -168,19 +173,10 @@ def shutdown
168173
# will be accepted. Has no additional effect if the thread pool is
169174
# not running.
170175
def kill
171-
@executor.getQueue.clear
172176
@executor.shutdownNow
177+
@executor.getQueue.clear
173178
return nil
174179
end
175-
176-
protected
177-
178-
# Were all tasks completed before shutdown?
179-
#
180-
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
181-
def terminated?
182-
@executor.isTerminated
183-
end
184180
end
185181
end
186182
end

lib/concurrent/ruby_cached_thread_pool.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
1515
#
1616
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
1717
# @raise [ArgumentError] if +idletime+ is less than or equal to zero
18+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
1819
def initialize(opts = {})
1920
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
2021
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
22+
overflow_policy = opts.fetch(:overflow_policy, :abort)
2123

2224
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
2325
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
26+
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
2427

2528
opts = opts.merge(
2629
min_threads: 0,
2730
max_threads: max_length,
31+
num_threads: overflow_policy,
2832
idletime: idletime
2933
)
3034
super(opts)

lib/concurrent/ruby_fixed_thread_pool.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,21 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor
88
# Create a new thread pool.
99
#
1010
# @param [Integer] num_threads the number of threads to allocate
11+
# @param [Hash] opts the options defining pool behavior.
12+
# @option opts [Symbol] :overflow_policy (+:abort+) the overflow policy
1113
#
1214
# @raise [ArgumentError] if +num_threads+ is less than or equal to zero
15+
# @raise [ArgumentError] if +overflow_policy+ is not a known policy
1316
def initialize(num_threads, opts = {})
17+
overflow_policy = opts.fetch(:overflow_policy, :abort)
18+
1419
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
20+
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
21+
1522
opts = opts.merge(
1623
min_threads: num_threads,
1724
max_threads: num_threads,
25+
num_threads: overflow_policy,
1826
idletime: 0
1927
)
2028
super(opts)

lib/concurrent/ruby_thread_pool_executor.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
module Concurrent
77

8+
RejectedExecutionError = Class.new(StandardError) unless defined? RejectedExecutionError
9+
810
# @!macro thread_pool_executor
911
class RubyThreadPoolExecutor
1012

@@ -50,6 +52,7 @@ def initialize(opts = {})
5052
@overflow_policy = opts.fetch(:overflow_policy, :abort)
5153

5254
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
55+
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
5356
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
5457

5558
@state = :running
@@ -121,7 +124,7 @@ def post(*args, &task)
121124
raise ArgumentError.new('no block given') unless block_given?
122125
@mutex.synchronize do
123126
break false unless @state == :running
124-
return false if @max_queue != 0 && @queue.length >= @max_queue
127+
return handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
125128
@scheduled_task_count += 1
126129
@queue << [args, task]
127130
if Time.now.to_f - @gc_interval >= @last_gc_time
@@ -195,6 +198,23 @@ def on_worker_exit(worker) # :nodoc:
195198

196199
protected
197200

201+
# @!visibility private
202+
def handle_overflow(*args) # :nodoc:
203+
case @overflow_policy
204+
when :abort
205+
raise RejectedExecutionError
206+
when :discard
207+
false
208+
when :caller_runs
209+
begin
210+
yield(*args)
211+
rescue
212+
# let it fail
213+
end
214+
true
215+
end
216+
end
217+
198218
# @!visibility private
199219
def prune_pool # :nodoc:
200220
@pool.delete_if do |worker|

spec/concurrent/cached_thread_pool_shared.rb

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
share_examples_for :cached_thread_pool do
55

66
let!(:max_threads){ 5 }
7-
subject { described_class.new(max_threads: max_threads) }
7+
subject do
8+
described_class.new(
9+
max_threads: max_threads,
10+
overflow_policy: :discard
11+
)
12+
end
813

914
after(:each) do
1015
subject.kill
@@ -106,14 +111,19 @@
106111

107112
context 'worker creation and caching' do
108113

109-
subject{ described_class.new(idletime: 1, max_threads: 5) }
114+
subject do
115+
described_class.new(
116+
idletime: 1,
117+
max_threads: 5,
118+
overflow_policy: :discard
119+
)
120+
end
110121

111122
it 'never creates more than :max_threads threads' do
112-
pool = described_class.new(max_threads: 5)
113-
100.times{ pool << proc{ sleep(1) } }
123+
100.times{ subject << proc{ sleep(1) } }
114124
sleep(0.1)
115-
pool.length.should eq 5
116-
pool.kill
125+
subject.length.should eq 5
126+
subject.kill
117127
end
118128
end
119129
end

spec/concurrent/java_cached_thread_pool_spec.rb

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,57 @@ module Concurrent
88

99
describe JavaCachedThreadPool do
1010

11-
subject { described_class.new(max_threads: 5) }
11+
subject { described_class.new(max_threads: 5, overflow_policy: :discard) }
1212

1313
after(:each) do
1414
subject.kill
1515
sleep(0.1)
1616
end
1717

1818
it_should_behave_like :cached_thread_pool
19+
20+
context '#initialize' do
21+
22+
it 'sets :min_length correctly' do
23+
subject = JavaCachedThreadPool.new
24+
subject.min_length.should eq 0
25+
end
26+
27+
it 'sets :max_length correctly' do
28+
subject = JavaCachedThreadPool.new(max_threads: 5)
29+
subject.max_length.should eq 5
30+
end
31+
32+
it 'sets :idletime correctly' do
33+
subject = JavaCachedThreadPool.new(idletime: 10)
34+
subject.idletime.should eq 10
35+
end
36+
37+
it 'sets :max_queue correctly' do
38+
subject = JavaCachedThreadPool.new
39+
subject.max_queue.should eq 0
40+
end
41+
42+
it 'sets :overflow_policy correctly' do
43+
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
44+
policy = clazz.new
45+
clazz.should_receive(:new).at_least(:once).with(any_args).and_return(policy)
46+
47+
subject = JavaCachedThreadPool.new(overflow_policy: :discard)
48+
subject.overflow_policy.should eq :discard
49+
end
50+
51+
it 'defaults :overflow_policy to :abort' do
52+
subject = JavaCachedThreadPool.new
53+
subject.overflow_policy.should eq :abort
54+
end
55+
56+
it 'raises an exception if given an invalid :overflow_policy' do
57+
expect {
58+
JavaCachedThreadPool.new(overflow_policy: :bogus)
59+
}.to raise_error(ArgumentError)
60+
end
61+
end
1962
end
2063
end
2164
end

0 commit comments

Comments
 (0)