Skip to content

Commit 2d40427

Browse files
committed
Merge pull request #39 from jdantonio/thread-pool-executor
`RubyThreadPoolExecutor` class
2 parents 2f535ff + 94cefa8 commit 2d40427

23 files changed

+660
-679
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
require 'concurrent/fixed_thread_pool'
3535
require 'concurrent/immediate_executor'
3636
require 'concurrent/per_thread_executor'
37+
require 'concurrent/thread_pool_executor'
3738
require 'concurrent/uses_global_thread_pool'
3839

3940
# Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell,

lib/concurrent/immediate_executor.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ module Concurrent
22
class ImmediateExecutor
33

44
def post(*args, &block)
5+
raise ArgumentError.new('no block given') unless block_given?
56
block.call(*args)
7+
return true
68
end
79

810
def <<(block)
911
post(&block)
1012
self
1113
end
12-
1314
end
14-
end
15+
end

lib/concurrent/java_cached_thread_pool.rb

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,36 @@
11
if defined? java.util
22

3-
require 'concurrent/java_abstract_thread_pool'
3+
require 'concurrent/java_thread_pool_executor'
44

55
module Concurrent
66

77
# @!macro cached_thread_pool
8-
class JavaCachedThreadPool
9-
include JavaAbstractThreadPool
10-
11-
# The maximum number of threads that may be created in the pool
12-
# (unless overridden during construction).
13-
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
14-
15-
# The maximum number of seconds a thread in the pool may remain idle before
16-
# being reclaimed (unless overridden during construction).
17-
DEFAULT_THREAD_IDLETIME = 60
18-
19-
# The maximum number of threads that may be created in the pool.
20-
attr_reader :max_length
8+
class JavaCachedThreadPool < JavaThreadPoolExecutor
219

2210
# Create a new thread pool.
2311
#
12+
# @param [Hash] opts the options defining pool behavior.
13+
# @option opts [Integer] :max_threads (+DEFAULT_MAX_POOL_SIZE+) maximum number
14+
# of threads which may be created in the pool
15+
# @option opts [Integer] :idletime (+DEFAULT_THREAD_IDLETIMEOUT+) maximum
16+
# number of seconds a thread may be idle before it is reclaimed
17+
#
18+
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
19+
# @raise [ArgumentError] if +idletime+ is less than or equal to zero
20+
#
2421
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
2522
def initialize(opts = {})
26-
idletime = (opts[:thread_idletime] || opts[:idletime] || DEFAULT_THREAD_IDLETIME).to_i
27-
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
23+
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
24+
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
2825

29-
@max_length = opts[:max_threads] || opts[:max] || DEFAULT_MAX_POOL_SIZE
30-
raise ArgumentError.new('maximum_number of threads must be greater than zero') if @max_length <= 0
26+
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
27+
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
3128

32-
#@executor = java.util.concurrent.Executors.newCachedThreadPool
3329
@executor = java.util.concurrent.ThreadPoolExecutor.new(
34-
0, @max_length,
30+
0, max_length,
3531
idletime, java.util.concurrent.TimeUnit::SECONDS,
3632
java.util.concurrent.SynchronousQueue.new,
3733
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
38-
39-
#p = java.util.concurrent.Executors.newCachedThreadPool
40-
#p.getCorePoolSize #=> 0
41-
#p.getMaximumPoolSize #=> 2147483647
42-
#p.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) #=> 60
43-
#p.getQueue #=> #<Java::JavaUtilConcurrent::SynchronousQueue:0x68ec7913>
44-
45-
#p.getActiveCount #=> 0
46-
#p.getQueue.size #=> 0
47-
#p.getRejectedExecutionHandler #=> #<Java::JavaUtilConcurrent::ThreadPoolExecutor::AbortPolicy:0x57f897a7>
4834
end
4935
end
5036
end

lib/concurrent/java_fixed_thread_pool.rb

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,24 @@
11
if defined? java.util
22

3-
require 'concurrent/java_abstract_thread_pool'
3+
require 'concurrent/java_thread_pool_executor'
44
require 'concurrent/utilities'
55

66
module Concurrent
77

88
# @!macro fixed_thread_pool
9-
class JavaFixedThreadPool
10-
include JavaAbstractThreadPool
9+
class JavaFixedThreadPool < JavaThreadPoolExecutor
1110

1211
# Create a new thread pool.
1312
#
1413
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
1514
def initialize(num_threads = Concurrent::processor_count)
16-
@num_threads = num_threads.to_i
17-
raise ArgumentError.new('number of threads must be greater than zero') if @num_threads < 1
15+
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
1816

19-
#@executor = java.util.concurrent.Executors.newFixedThreadPool(@num_threads)
2017
@executor = java.util.concurrent.ThreadPoolExecutor.new(
21-
@num_threads, @num_threads,
18+
num_threads, num_threads,
2219
0, java.util.concurrent.TimeUnit::SECONDS,
2320
java.util.concurrent.LinkedBlockingQueue.new,
2421
java.util.concurrent.ThreadPoolExecutor::AbortPolicy.new)
25-
26-
#p = java.util.concurrent.Executors.newFixedThreadPool(10)
27-
#p.getCorePoolSize #=> 10
28-
#p.getMaximumPoolSize #=> 10
29-
#p.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) #=> 0
30-
#p.getQueue #=> #<Java::JavaUtilConcurrent::LinkedBlockingQueue:0x97dabf4>
31-
32-
#p.getActiveCount #=> 0
33-
#p.getQueue.size #=> 0
34-
#p.getRejectedExecutionHandler #=> #<Java::JavaUtilConcurrent::ThreadPoolExecutor::AbortPolicy:0x7e41986c>
3522
end
3623
end
3724
end

lib/concurrent/java_abstract_thread_pool.rb renamed to lib/concurrent/java_thread_pool_executor.rb

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,60 @@
22

33
module Concurrent
44

5-
# @!macro cached_thread_pool
6-
module JavaAbstractThreadPool
5+
# @!macro thread_pool_executor
6+
class JavaThreadPoolExecutor
7+
8+
# The maximum number of threads that will be created in the pool
9+
# (unless overridden during construction).
10+
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
11+
12+
# The minimum number of threads that will be created in the pool
13+
# (unless overridden during construction).
14+
DEFAULT_MIN_POOL_SIZE = 0
15+
16+
DEFAULT_MAX_QUEUE_SIZE = 0
17+
18+
# The maximum number of seconds a thread in the pool may remain idle before
19+
# being reclaimed (unless overridden during construction).
20+
DEFAULT_THREAD_IDLETIMEOUT = 60
21+
22+
OVERFLOW_POLICIES = {
23+
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
24+
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
25+
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
26+
}.freeze
27+
28+
# The maximum number of threads that may be created in the pool.
29+
attr_reader :max_length
30+
31+
attr_reader :max_queue
32+
33+
# Create a new thread pool.
34+
#
35+
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
36+
def initialize(opts = {})
37+
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
38+
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
39+
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
40+
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
41+
overflow_policy = opts.fetch(:overflow_policy, :abort)
42+
43+
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
44+
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(overflow_policy)
45+
46+
if min_length == 0 && max_queue == 0
47+
queue = java.util.concurrent.SynchronousQueue.new
48+
elsif max_queue == 0
49+
queue = java.util.concurrent.LinkedBlockingQueue.new
50+
else
51+
queue = java.util.concurrent.LinkedBlockingQueue.new(max_queue)
52+
end
53+
54+
@executor = java.util.concurrent.ThreadPoolExecutor.new(
55+
min_length, @max_length,
56+
idletime, java.util.concurrent.TimeUnit::SECONDS,
57+
queue, OVERFLOW_POLICIES[overflow_policy].new)
58+
end
759

860
def min_length
961
@executor.getCorePoolSize
@@ -34,6 +86,14 @@ def idletime
3486
@executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
3587
end
3688

89+
def queue_length
90+
@executor.getQueue.size
91+
end
92+
93+
def remaining_capacity
94+
@executor.getQueue.remainingCapacity
95+
end
96+
3797
# Is the thread pool running?
3898
#
3999
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
@@ -48,13 +108,6 @@ def shutdown?
48108
@executor.isShutdown
49109
end
50110

51-
# Were all tasks completed before shutdown?
52-
#
53-
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
54-
def terminated?
55-
@executor.isTerminated
56-
end
57-
58111
# Block until thread pool shutdown is complete or until +timeout+ seconds have
59112
# passed.
60113
#
@@ -115,6 +168,15 @@ def kill
115168
@executor.shutdownNow
116169
return nil
117170
end
171+
172+
protected
173+
174+
# Were all tasks completed before shutdown?
175+
#
176+
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
177+
def terminated?
178+
@executor.isTerminated
179+
end
118180
end
119181
end
120182
end

lib/concurrent/per_thread_executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Concurrent
33
class PerThreadExecutor
44

55
def self.post(*args)
6+
raise ArgumentError.new('no block given') unless block_given?
67
Thread.new(*args) do
78
Thread.current.abort_on_exception = false
89
yield(*args)

0 commit comments

Comments
 (0)