Skip to content

Commit 4e445c7

Browse files
committed
Removed JavaAbstractThreadPool in lieu of JavaThreadPoolExecutor.
* Created JavaThreadPoolExecutor; some specs failing. * JavaThreadPoolExecutor specs passing. * Added shared specs for global thread pool. * Removed JavaAbstractThreadPool in lieu of JavaThreadPoolExecutor.
1 parent a85a94d commit 4e445c7

15 files changed

+221
-81
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
require 'concurrent/fixed_thread_pool'
3535
require 'concurrent/immediate_executor'
3636
require 'concurrent/per_thread_executor'
37+
require 'concurrent/thread_pool_executor'
3738
require 'concurrent/uses_global_thread_pool'
3839

3940
# Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell,

lib/concurrent/immediate_executor.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ module Concurrent
22
class ImmediateExecutor
33

44
def post(*args, &block)
5+
raise ArgumentError.new('no block given') unless block_given?
56
block.call(*args)
7+
return true
68
end
79

810
def <<(block)
911
post(&block)
1012
self
1113
end
12-
1314
end
14-
end
15+
end

lib/concurrent/java_cached_thread_pool.rb

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
if defined? java.util
22

3-
require 'concurrent/java_abstract_thread_pool'
3+
require 'concurrent/java_thread_pool_executor'
44

55
module Concurrent
66

77
# @!macro cached_thread_pool
8-
class JavaCachedThreadPool
9-
include JavaAbstractThreadPool
8+
class JavaCachedThreadPool < JavaThreadPoolExecutor
109

1110
# The maximum number of threads that may be created in the pool
1211
# (unless overridden during construction).
@@ -29,22 +28,11 @@ def initialize(opts = {})
2928
@max_length = opts[:max_threads] || opts[:max] || DEFAULT_MAX_POOL_SIZE
3029
raise ArgumentError.new('maximum_number of threads must be greater than zero') if @max_length <= 0
3130

32-
#@executor = java.util.concurrent.Executors.newCachedThreadPool
3331
@executor = java.util.concurrent.ThreadPoolExecutor.new(
3432
0, @max_length,
3533
idletime, java.util.concurrent.TimeUnit::SECONDS,
3634
java.util.concurrent.SynchronousQueue.new,
3735
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
38-
39-
#p = java.util.concurrent.Executors.newCachedThreadPool
40-
#p.getCorePoolSize #=> 0
41-
#p.getMaximumPoolSize #=> 2147483647
42-
#p.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) #=> 60
43-
#p.getQueue #=> #<Java::JavaUtilConcurrent::SynchronousQueue:0x68ec7913>
44-
45-
#p.getActiveCount #=> 0
46-
#p.getQueue.size #=> 0
47-
#p.getRejectedExecutionHandler #=> #<Java::JavaUtilConcurrent::ThreadPoolExecutor::AbortPolicy:0x57f897a7>
4836
end
4937
end
5038
end

lib/concurrent/java_fixed_thread_pool.rb

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
if defined? java.util
22

3-
require 'concurrent/java_abstract_thread_pool'
3+
require 'concurrent/java_thread_pool_executor'
44
require 'concurrent/utilities'
55

66
module Concurrent
77

88
# @!macro fixed_thread_pool
9-
class JavaFixedThreadPool
10-
include JavaAbstractThreadPool
9+
class JavaFixedThreadPool < JavaThreadPoolExecutor
1110

1211
# Create a new thread pool.
1312
#
@@ -16,22 +15,11 @@ def initialize(num_threads = Concurrent::processor_count)
1615
@num_threads = num_threads.to_i
1716
raise ArgumentError.new('number of threads must be greater than zero') if @num_threads < 1
1817

19-
#@executor = java.util.concurrent.Executors.newFixedThreadPool(@num_threads)
2018
@executor = java.util.concurrent.ThreadPoolExecutor.new(
2119
@num_threads, @num_threads,
2220
0, java.util.concurrent.TimeUnit::SECONDS,
2321
java.util.concurrent.LinkedBlockingQueue.new,
2422
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
25-
26-
#p = java.util.concurrent.Executors.newFixedThreadPool(10)
27-
#p.getCorePoolSize #=> 10
28-
#p.getMaximumPoolSize #=> 10
29-
#p.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) #=> 0
30-
#p.getQueue #=> #<Java::JavaUtilConcurrent::LinkedBlockingQueue:0x97dabf4>
31-
32-
#p.getActiveCount #=> 0
33-
#p.getQueue.size #=> 0
34-
#p.getRejectedExecutionHandler #=> #<Java::JavaUtilConcurrent::ThreadPoolExecutor::AbortPolicy:0x7e41986c>
3523
end
3624
end
3725
end

lib/concurrent/java_abstract_thread_pool.rb renamed to lib/concurrent/java_thread_pool_executor.rb

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,60 @@
22

33
module Concurrent
44

5-
# @!macro cached_thread_pool
6-
module JavaAbstractThreadPool
5+
# @!macro thread_pool_executor
6+
class JavaThreadPoolExecutor
7+
8+
# The maximum number of threads that will be created in the pool
9+
# (unless overridden during construction).
10+
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
11+
12+
# The minimum number of threads that will be created in the pool
13+
# (unless overridden during construction).
14+
DEFAULT_MIN_POOL_SIZE = 0
15+
16+
DEFAULT_MAX_QUEUE_SIZE = 0
17+
18+
# The maximum number of seconds a thread in the pool may remain idle before
19+
# being reclaimed (unless overridden during construction).
20+
DEFAULT_THREAD_IDLETIMEOUT = 60
21+
22+
OVERFLOW_POLICIES = {
23+
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
24+
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
25+
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
26+
}.freeze
27+
28+
# The maximum number of threads that may be created in the pool.
29+
attr_reader :max_length
30+
31+
attr_reader :max_queue
32+
33+
# Create a new thread pool.
34+
#
35+
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
36+
def initialize(opts = {})
37+
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
38+
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
39+
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
40+
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
41+
overflow_policy = opts.fetch(:overflow_policy, :abort)
42+
43+
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
44+
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(overflow_policy)
45+
46+
if min_length == 0 && max_queue == 0
47+
queue = java.util.concurrent.SynchronousQueue.new
48+
elsif max_queue == 0
49+
queue = java.util.concurrent.LinkedBlockingQueue.new
50+
else
51+
queue = java.util.concurrent.LinkedBlockingQueue.new(max_queue)
52+
end
53+
54+
@executor = java.util.concurrent.ThreadPoolExecutor.new(
55+
min_length, @max_length,
56+
idletime, java.util.concurrent.TimeUnit::SECONDS,
57+
queue, OVERFLOW_POLICIES[overflow_policy].new)
58+
end
759

860
def min_length
961
@executor.getCorePoolSize
@@ -34,6 +86,14 @@ def idletime
3486
@executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
3587
end
3688

89+
def queue_length
90+
@executor.getQueue.size
91+
end
92+
93+
def remaining_capacity
94+
@executor.getQueue.remainingCapacity
95+
end
96+
3797
# Is the thread pool running?
3898
#
3999
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown

lib/concurrent/per_thread_executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Concurrent
33
class PerThreadExecutor
44

55
def self.post(*args)
6+
raise ArgumentError.new('no block given') unless block_given?
67
Thread.new(*args) do
78
Thread.current.abort_on_exception = false
89
yield(*args)

lib/concurrent/ruby_fixed_thread_pool.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def wait_for_termination(timeout)
8181
#
8282
# @raise [ArgumentError] if no block is given
8383
def post(*args, &task)
84-
raise ArgumentError.new('no block given') if task.nil?
84+
raise ArgumentError.new('no block given') unless block_given?
8585
@mutex.synchronize do
8686
break false unless @state == :running
8787
@scheduled_task_count += 1
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#require 'concurrent/ruby_thread_pool_executor'
2+
3+
module Concurrent
4+
5+
if defined? java.util
6+
require 'concurrent/java_thread_pool_executor'
7+
# @!macro [attach] thread_pool_executor
8+
#
9+
# A thread pool...
10+
#
11+
# The API and behavior of this class are based on Java's +ThreadPoolExecutor+
12+
#
13+
# @note When running on the JVM (JRuby) this class will inherit from +JavaThreadPoolExecutor+.
14+
# On all other platforms it will inherit from +RubyThreadPoolExecutor+.
15+
#
16+
# @see Concurrent::RubyThreadPoolExecutor
17+
# @see Concurrent::JavaThreadPoolExecutor
18+
#
19+
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
20+
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
21+
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
22+
# @see http://stackoverflow.com/questions/17957382/fixedthreadpool-vs-fixedthreadpool-the-lesser-of-two-evils
23+
class ThreadPoolExecutor < JavaThreadPoolExecutor
24+
end
25+
else
26+
## @!macro thread_pool_executor
27+
#class ThreadPoolExecutor < RubyThreadPoolExecutor
28+
#end
29+
end
30+
end

spec/concurrent/cached_thread_pool_shared.rb

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
it 'raises an exception when the pool size is less than one' do
2525
lambda {
26-
described_class.new(max: 0)
26+
described_class.new(max_threads: 0)
2727
}.should raise_error(ArgumentError)
2828
end
2929

@@ -74,37 +74,24 @@
7474
end
7575
end
7676

77-
context '#length' do
78-
79-
it 'returns zero for a new thread pool' do
80-
subject.length.should eq 0
81-
end
82-
83-
it 'returns :max_length while running' do
84-
10.times{ subject.post{ sleep(0.1) } }
85-
sleep(1)
86-
subject.length.should eq max_threads
87-
end
88-
end
89-
9077
context '#largest_length' do
9178

9279
it 'returns zero on creation' do
9380
subject.largest_length.should eq 0
9481
end
9582

96-
it 'returns :max_threads while running' do
83+
it 'returns a non-zero number once tasks have been received' do
9784
10.times{ subject.post{ sleep(0.1) } }
98-
sleep(1)
99-
subject.largest_length.should eq max_threads
85+
sleep(0.1)
86+
subject.largest_length.should > 0
10087
end
10188

102-
it 'returns :max_threads once shutdown' do
89+
it 'returns a non-zero number after shutdown if tasks have been received' do
10390
10.times{ subject.post{ sleep(0.1) } }
104-
sleep(1)
91+
sleep(0.1)
10592
subject.shutdown
10693
subject.wait_for_termination(1)
107-
subject.largest_length.should eq max_threads
94+
subject.largest_length.should > 0
10895
end
10996
end
11097

@@ -119,10 +106,11 @@
119106

120107
context 'garbage collection' do
121108

122-
subject{ described_class.new(idletime: 1) }
109+
subject{ described_class.new(idletime: 1, max_threads: 5) }
123110

124111
it 'removes from pool any thread that has been idle too long' do
125112
3.times { subject << proc{ sleep(0.1) } }
113+
sleep(0.1)
126114
subject.length.should eq 3
127115
sleep(2)
128116
subject << proc{ nil }
@@ -131,7 +119,8 @@
131119

132120
it 'removes from pool any dead thread' do
133121
3.times { subject << proc{ sleep(0.1); raise Exception } }
134-
subject.length.should == 3
122+
sleep(0.1)
123+
subject.length.should eq 3
135124
sleep(2)
136125
subject << proc{ nil }
137126
subject.length.should < 3
@@ -157,7 +146,7 @@
157146
end
158147

159148
it 'never creates more than :max_threads threads' do
160-
pool = described_class.new(max: 5)
149+
pool = described_class.new(max_threads: 5)
161150
100.times{ sleep(0.01); pool << proc{ sleep(1) } }
162151
sleep(0.1)
163152
pool.length.should eq 5
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
require 'spec_helper'
2+
3+
share_examples_for :global_thread_pool do
4+
5+
context '#post' do
6+
7+
it 'raises an exception if no block is given' do
8+
lambda {
9+
subject.post
10+
}.should raise_error(ArgumentError)
11+
end
12+
13+
it 'returns true when the block is added to the queue' do
14+
subject.post{ nil }.should be_true
15+
end
16+
17+
it 'calls the block with the given arguments' do
18+
@expected = nil
19+
subject.post(1, 2, 3) do |a, b, c|
20+
@expected = a + b + c
21+
end
22+
sleep(0.1)
23+
@expected.should eq 6
24+
end
25+
26+
it 'aliases #<<' do
27+
@expected = false
28+
subject << proc { @expected = true }
29+
sleep(0.1)
30+
@expected.should be_true
31+
end
32+
end
33+
end

0 commit comments

Comments
 (0)