Skip to content

Commit 8509ca7

Browse files
committed
Executors now follow new subclass declaration convention.
1 parent 52967d5 commit 8509ca7

File tree

6 files changed

+216
-188
lines changed

6 files changed

+216
-188
lines changed

lib/concurrent/executor/cached_thread_pool.rb

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,41 @@ module Concurrent
44

55
if Concurrent.on_jruby?
66
require 'concurrent/executor/java_cached_thread_pool'
7-
# @!macro [attach] cached_thread_pool
8-
# A thread pool that dynamically grows and shrinks to fit the current workload.
9-
# New threads are created as needed, existing threads are reused, and threads
10-
# that remain idle for too long are killed and removed from the pool. These
11-
# pools are particularly suited to applications that perform a high volume of
12-
# short-lived tasks.
13-
#
14-
# On creation a `CachedThreadPool` has zero running threads. New threads are
15-
# created on the pool as new operations are `#post`. The size of the pool
16-
# will grow until `#max_length` threads are in the pool or until the number
17-
# of threads exceeds the number of running and pending operations. When a new
18-
# operation is post to the pool the first available idle thread will be tasked
19-
# with the new operation.
20-
#
21-
# Should a thread crash for any reason the thread will immediately be removed
22-
# from the pool. Similarly, threads which remain idle for an extended period
23-
# of time will be killed and reclaimed. Thus these thread pools are very
24-
# efficient at reclaiming unused resources.
25-
#
26-
# The API and behavior of this class are based on Java's `CachedThreadPool`
27-
#
28-
# @see Concurrent::RubyCachedThreadPool
29-
# @see Concurrent::JavaCachedThreadPool
30-
#
31-
# @!macro thread_pool_options
32-
class CachedThreadPool < JavaCachedThreadPool
33-
end
34-
else
35-
# @!macro cached_thread_pool
36-
# @!macro thread_pool_options
37-
class CachedThreadPool < RubyCachedThreadPool
38-
end
7+
end
8+
9+
CachedThreadPoolImplementation = case
10+
when Concurrent.on_jruby?
11+
JavaCachedThreadPool
12+
else
13+
RubyCachedThreadPool
14+
end
15+
private_constant :CachedThreadPoolImplementation
16+
17+
# @!macro [attach] cached_thread_pool
18+
# A thread pool that dynamically grows and shrinks to fit the current workload.
19+
# New threads are created as needed, existing threads are reused, and threads
20+
# that remain idle for too long are killed and removed from the pool. These
21+
# pools are particularly suited to applications that perform a high volume of
22+
# short-lived tasks.
23+
#
24+
# On creation a `CachedThreadPool` has zero running threads. New threads are
25+
# created on the pool as new operations are `#post`. The size of the pool
26+
# will grow until `#max_length` threads are in the pool or until the number
27+
# of threads exceeds the number of running and pending operations. When a new
28+
# operation is post to the pool the first available idle thread will be tasked
29+
# with the new operation.
30+
#
31+
# Should a thread crash for any reason the thread will immediately be removed
32+
# from the pool. Similarly, threads which remain idle for an extended period
33+
# of time will be killed and reclaimed. Thus these thread pools are very
34+
# efficient at reclaiming unused resources.
35+
#
36+
# The API and behavior of this class are based on Java's `CachedThreadPool`
37+
#
38+
# @see Concurrent::RubyCachedThreadPool
39+
# @see Concurrent::JavaCachedThreadPool
40+
#
41+
# @!macro thread_pool_options
42+
class CachedThreadPool < CachedThreadPoolImplementation
3943
end
4044
end

lib/concurrent/executor/fixed_thread_pool.rb

Lines changed: 83 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -3,88 +3,90 @@
33
module Concurrent
44

55
if Concurrent.on_jruby?
6-
76
require 'concurrent/executor/java_fixed_thread_pool'
7+
end
8+
9+
FixedThreadPoolImplementation = case
10+
when Concurrent.on_jruby?
11+
JavaFixedThreadPool
12+
else
13+
RubyFixedThreadPool
14+
end
15+
private_constant :FixedThreadPoolImplementation
816

9-
# @!macro [attach] fixed_thread_pool
10-
#
11-
# A thread pool with a set number of threads. The number of threads in the pool
12-
# is set on construction and remains constant. When all threads are busy new
13-
# tasks `#post` to the thread pool are enqueued until a thread becomes available.
14-
# Should a thread crash for any reason the thread will immediately be removed
15-
# from the pool and replaced.
16-
#
17-
# The API and behavior of this class are based on Java's `FixedThreadPool`
18-
#
19-
# @see Concurrent::RubyFixedThreadPool
20-
# @see Concurrent::JavaFixedThreadPool
21-
#
22-
# @!macro [attach] thread_pool_options
23-
#
24-
# Thread pools support several configuration options:
25-
#
26-
# * `idletime`: The number of seconds that a thread may be idle before being reclaimed.
27-
# * `max_queue`: The maximum number of tasks that may be waiting in the work queue at
28-
# any one time. When the queue size reaches `max_queue` subsequent tasks will be
29-
# rejected in accordance with the configured `fallback_policy`.
30-
# * `auto_terminate`: When true (default) an `at_exit` handler will be registered which
31-
# will stop the thread pool when the application exits. See below for more information
32-
# on shutting down thread pools.
33-
# * `fallback_policy`: The policy defining how rejected tasks are handled.
34-
#
35-
# Three fallback policies are supported:
36-
#
37-
# * `:abort`: Raise a `RejectedExecutionError` exception and discard the task.
38-
# * `:discard`: Discard the task and return false.
39-
# * `:caller_runs`: Execute the task on the calling thread.
40-
#
41-
# **Shutting Down Thread Pools**
42-
#
43-
# Killing a thread pool while tasks are still being processed, either by calling
44-
# the `#kill` method or at application exit, will have unpredictable results. There
45-
# is no way for the thread pool to know what resources are being used by the
46-
# in-progress tasks. When those tasks are killed the impact on those resources
47-
# cannot be predicted. The *best* practice is to explicitly shutdown all thread
48-
# pools using the provided methods:
49-
#
50-
# * Call `#shutdown` to initiate an orderly termination of all in-progress tasks
51-
# * Call `#wait_for_termination` with an appropriate timeout interval an allow
52-
# the orderly shutdown to complete
53-
# * Call `#kill` *only when* the thread pool fails to shutdown in the allotted time
54-
#
55-
# On some runtime platforms (most notably the JVM) the application will not
56-
# exit until all thread pools have been shutdown. To prevent applications from
57-
# "hanging" on exit all thread pools include an `at_exit` handler that will
58-
# stop the thread pool when the application exists. This handler uses a brute
59-
# force method to stop the pool and makes no guarantees regarding resources being
60-
# used by any tasks still running. Registration of this `at_exit` handler can be
61-
# prevented by setting the thread pool's constructor `:auto_terminate` option to
62-
# `false` when the thread pool is created. All thread pools support this option.
63-
#
64-
# ```ruby
65-
# pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered
66-
# pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # prevent `at_exit` handler registration
67-
# ```
68-
#
69-
# @note Failure to properly shutdown a thread pool can lead to unpredictable results.
70-
# Please read *Shutting Down Thread Pools* for more information.
71-
#
72-
# @note When running on the JVM (JRuby) this class will inherit from `JavaThreadPoolExecutor`.
73-
# On all other platforms it will inherit from `RubyThreadPoolExecutor`.
74-
#
75-
# @see Concurrent::RubyThreadPoolExecutor
76-
# @see Concurrent::JavaThreadPoolExecutor
77-
#
78-
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html Java Tutorials: Thread Pools
79-
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class
80-
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface
81-
# @see http://ruby-doc.org//core-2.2.0/Kernel.html#method-i-at_exit Kernel#at_exit
82-
class FixedThreadPool < JavaFixedThreadPool
83-
end
84-
else
85-
# @!macro fixed_thread_pool
86-
# @!macro thread_pool_options
87-
class FixedThreadPool < RubyFixedThreadPool
88-
end
17+
# @!macro [attach] fixed_thread_pool
18+
#
19+
# A thread pool with a set number of threads. The number of threads in the pool
20+
# is set on construction and remains constant. When all threads are busy new
21+
# tasks `#post` to the thread pool are enqueued until a thread becomes available.
22+
# Should a thread crash for any reason the thread will immediately be removed
23+
# from the pool and replaced.
24+
#
25+
# The API and behavior of this class are based on Java's `FixedThreadPool`
26+
#
27+
# @see Concurrent::RubyFixedThreadPool
28+
# @see Concurrent::JavaFixedThreadPool
29+
#
30+
# @!macro [attach] thread_pool_options
31+
#
32+
# Thread pools support several configuration options:
33+
#
34+
# * `idletime`: The number of seconds that a thread may be idle before being reclaimed.
35+
# * `max_queue`: The maximum number of tasks that may be waiting in the work queue at
36+
# any one time. When the queue size reaches `max_queue` subsequent tasks will be
37+
# rejected in accordance with the configured `fallback_policy`.
38+
# * `auto_terminate`: When true (default) an `at_exit` handler will be registered which
39+
# will stop the thread pool when the application exits. See below for more information
40+
# on shutting down thread pools.
41+
# * `fallback_policy`: The policy defining how rejected tasks are handled.
42+
#
43+
# Three fallback policies are supported:
44+
#
45+
# * `:abort`: Raise a `RejectedExecutionError` exception and discard the task.
46+
# * `:discard`: Discard the task and return false.
47+
# * `:caller_runs`: Execute the task on the calling thread.
48+
#
49+
# **Shutting Down Thread Pools**
50+
#
51+
# Killing a thread pool while tasks are still being processed, either by calling
52+
# the `#kill` method or at application exit, will have unpredictable results. There
53+
# is no way for the thread pool to know what resources are being used by the
54+
# in-progress tasks. When those tasks are killed the impact on those resources
55+
# cannot be predicted. The *best* practice is to explicitly shutdown all thread
56+
# pools using the provided methods:
57+
#
58+
# * Call `#shutdown` to initiate an orderly termination of all in-progress tasks
59+
# * Call `#wait_for_termination` with an appropriate timeout interval an allow
60+
# the orderly shutdown to complete
61+
# * Call `#kill` *only when* the thread pool fails to shutdown in the allotted time
62+
#
63+
# On some runtime platforms (most notably the JVM) the application will not
64+
# exit until all thread pools have been shutdown. To prevent applications from
65+
# "hanging" on exit all thread pools include an `at_exit` handler that will
66+
# stop the thread pool when the application exists. This handler uses a brute
67+
# force method to stop the pool and makes no guarantees regarding resources being
68+
# used by any tasks still running. Registration of this `at_exit` handler can be
69+
# prevented by setting the thread pool's constructor `:auto_terminate` option to
70+
# `false` when the thread pool is created. All thread pools support this option.
71+
#
72+
# ```ruby
73+
# pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered
74+
# pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # prevent `at_exit` handler registration
75+
# ```
76+
#
77+
# @note Failure to properly shutdown a thread pool can lead to unpredictable results.
78+
# Please read *Shutting Down Thread Pools* for more information.
79+
#
80+
# @note When running on the JVM (JRuby) this class will inherit from `JavaFixedThreadPool`.
81+
# On all other platforms it will inherit from `RubyFixedThreadPool`.
82+
#
83+
# @see Concurrent::RubyFixedThreadPool
84+
# @see Concurrent::JavaFixedThreadPool
85+
#
86+
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html Java Tutorials: Thread Pools
87+
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class
88+
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface
89+
# @see http://ruby-doc.org//core-2.2.0/Kernel.html#method-i-at_exit Kernel#at_exit
90+
class FixedThreadPool < FixedThreadPoolImplementation
8991
end
9092
end

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def worker_died(worker)
128128

129129
protected
130130

131+
# @!visibility private
131132
def ns_initialize(opts)
132133
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
133134
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
@@ -155,10 +156,12 @@ def ns_initialize(opts)
155156
@next_gc_time = Concurrent.monotonic_time + @gc_interval
156157
end
157158

159+
# @!visibility private
158160
def ns_limited_queue?
159161
@max_queue != 0
160162
end
161163

164+
# @!visibility private
162165
def ns_execute(*args, &task)
163166
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
164167
@scheduled_task_count += 1
@@ -172,6 +175,7 @@ def ns_execute(*args, &task)
172175

173176
alias_method :execute, :ns_execute
174177

178+
# @!visibility private
175179
def ns_shutdown_execution
176180
if @pool.empty?
177181
# nothing to do
@@ -187,7 +191,7 @@ def ns_shutdown_execution
187191

188192
alias_method :shutdown_execution, :ns_shutdown_execution
189193

190-
# @api private
194+
# @!visibility private
191195
def ns_kill_execution
192196
# TODO log out unprocessed tasks in queue
193197
# TODO try to shutdown first?
@@ -200,6 +204,8 @@ def ns_kill_execution
200204

201205
# tries to assign task to a worker, tries to get one from @ready or to create new one
202206
# @return [true, false] if task is assigned to a worker
207+
#
208+
# @!visibility private
203209
def ns_assign_worker(*args, &task)
204210
# keep growing if the pool is not at the minimum yet
205211
worker = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
@@ -213,6 +219,8 @@ def ns_assign_worker(*args, &task)
213219

214220
# tries to enqueue task
215221
# @return [true, false] if enqueued
222+
#
223+
# @!visibility private
216224
def ns_enqueue(*args, &task)
217225
if !ns_limited_queue? || @queue.size < @max_queue
218226
@queue << [task, args]
@@ -222,6 +230,7 @@ def ns_enqueue(*args, &task)
222230
end
223231
end
224232

233+
# @!visibility private
225234
def ns_worker_died(worker)
226235
ns_remove_busy_worker worker
227236
replacement_worker = ns_add_busy_worker
@@ -230,6 +239,8 @@ def ns_worker_died(worker)
230239

231240
# creates new worker which has to receive work to do after it's added
232241
# @return [nil, Worker] nil of max capacity is reached
242+
#
243+
# @!visibility private
233244
def ns_add_busy_worker
234245
return if @pool.size >= @max_length
235246

@@ -239,6 +250,8 @@ def ns_add_busy_worker
239250
end
240251

241252
# handle ready worker, giving it new job or assigning back to @ready
253+
#
254+
# @!visibility private
242255
def ns_ready_worker(worker, success = true)
243256
@completed_task_count += 1 if success
244257
task_and_args = @queue.shift
@@ -255,20 +268,26 @@ def ns_ready_worker(worker, success = true)
255268
end
256269

257270
# returns back worker to @ready which was not idle for enough time
271+
#
272+
# @!visibility private
258273
def ns_worker_not_old_enough(worker)
259274
# let's put workers coming from idle_test back to the start (as the oldest worker)
260275
@ready.unshift(worker)
261276
true
262277
end
263278

264279
# removes a worker which is not in not tracked in @ready
280+
#
281+
# @!visibility private
265282
def ns_remove_busy_worker(worker)
266283
@pool.delete(worker)
267284
stopped_event.set if @pool.empty? && !running?
268285
true
269286
end
270287

271288
# try oldest worker if it is idle for enough time, it's returned back at the start
289+
#
290+
# @!visibility private
272291
def ns_prune_pool
273292
return if @pool.size <= @min_length
274293

@@ -345,5 +364,7 @@ def run_task(pool, task, args)
345364
throw :stop
346365
end
347366
end
367+
368+
private_constant :Worker
348369
end
349370
end

0 commit comments

Comments
 (0)