Skip to content

Commit b2c5bae

Browse files
committed
Merge pull request #334 from ruby-concurrency/refactor/thread-pools
Cached/FixedThreadPool directly extend ThreadPoolExecutor
2 parents 89ce879 + 245413d commit b2c5bae

16 files changed

+601
-784
lines changed
Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,32 @@
1-
require 'concurrent/executor/ruby_cached_thread_pool'
1+
require 'concurrent/utility/engine'
2+
require 'concurrent/executor/thread_pool_executor'
23

34
module Concurrent
45

5-
if Concurrent.on_jruby?
6-
require 'concurrent/executor/java_cached_thread_pool'
7-
end
8-
9-
CachedThreadPoolImplementation = case
10-
when Concurrent.on_jruby?
11-
JavaCachedThreadPool
12-
else
13-
RubyCachedThreadPool
14-
end
15-
private_constant :CachedThreadPoolImplementation
16-
17-
# @!macro [attach] cached_thread_pool
6+
# A thread pool that dynamically grows and shrinks to fit the current workload.
7+
# New threads are created as needed, existing threads are reused, and threads
8+
# that remain idle for too long are killed and removed from the pool. These
9+
# pools are particularly suited to applications that perform a high volume of
10+
# short-lived tasks.
1811
#
19-
# A thread pool that dynamically grows and shrinks to fit the current workload.
20-
# New threads are created as needed, existing threads are reused, and threads
21-
# that remain idle for too long are killed and removed from the pool. These
22-
# pools are particularly suited to applications that perform a high volume of
23-
# short-lived tasks.
12+
# On creation a `CachedThreadPool` has zero running threads. New threads are
13+
# created on the pool as new operations are `#post`. The size of the pool
14+
# will grow until `#max_length` threads are in the pool or until the number
15+
# of threads exceeds the number of running and pending operations. When a new
16+
# operation is post to the pool the first available idle thread will be tasked
17+
# with the new operation.
2418
#
25-
# On creation a `CachedThreadPool` has zero running threads. New threads are
26-
# created on the pool as new operations are `#post`. The size of the pool
27-
# will grow until `#max_length` threads are in the pool or until the number
28-
# of threads exceeds the number of running and pending operations. When a new
29-
# operation is post to the pool the first available idle thread will be tasked
30-
# with the new operation.
19+
# Should a thread crash for any reason the thread will immediately be removed
20+
# from the pool. Similarly, threads which remain idle for an extended period
21+
# of time will be killed and reclaimed. Thus these thread pools are very
22+
# efficient at reclaiming unused resources.
3123
#
32-
# Should a thread crash for any reason the thread will immediately be removed
33-
# from the pool. Similarly, threads which remain idle for an extended period
34-
# of time will be killed and reclaimed. Thus these thread pools are very
35-
# efficient at reclaiming unused resources.
36-
#
37-
# The API and behavior of this class are based on Java's `CachedThreadPool`
24+
# The API and behavior of this class are based on Java's `CachedThreadPool`
3825
#
3926
# @!macro thread_pool_options
40-
# @!macro thread_pool_executor_public_api
41-
class CachedThreadPool < CachedThreadPoolImplementation
27+
class CachedThreadPool < ThreadPoolExecutor
4228

43-
# @!macro [new] cached_thread_pool_method_initialize
29+
# @!macro [attach] cached_thread_pool_method_initialize
4430
#
4531
# Create a new thread pool.
4632
#
@@ -50,8 +36,27 @@ class CachedThreadPool < CachedThreadPoolImplementation
5036
# @raise [ArgumentError] if `fallback_policy` is not a known policy
5137
#
5238
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
39+
def initialize(opts = {})
40+
defaults = { idletime: DEFAULT_THREAD_IDLETIMEOUT }
41+
overrides = { min_threads: 0,
42+
max_threads: DEFAULT_MAX_POOL_SIZE,
43+
max_queue: DEFAULT_MAX_QUEUE_SIZE }
44+
super(defaults.merge(opts).merge(overrides))
45+
end
46+
47+
private
5348

54-
# @!method initialize(opts = {})
55-
# @!macro cached_thread_pool_method_initialize
49+
# @!macro cached_thread_pool_method_initialize
50+
# @!visibility private
51+
def ns_initialize(opts)
52+
super(opts)
53+
if Concurrent.on_jruby?
54+
@max_queue = 0
55+
@executor = java.util.concurrent.Executors.newCachedThreadPool
56+
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
57+
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
58+
self.auto_terminate = opts.fetch(:auto_terminate, true)
59+
end
60+
end
5661
end
5762
end

lib/concurrent/executor/fixed_thread_pool.rb

Lines changed: 40 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,8 @@
1-
require 'concurrent/executor/ruby_fixed_thread_pool'
1+
require 'concurrent/utility/engine'
2+
require 'concurrent/executor/thread_pool_executor'
23

34
module Concurrent
45

5-
if Concurrent.on_jruby?
6-
require 'concurrent/executor/java_fixed_thread_pool'
7-
end
8-
9-
FixedThreadPoolImplementation = case
10-
when Concurrent.on_jruby?
11-
JavaFixedThreadPool
12-
else
13-
RubyFixedThreadPool
14-
end
15-
private_constant :FixedThreadPoolImplementation
16-
176
# @!macro [new] thread_pool_executor_constant_default_max_pool_size
187
# Default maximum number of threads that will be created in the pool.
198

@@ -119,35 +108,7 @@ module Concurrent
119108

120109

121110

122-
123-
# @!macro [new] fixed_thread_pool_method_initialize
124-
#
125-
# Create a new thread pool.
126-
#
127-
# @param [Integer] num_threads the number of threads to allocate
128-
# @param [Hash] opts the options defining pool behavior.
129-
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
130-
#
131-
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
132-
# @raise [ArgumentError] if `fallback_policy` is not a known policy
133-
#
134-
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
135-
136-
137-
138-
139-
140-
# @!macro [attach] fixed_thread_pool
141-
#
142-
# A thread pool with a set number of threads. The number of threads in the pool
143-
# is set on construction and remains constant. When all threads are busy new
144-
# tasks `#post` to the thread pool are enqueued until a thread becomes available.
145-
# Should a thread crash for any reason the thread will immediately be removed
146-
# from the pool and replaced.
147-
#
148-
# The API and behavior of this class are based on Java's `FixedThreadPool`
149-
#
150-
# @!macro [attach] thread_pool_options
111+
# @!macro [new] thread_pool_options
151112
#
152113
# **Thread Pool Options**
153114
#
@@ -203,11 +164,43 @@ module Concurrent
203164
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class
204165
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface
205166
# @see http://ruby-doc.org//core-2.2.0/Kernel.html#method-i-at_exit Kernel#at_exit
206-
#
207-
# @!macro thread_pool_executor_public_api
208-
class FixedThreadPool < FixedThreadPoolImplementation
209167

210-
# @!method initialize(num_threads, opts = {})
211-
# @!macro fixed_thread_pool_method_initialize
168+
169+
170+
171+
172+
# @!macro [attach] fixed_thread_pool
173+
#
174+
# A thread pool with a set number of threads. The number of threads in the pool
175+
# is set on construction and remains constant. When all threads are busy new
176+
# tasks `#post` to the thread pool are enqueued until a thread becomes available.
177+
# Should a thread crash for any reason the thread will immediately be removed
178+
# from the pool and replaced.
179+
#
180+
# The API and behavior of this class are based on Java's `FixedThreadPool`
181+
#
182+
# @!macro thread_pool_options
183+
class FixedThreadPool < ThreadPoolExecutor
184+
185+
# @!macro [attach] fixed_thread_pool_method_initialize
186+
#
187+
# Create a new thread pool.
188+
#
189+
# @param [Integer] num_threads the number of threads to allocate
190+
# @param [Hash] opts the options defining pool behavior.
191+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
192+
#
193+
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
194+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
195+
#
196+
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
197+
def initialize(num_threads, opts = {})
198+
raise ArgumentError.new('number of threads must be greater than zero') if num_threads.to_i < 1
199+
defaults = { max_queue: DEFAULT_MAX_QUEUE_SIZE,
200+
idletime: DEFAULT_THREAD_IDLETIMEOUT }
201+
overrides = { min_threads: num_threads,
202+
max_threads: num_threads }
203+
super(defaults.merge(opts).merge(overrides))
204+
end
212205
end
213206
end

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 0 additions & 34 deletions
This file was deleted.

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 0 additions & 24 deletions
This file was deleted.

lib/concurrent/executor/ruby_cached_thread_pool.rb

Lines changed: 0 additions & 20 deletions
This file was deleted.

lib/concurrent/executor/ruby_fixed_thread_pool.rb

Lines changed: 0 additions & 21 deletions
This file was deleted.

lib/concurrent/executor/thread_pool_executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
require 'concurrent/utility/engine'
12
require 'concurrent/executor/ruby_thread_pool_executor'
23

34
module Concurrent

0 commit comments

Comments
 (0)