Skip to content

Commit d85eeee

Browse files
committed
Documented fixed thread pools.
1 parent fb9145e commit d85eeee

File tree

5 files changed

+168
-29
lines changed

5 files changed

+168
-29
lines changed

lib/concurrent/cached_thread_pool.rb

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,17 @@
22

33
module Concurrent
44

5-
# @!class CachedThreadPool
6-
75
if defined? java.util
86
require 'concurrent/java_cached_thread_pool'
97
# @!macro [attach] cached_thread_pool
108
# A thread pool that dynamically grows and shrinks to fit the current workload.
119
# New threads are created as needed, existing threads are reused, and threads
1210
# that remain idle for too long are killed and removed from the pool. These
13-
# pools are articularly suited to programs that run a high volume of short-lived
14-
# tasks.
11+
# pools are particularly suited to applications that perform a high volume of
12+
# short-lived tasks.
1513
#
1614
# On creation a +CachedThreadPool+ has zero running threads. New threads are
17-
# spawned on the pool as new operations are +#post+. The size of the pool
15+
# created on the pool as new operations are +#post+. The size of the pool
1816
# will grow until +#max_threads+ threads are in the pool or until the number
1917
# of threads exceeds the number of running and pending operations. When a new
2018
# operation is post to the pool the first available idle thread will be tasked
@@ -26,14 +24,22 @@ module Concurrent
2624
# efficient at reclaiming unused resources.
2725
#
2826
# The API and behavior of this class are based on Java's +CachedThreadPool+
27+
#
28+
# @note When running on the JVM (JRuby) this class will inherit from +JavaCachedThreadPool+.
29+
# On all other platforms it will inherit from +RubyCachedThreadPool+.
30+
#
31+
# @see Concurrent::RubyCachedThreadPool
32+
# @see Concurrent::JavaCachedThreadPool
2933
#
3034
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
3135
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
3236
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
3337
# @see http://stackoverflow.com/questions/17957382/fixedthreadpool-vs-cachedthreadpool-the-lesser-of-two-evils
34-
CachedThreadPool = Class.new(JavaCachedThreadPool)
38+
class CachedThreadPool < JavaCachedThreadPool
39+
end
3540
else
3641
# @!macro cached_thread_pool
37-
CachedThreadPool = Class.new(RubyCachedThreadPool)
42+
class CachedThreadPool < RubyCachedThreadPool
43+
end
3844
end
3945
end

lib/concurrent/fixed_thread_pool.rb

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,31 @@ module Concurrent
44

55
if defined? java.util
66
require 'concurrent/java_fixed_thread_pool'
7-
FixedThreadPool = Class.new(JavaFixedThreadPool)
7+
# @!macro [attach] fixed_thread_pool
8+
#
9+
# A thread pool with a set number of threads. The number of threads in the pool
10+
# is set on construction and remains constant. When all threads are busy new
11+
# tasks +#post+ to the thread pool are enqueued until a thread becomes available.
12+
# Should a thread crash for any reason the thread will immediately be removed
13+
# from the pool and replaced.
14+
#
15+
# The API and behavior of this class are based on Java's +FixedThreadPool+
16+
#
17+
# @note When running on the JVM (JRuby) this class will inherit from +JavaFixedThreadPool+.
18+
# On all other platforms it will inherit from +RubyFixedThreadPool+.
19+
#
20+
# @see Concurrent::RubyFixedThreadPool
21+
# @see Concurrent::JavaFixedThreadPool
22+
#
23+
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
24+
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
25+
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
26+
# @see http://stackoverflow.com/questions/17957382/fixedthreadpool-vs-fixedthreadpool-the-lesser-of-two-evils
27+
class FixedThreadPool < JavaFixedThreadPool
28+
end
829
else
9-
FixedThreadPool = Class.new(RubyFixedThreadPool)
30+
# @!macro fixed_thread_pool
31+
class FixedThreadPool < RubyFixedThreadPool
32+
end
1033
end
1134
end

lib/concurrent/java_fixed_thread_pool.rb

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
module Concurrent
66

7-
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
8-
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
9-
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
7+
# @!macro fixed_thread_pool
108
class JavaFixedThreadPool
119

10+
# Create a new thread pool.
11+
#
1212
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
1313
def initialize(num_threads = Concurrent::processor_count)
1414
@num_threads = num_threads.to_i
@@ -17,22 +17,50 @@ def initialize(num_threads = Concurrent::processor_count)
1717
@executor = java.util.concurrent.Executors.newFixedThreadPool(@num_threads)
1818
end
1919

20+
# Is the thread pool running?
21+
#
22+
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
2023
def running?
2124
! (shutdown? || terminated?)
2225
end
2326

27+
# Is the thread pool shutdown?
28+
#
29+
# @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
2430
def shutdown?
2531
@executor.isShutdown
2632
end
2733

34+
# Were all tasks completed before shutdown?
35+
#
36+
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
2837
def terminated?
2938
@executor.isTerminated
3039
end
3140

41+
# Block until thread pool shutdown is complete or until +timeout+ seconds have
42+
# passed.
43+
#
44+
# @note Does not initiate shutdown or termination. Either +shutdown+ or +kill+
45+
# must be called before this method (or on another thread).
46+
#
47+
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
48+
#
49+
# @return [Boolean] +true+ if shutdown complete or false on +timeout+
3250
def wait_for_termination(timeout)
3351
@executor.awaitTermination(timeout.to_i, java.util.concurrent.TimeUnit::SECONDS)
3452
end
3553

54+
# Submit a task to the thread pool for asynchronous processing.
55+
#
56+
# @param [Array] args zero or more arguments to be passed to the task
57+
#
58+
# @yield the asynchronous task to perform
59+
#
60+
# @return [Boolean] +true+ if the task is queued, +false+ if the thread pool
61+
# is not running
62+
#
63+
# @raise [ArgumentError] if no task is given
3664
def post(*args)
3765
raise ArgumentError.new('no block given') unless block_given?
3866
@executor.submit{ yield(*args) }
@@ -41,24 +69,40 @@ def post(*args)
4169
return false
4270
end
4371

44-
def <<(block)
45-
@executor.submit(&block)
72+
# Submit a task to the thread pool for asynchronous processing.
73+
#
74+
# @param [Proc] task the asynchronous task to perform
75+
#
76+
# @return [self] returns itself
77+
def <<(task)
78+
@executor.submit(&task)
4679
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
4780
# do nothing
4881
ensure
4982
return self
5083
end
5184

85+
# Begin an orderly shutdown. Tasks already in the queue will be executed,
86+
# but no new tasks will be accepted. Has no additional effect if the
87+
# thread pool is not running.
5288
def shutdown
5389
@executor.shutdown
5490
return nil
5591
end
5692

93+
# Begin an immediate shutdown. In-progress tasks will be allowed to
94+
# complete but enqueued tasks will be dismissed and no new tasks
95+
# will be accepted. Has no additional effect if the thread pool is
96+
# not running.
5797
def kill
5898
@executor.shutdownNow
5999
return nil
60100
end
61101

102+
# The number of threads currently in the pool.
103+
#
104+
# @return [Integer] a non-zero value when the pool is running,
105+
# zero when the pool is shutdown
62106
def length
63107
running? ? @num_threads : 0
64108
end

lib/concurrent/ruby_cached_thread_pool.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ def kill
140140
@mutex.synchronize do
141141
break if @state == :shutdown
142142
@state = :shutdown
143-
@idle.each{|worker| worker.kill }
144-
@busy.each{|worker| worker.kill }
143+
@idle.each{|worker| worker.kill }
144+
@busy.each{|worker| worker.kill }
145145
@terminator.set
146146
end
147147
end
@@ -156,6 +156,8 @@ def length
156156
end
157157
end
158158
alias_method :size, :length
159+
alias_method :current_size, :length
160+
alias_method :current_length, :length
159161

160162
# @!visibility private
161163
def on_worker_exit(worker) # :nodoc:

lib/concurrent/ruby_fixed_thread_pool.rb

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,21 @@
55

66
module Concurrent
77

8+
# @!macro fixed_thread_pool
9+
#
10+
# @note To prevent deadlocks and race conditions, no threads will be allocated
11+
# on construction. Threads will be allocated once the first task is post to
12+
# the pool. Additionally, threads that crash will be removed from the pool and
13+
# replaced. Thus the +#length+ and +#current_length+ may occasionally be
14+
# different.
815
class RubyFixedThreadPool
916

10-
def initialize(num_threads, opts = {})
17+
# Create a new thread pool.
18+
#
19+
# @param [Integer] num_threads the number of threads to allocate
20+
#
21+
# @raise [ArgumentError] if +num_threads+ is less than or equal to zero
22+
def initialize(num_threads)
1123
@num_threads = num_threads.to_i
1224
raise ArgumentError.new('number of threads must be greater than zero') if @num_threads < 1
1325

@@ -18,34 +30,67 @@ def initialize(num_threads, opts = {})
1830
@mutex = Mutex.new
1931
end
2032

33+
# Is the thread pool running?
34+
#
35+
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
2136
def running?
2237
return @state == :running
2338
end
2439

40+
# Is the thread pool shutdown?
41+
#
42+
# @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
2543
def shutdown?
2644
return @state != :running
2745
end
2846

47+
# Block until thread pool shutdown is complete or until +timeout+ seconds have
48+
# passed.
49+
#
50+
# @note Does not initiate shutdown or termination. Either +shutdown+ or +kill+
51+
# must be called before this method (or on another thread).
52+
#
53+
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
54+
#
55+
# @return [Boolean] +true+ if shutdown complete or false on +timeout+
2956
def wait_for_termination(timeout)
3057
return @terminator.wait(timeout.to_i)
3158
end
3259

33-
def post(*args, &block)
34-
raise ArgumentError.new('no block given') if block.nil?
60+
# Submit a task to the thread pool for asynchronous processing.
61+
#
62+
# @param [Array] args zero or more arguments to be passed to the block
63+
#
64+
# @yield the asynchronous task to perform
65+
#
66+
# @return [Boolean] +true+ if the task is queued, +false+ if the thread pool
67+
# is not running
68+
#
69+
# @raise [ArgumentError] if no block is given
70+
def post(*args, &task)
71+
raise ArgumentError.new('no block given') if task.nil?
3572
@mutex.synchronize do
3673
break false unless @state == :running
37-
@queue << [args, block]
74+
@queue << [args, task]
3875
clean_pool
3976
fill_pool
4077
true
4178
end
4279
end
4380

44-
def <<(block)
45-
self.post(&block)
81+
# Submit a task to the thread pool for asynchronous processing.
82+
#
83+
# @param [Proc] task the asynchronous task to perform
84+
#
85+
# @return [self] returns itself
86+
def <<(task)
87+
self.post(&task)
4688
return self
4789
end
4890

91+
# Begin an orderly shutdown. Tasks already in the queue will be executed,
92+
# but no new tasks will be accepted. Has no additional effect if the
93+
# thread pool is not running.
4994
def shutdown
5095
@mutex.synchronize do
5196
break unless @state == :running
@@ -59,6 +104,10 @@ def shutdown
59104
end
60105
end
61106

107+
# Begin an immediate shutdown. In-progress tasks will be allowed to
108+
# complete but enqueued tasks will be dismissed and no new tasks
109+
# will be accepted. Has no additional effect if the thread pool is
110+
# not running.
62111
def kill
63112
@mutex.synchronize do
64113
break if @state == :shutdown
@@ -69,21 +118,30 @@ def kill
69118
end
70119
end
71120

121+
# The number of threads allocated for the pool.
122+
#
123+
# @return [Integer] the number of threads allocated for a running pool,
124+
# zero when the pool is shutdown
72125
def length
73126
@mutex.synchronize do
74127
@state == :running ? @num_threads : 0
75128
end
76129
end
77130
alias_method :size, :length
78131

132+
# The number of threads currently in the pool.
133+
#
134+
# @return [Integer] the number of threads currently operating when the
135+
# pool is running, zero when the pool is shutdown
79136
def current_length
80137
@mutex.synchronize do
81138
@state == :running ? @pool.length : 0
82139
end
83140
end
84141
alias_method :current_size, :current_length
85142

86-
def create_worker_thread
143+
# @!visibility private
144+
def create_worker_thread # :nodoc:
87145
wrkr = Worker.new(@queue, self)
88146
Thread.new(wrkr, self) do |worker, parent|
89147
Thread.current.abort_on_exception = false
@@ -93,34 +151,40 @@ def create_worker_thread
93151
return wrkr
94152
end
95153

96-
def fill_pool
154+
# @!visibility private
155+
def fill_pool # :nodoc:
97156
return unless @state == :running
98157
while @pool.length < @num_threads
99158
@pool << create_worker_thread
100159
end
101160
end
102161

103-
def clean_pool
162+
# @!visibility private
163+
def clean_pool # :nodoc:
104164
@pool.reject! {|worker| worker.dead? }
105165
end
106166

107-
def drain_pool
167+
# @!visibility private
168+
def drain_pool # :nodoc:
108169
@pool.each {|worker| worker.kill }
109170
@pool.clear
110171
end
111172

112-
def on_start_task(worker)
173+
# @!visibility private
174+
def on_start_task(worker) # :nodoc:
113175
end
114176

115-
def on_end_task(worker)
177+
# @!visibility private
178+
def on_end_task(worker) # :nodoc:
116179
@mutex.synchronize do
117180
break unless @state == :running
118181
clean_pool
119182
fill_pool
120183
end
121184
end
122185

123-
def on_worker_exit(worker)
186+
# @!visibility private
187+
def on_worker_exit(worker) # :nodoc:
124188
@mutex.synchronize do
125189
@pool.delete(worker)
126190
if @pool.empty? && @state != :running

0 commit comments

Comments
 (0)