Skip to content

Commit 2f535ff

Browse files
committed
Merge pull request #38 from jdantonio/better-thread-pool
Refactored thread pools to more closely mirror Java ThreadPoolExecutor.
2 parents e277678 + a85a94d commit 2f535ff

13 files changed

+462
-211
lines changed

lib/concurrent/cached_thread_pool.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ module Concurrent
1313
#
1414
# On creation a +CachedThreadPool+ has zero running threads. New threads are
1515
# created on the pool as new operations are +#post+. The size of the pool
16-
# will grow until +#max_threads+ threads are in the pool or until the number
16+
# will grow until +#max_length+ threads are in the pool or until the number
1717
# of threads exceeds the number of running and pending operations. When a new
1818
# operation is post to the pool the first available idle thread will be tasked
1919
# with the new operation.

lib/concurrent/java_abstract_thread_pool.rb

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,35 @@ module Concurrent
55
# @!macro cached_thread_pool
66
module JavaAbstractThreadPool
77

8+
def min_length
9+
@executor.getCorePoolSize
10+
end
11+
12+
def max_length
13+
@executor.getMaximumPoolSize
14+
end
15+
16+
def length
17+
@executor.getPoolSize
18+
end
19+
alias_method :current_length, :length
20+
21+
def largest_length
22+
@executor.getLargestPoolSize
23+
end
24+
25+
def scheduled_task_count
26+
@executor.getTaskCount
27+
end
28+
29+
def completed_task_count
30+
@executor.getCompletedTaskCount
31+
end
32+
33+
def idletime
34+
@executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
35+
end
36+
837
# Is the thread pool running?
938
#
1039
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
@@ -86,15 +115,6 @@ def kill
86115
@executor.shutdownNow
87116
return nil
88117
end
89-
90-
# The number of threads currently in the pool.
91-
#
92-
# @return [Integer] a non-zero value when the pool is running,
93-
# zero when the pool is shutdown
94-
def length
95-
running? ? 1 : 0
96-
end
97-
alias_method :size, :length
98118
end
99119
end
100120
end

lib/concurrent/java_cached_thread_pool.rb

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,44 @@ module Concurrent
88
class JavaCachedThreadPool
99
include JavaAbstractThreadPool
1010

11+
# The maximum number of threads that may be created in the pool
12+
# (unless overridden during construction).
13+
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
14+
15+
# The maximum number of seconds a thread in the pool may remain idle before
16+
# being reclaimed (unless overridden during construction).
17+
DEFAULT_THREAD_IDLETIME = 60
18+
19+
# The maximum number of threads that may be created in the pool.
20+
attr_reader :max_length
21+
1122
# Create a new thread pool.
1223
#
1324
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
1425
def initialize(opts = {})
15-
@executor = java.util.concurrent.Executors.newCachedThreadPool
16-
end
26+
idletime = (opts[:thread_idletime] || opts[:idletime] || DEFAULT_THREAD_IDLETIME).to_i
27+
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
1728

18-
# The number of threads currently in the pool.
19-
#
20-
# @return [Integer] a non-zero value when the pool is running,
21-
# zero when the pool is shutdown
22-
def length
23-
running? ? 1 : 0
29+
@max_length = opts[:max_threads] || opts[:max] || DEFAULT_MAX_POOL_SIZE
30+
raise ArgumentError.new('maximum_number of threads must be greater than zero') if @max_length <= 0
31+
32+
#@executor = java.util.concurrent.Executors.newCachedThreadPool
33+
@executor = java.util.concurrent.ThreadPoolExecutor.new(
34+
0, @max_length,
35+
idletime, java.util.concurrent.TimeUnit::SECONDS,
36+
java.util.concurrent.SynchronousQueue.new,
37+
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
38+
39+
#p = java.util.concurrent.Executors.newCachedThreadPool
40+
#p.getCorePoolSize #=> 0
41+
#p.getMaximumPoolSize #=> 2147483647
42+
#p.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) #=> 60
43+
#p.getQueue #=> #<Java::JavaUtilConcurrent::SynchronousQueue:0x68ec7913>
44+
45+
#p.getActiveCount #=> 0
46+
#p.getQueue.size #=> 0
47+
#p.getRejectedExecutionHandler #=> #<Java::JavaUtilConcurrent::ThreadPoolExecutor::AbortPolicy:0x57f897a7>
2448
end
25-
alias_method :size, :length
2649
end
2750
end
2851
end

lib/concurrent/java_fixed_thread_pool.rb

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,23 @@ def initialize(num_threads = Concurrent::processor_count)
1616
@num_threads = num_threads.to_i
1717
raise ArgumentError.new('number of threads must be greater than zero') if @num_threads < 1
1818

19-
@executor = java.util.concurrent.Executors.newFixedThreadPool(@num_threads)
20-
end
19+
#@executor = java.util.concurrent.Executors.newFixedThreadPool(@num_threads)
20+
@executor = java.util.concurrent.ThreadPoolExecutor.new(
21+
@num_threads, @num_threads,
22+
0, java.util.concurrent.TimeUnit::SECONDS,
23+
java.util.concurrent.LinkedBlockingQueue.new,
24+
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
2125

22-
# The number of threads currently in the pool.
23-
#
24-
# @return [Integer] a non-zero value when the pool is running,
25-
# zero when the pool is shutdown
26-
def length
27-
running? ? @num_threads : 0
26+
#p = java.util.concurrent.Executors.newFixedThreadPool(10)
27+
#p.getCorePoolSize #=> 10
28+
#p.getMaximumPoolSize #=> 10
29+
#p.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) #=> 0
30+
#p.getQueue #=> #<Java::JavaUtilConcurrent::LinkedBlockingQueue:0x97dabf4>
31+
32+
#p.getActiveCount #=> 0
33+
#p.getQueue.size #=> 0
34+
#p.getRejectedExecutionHandler #=> #<Java::JavaUtilConcurrent::ThreadPoolExecutor::AbortPolicy:0x7e41986c>
2835
end
29-
alias_method :size, :length
3036
end
3137
end
3238
end

lib/concurrent/ruby_cached_thread_pool.rb

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,17 @@ class RubyCachedThreadPool
1717
DEFAULT_THREAD_IDLETIME = 60
1818

1919
# The maximum number of threads that may be created in the pool.
20-
attr_accessor :max_threads
20+
attr_reader :max_length
21+
22+
# The minimum number of threads that may be created in the pool.
23+
attr_reader :min_length
24+
25+
attr_reader :largest_length
26+
27+
attr_reader :scheduled_task_count
28+
attr_reader :completed_task_count
29+
30+
attr_reader :idletime
2131

2232
# Create a new thread pool.
2333
#
@@ -33,8 +43,8 @@ def initialize(opts = {})
3343
@idletime = (opts[:thread_idletime] || opts[:idletime] || DEFAULT_THREAD_IDLETIME).to_i
3444
raise ArgumentError.new('idletime must be greater than zero') if @idletime <= 0
3545

36-
@max_threads = opts[:max_threads] || opts[:max] || DEFAULT_MAX_POOL_SIZE
37-
raise ArgumentError.new('maximum_number of threads must be greater than zero') if @max_threads <= 0
46+
@max_length = opts[:max_threads] || opts[:max] || DEFAULT_MAX_POOL_SIZE
47+
raise ArgumentError.new('maximum_number of threads must be greater than zero') if @max_length <= 0
3848

3949
@state = :running
4050
@pool = []
@@ -43,6 +53,10 @@ def initialize(opts = {})
4353

4454
@busy = []
4555
@idle = []
56+
@scheduled_task_count = 0
57+
@completed_task_count = 0
58+
@min_length = 0
59+
@largest_length = 0
4660
end
4761

4862
# Is the thread pool running?
@@ -86,18 +100,20 @@ def post(*args, &task)
86100
raise ArgumentError.new('no block given') unless block_given?
87101
@mutex.synchronize do
88102
break false unless @state == :running
103+
@scheduled_task_count += 1
89104

90105
if @idle.empty?
91-
if @idle.length + @busy.length < @max_threads
106+
if @idle.length + @busy.length < @max_length
92107
worker = create_worker_thread
93108
else
94-
worker = @busy.shift
109+
worker = @busy.pop
95110
end
96111
else
97112
worker = @idle.pop
98113
end
99114

100115
@busy.push(worker)
116+
@largest_length = [@idle.length + @busy.length, @largest_length].max
101117
worker.signal(*args, &task)
102118

103119
prune_stale_workers
@@ -152,11 +168,9 @@ def kill
152168
# zero when the pool is shutdown
153169
def length
154170
@mutex.synchronize do
155-
@state == :running ? @busy.length + @idle.length : 0
171+
@state != :shutdown ? @busy.length + @idle.length : 0
156172
end
157173
end
158-
alias_method :size, :length
159-
alias_method :current_size, :length
160174
alias_method :current_length, :length
161175

162176
# @!visibility private
@@ -172,11 +186,14 @@ def on_worker_exit(worker) # :nodoc:
172186
end
173187

174188
# @!visibility private
175-
def on_end_task(worker) # :nodoc:
189+
def on_end_task(worker, success) # :nodoc:
176190
@mutex.synchronize do
191+
@completed_task_count += 1 #if success
177192
break unless @state == :running
178-
@busy.delete(worker)
179-
@idle.push(worker)
193+
unless worker.tasks_remaining?
194+
@busy.delete(worker)
195+
@idle.push(worker)
196+
end
180197
end
181198
end
182199

lib/concurrent/ruby_cached_thread_pool/worker.rb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ def initialize(parent)
1313
@resource = ConditionVariable.new
1414
@tasks = Queue.new
1515
end
16-
17-
def idle?
18-
return ! @idletime.nil?
16+
17+
def tasks_remaining?
18+
return @mutex.synchronize do
19+
! @tasks.empty?
20+
end
1921
end
2022

2123
def dead?
@@ -65,7 +67,6 @@ def run(thread = Thread.current)
6567
loop do
6668
task = @mutex.synchronize do
6769
@resource.wait(@mutex, 60) if @tasks.empty?
68-
6970
@tasks.pop(true)
7071
end
7172

@@ -76,13 +77,12 @@ def run(thread = Thread.current)
7677
break
7778
end
7879

79-
#@parent.on_start_task(self)
8080
begin
8181
task.last.call(*task.first)
82-
rescue
82+
rescue => ex
8383
# let it fail
8484
ensure
85-
@parent.on_end_task(self)
85+
@parent.on_end_task(self, ex.nil?)
8686
end
8787
end
8888
end

0 commit comments

Comments
 (0)