Skip to content

Commit 28d9b7a

Browse files
committed
Merge pull request #115 from billdueber/pool_spec_fixes
Pool spec fixes
2 parents f6c9e70 + e3ff810 commit 28d9b7a

15 files changed

+169
-79
lines changed

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,22 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor
1717
#
1818
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
1919
def initialize(num_threads, opts = {})
20-
@overflow_policy = opts.fetch(:overflow_policy, :abort)
21-
@max_queue = 0
2220

23-
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
24-
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
21+
opts = {
22+
min_threads: num_threads,
23+
max_threads: num_threads
24+
}.merge(opts)
25+
super(opts)
2526

26-
@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads)
27-
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
27+
28+
#@overflow_policy = opts.fetch(:overflow_policy, :abort)
29+
#@max_queue = 0
30+
#
31+
#raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
32+
#raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
33+
#
34+
#@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads)
35+
#@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
2836

2937
set_shutdown_hook
3038
end

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ def initialize(opts = {})
7171

7272
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
7373
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
74+
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
7475
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
7576

7677
if min_length == 0 && @max_queue == 0
@@ -171,16 +172,7 @@ def status
171172
#
172173
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
173174
def running?
174-
super && ! @executor.isTerminating
175-
end
176-
177-
# Begin an orderly shutdown. Tasks already in the queue will be executed,
178-
# but no new tasks will be accepted. Has no additional effect if the
179-
# thread pool is not running.
180-
def shutdown
181-
super
182-
@executor.getQueue.clear
183-
nil
175+
super && !@executor.isTerminating
184176
end
185177
end
186178
end

lib/concurrent/executor/ruby_fixed_thread_pool.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ def initialize(num_threads, opts = {})
1919
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
2020
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
2121

22-
opts = opts.merge(
22+
opts = {
2323
min_threads: num_threads,
2424
max_threads: num_threads,
25-
num_threads: overflow_policy,
25+
overflow_policy: overflow_policy,
2626
max_queue: DEFAULT_MAX_QUEUE_SIZE,
27-
idletime: 0
28-
)
27+
idletime: DEFAULT_THREAD_IDLETIMEOUT,
28+
}.merge(opts)
2929
super(opts)
3030
end
3131
end

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def initialize(opts = {})
8686
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
8787
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
8888
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
89+
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
8990

9091
init_executor
9192

spec/concurrent/executor/fixed_thread_pool_shared.rb

Lines changed: 103 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'spec_helper'
22
require_relative 'thread_pool_shared'
3+
require 'thread'
34

45
share_examples_for :fixed_thread_pool do
56

@@ -13,13 +14,64 @@
1314

1415
it_should_behave_like :thread_pool
1516

16-
context '#initialize' do
17+
context '#initialize default values' do
18+
19+
subject { described_class.new(5) }
20+
21+
it 'defaults :min_length correctly' do
22+
subject.min_length.should eq 5
23+
end
24+
25+
it 'defaults :max_length correctly' do
26+
subject.max_length.should eq 5
27+
end
28+
29+
it 'defaults :overflow_policy to :abort' do
30+
subject.overflow_policy.should eq :abort
31+
end
32+
33+
34+
it 'defaults :idletime correctly' do
35+
subject.idletime.should eq subject.class.const_get(:DEFAULT_THREAD_IDLETIMEOUT)
36+
end
37+
38+
it 'defaults default :max_queue to zero' do
39+
subject.max_queue.should eq 0
40+
end
41+
42+
end
43+
44+
context '#initialize explicit values' do
1745

1846
it 'raises an exception when the pool length is less than one' do
1947
lambda {
2048
described_class.new(0)
2149
}.should raise_error(ArgumentError)
2250
end
51+
52+
53+
it 'sets explicit :max_queue correctly' do
54+
subject = described_class.new(5, :max_queue => 10)
55+
subject.max_queue.should eq 10
56+
end
57+
58+
it 'correctly sets valid :overflow_policy' do
59+
subject = described_class.new(5, :overflow_policy => :caller_runs)
60+
subject.overflow_policy.should eq :caller_runs
61+
end
62+
63+
it "correctly sets valid :idletime" do
64+
subject = described_class.new(5, :idletime => 10)
65+
subject.idletime.should eq 10
66+
end
67+
68+
it 'raises an exception if given an invalid :overflow_policy' do
69+
expect {
70+
described_class.new(5, overflow_policy: :bogus)
71+
}.to raise_error(ArgumentError)
72+
end
73+
74+
2375
end
2476

2577
context '#min_length' do
@@ -102,12 +154,6 @@
102154
end
103155
end
104156

105-
context '#idletime' do
106-
107-
it 'returns zero' do
108-
subject.idletime.should eq 0
109-
end
110-
end
111157

112158
context '#kill' do
113159

@@ -133,4 +179,54 @@
133179
pool.kill
134180
end
135181
end
182+
183+
context 'overflow policy' do
184+
185+
before do
186+
@queue = Queue.new
187+
end
188+
189+
# On abort, it should raise an error
190+
it "raises an error when overflow on abort" do
191+
subject = described_class.new(2, :max_queue => 2, :overflow_policy => :abort)
192+
expect {
193+
5.times do |i|
194+
subject.post { sleep 0.1; @queue << i}
195+
end
196+
subject.shutdown
197+
subject.wait_for_termination(1)
198+
}.to raise_error
199+
end
200+
201+
# On discard, we'd expect no error, but also not all five results
202+
it 'discards when overflow is :discard' do
203+
subject = described_class.new(2, :max_queue => 2, :overflow_policy => :discard)
204+
5.times do |i|
205+
subject.post { sleep 0.1; @queue << i}
206+
end
207+
subject.shutdown
208+
subject.wait_for_termination(1)
209+
@queue.length.should be < 5
210+
end
211+
212+
# To check for caller_runs, we'll check how many unique threads
213+
# actually ran the block
214+
215+
it 'uses the calling thread for overflow under caller_runs' do
216+
subject = described_class.new(2, :max_queue => 2, :overflow_policy => :caller_runs)
217+
5.times do |i|
218+
subject.post { sleep 0.1; @queue << Thread.current}
219+
end
220+
subject.shutdown
221+
subject.wait_for_termination(1)
222+
# Turn the queue into an array
223+
a = []
224+
a << @queue.shift until @queue.empty?
225+
226+
a.size.should eq 5 # one for each run of the block
227+
a.uniq.size.should eq 3 # one for each of teh two threads, plus the caller
228+
end
229+
end
230+
231+
136232
end

spec/concurrent/executor/java_cached_thread_pool_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
module Concurrent
88

9-
describe JavaCachedThreadPool do
9+
describe JavaCachedThreadPool, :type=>:jruby do
1010

1111
subject { described_class.new(overflow_policy: :discard) }
1212

spec/concurrent/executor/java_fixed_thread_pool_spec.rb

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
module Concurrent
88

9-
describe JavaFixedThreadPool do
9+
describe JavaFixedThreadPool, :type=>:jruby do
1010

1111
subject { described_class.new(5, overflow_policy: :discard) }
1212

@@ -19,45 +19,16 @@ module Concurrent
1919

2020
context '#initialize' do
2121

22-
it 'sets :min_length correctly' do
23-
subject = JavaFixedThreadPool.new(10)
24-
subject.min_length.should eq 10
25-
end
26-
27-
it 'sets :max_length correctly' do
28-
subject = JavaFixedThreadPool.new(5)
29-
subject.max_length.should eq 5
30-
end
31-
32-
it 'sets :idletime correctly' do
33-
subject = JavaFixedThreadPool.new(5)
34-
subject.idletime.should eq 0
35-
end
36-
37-
it 'sets :max_queue correctly' do
38-
subject = JavaFixedThreadPool.new(5)
39-
subject.max_queue.should eq 0
40-
end
4122

4223
it 'sets :overflow_policy correctly' do
43-
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
24+
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
4425
policy = clazz.new
4526
clazz.should_receive(:new).at_least(:once).with(any_args).and_return(policy)
4627

4728
subject = JavaFixedThreadPool.new(5, overflow_policy: :discard)
4829
subject.overflow_policy.should eq :discard
4930
end
5031

51-
it 'defaults :overflow_policy to :abort' do
52-
subject = JavaFixedThreadPool.new(5)
53-
subject.overflow_policy.should eq :abort
54-
end
55-
56-
it 'raises an exception if given an invalid :overflow_policy' do
57-
expect {
58-
JavaFixedThreadPool.new(5, overflow_policy: :bogus)
59-
}.to raise_error(ArgumentError)
60-
end
6132
end
6233
end
6334
end

spec/concurrent/executor/java_single_thread_executor_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
module Concurrent
88

9-
describe JavaSingleThreadExecutor do
9+
describe JavaSingleThreadExecutor, :type=>:jruby do
1010

1111
after(:each) do
1212
subject.kill

spec/concurrent/executor/java_thread_pool_executor_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
module Concurrent
88

9-
describe JavaThreadPoolExecutor do
9+
describe JavaThreadPoolExecutor, :type => :jruby do
1010

1111
after(:each) do
1212
subject.kill

spec/concurrent/executor/ruby_cached_thread_pool_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
module Concurrent
55

6-
describe RubyCachedThreadPool do
6+
describe RubyCachedThreadPool, :type=>:mrirbx do
77

88
subject do
99
described_class.new(

0 commit comments

Comments
 (0)