Skip to content

Commit 02c28ba

Browse files
committed
Ruby thread pools now extend RubyThreadPoolExecutor.
1 parent 66f0755 commit 02c28ba

File tree

4 files changed

+24
-537
lines changed

4 files changed

+24
-537
lines changed
Lines changed: 15 additions & 207 deletions
Original file line numberDiff line numberDiff line change
@@ -1,225 +1,33 @@
1-
require 'thread'
2-
3-
require 'concurrent/event'
4-
require 'concurrent/ruby_cached_thread_pool/worker'
1+
require 'concurrent/ruby_thread_pool_executor'
52

63
module Concurrent
74

85
# @!macro cached_thread_pool
9-
class RubyCachedThreadPool
10-
11-
# The maximum number of threads that may be created in the pool
12-
# (unless overridden during construction).
13-
DEFAULT_MAX_POOL_SIZE = 2**15 # 32768
14-
15-
# The maximum number of seconds a thread in the pool may remain idle before
16-
# being reclaimed (unless overridden during construction).
17-
DEFAULT_THREAD_IDLETIME = 60
18-
19-
# The maximum number of threads that may be created in the pool.
20-
attr_reader :max_length
21-
22-
# The minimum number of threads that may be created in the pool.
23-
attr_reader :min_length
24-
25-
attr_reader :largest_length
26-
27-
attr_reader :scheduled_task_count
28-
attr_reader :completed_task_count
29-
30-
attr_reader :idletime
6+
class RubyCachedThreadPool < RubyThreadPoolExecutor
317

328
# Create a new thread pool.
339
#
3410
# @param [Hash] opts the options defining pool behavior.
3511
# @option opts [Integer] :max_threads (+DEFAULT_MAX_POOL_SIZE+) maximum number
3612
# of threads which may be created in the pool
37-
# @option opts [Integer] :thread_idletime (+DEFAULT_THREAD_IDLETIME+) maximum
13+
# @option opts [Integer] :idletime (+DEFAULT_THREAD_IDLETIME+) maximum
3814
# number of seconds a thread may be idle before it is reclaimed
3915
#
4016
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
4117
# @raise [ArgumentError] if +thread_idletime+ is less than or equal to zero
4218
def initialize(opts = {})
43-
@idletime = (opts[:thread_idletime] || opts[:idletime] || DEFAULT_THREAD_IDLETIME).to_i
44-
raise ArgumentError.new('idletime must be greater than zero') if @idletime <= 0
45-
46-
@max_length = opts[:max_threads] || opts[:max] || DEFAULT_MAX_POOL_SIZE
47-
raise ArgumentError.new('maximum_number of threads must be greater than zero') if @max_length <= 0
48-
49-
@state = :running
50-
@pool = []
51-
@terminator = Event.new
52-
@mutex = Mutex.new
53-
54-
@busy = []
55-
@idle = []
56-
@scheduled_task_count = 0
57-
@completed_task_count = 0
58-
@min_length = 0
59-
@largest_length = 0
60-
end
61-
62-
# Is the thread pool running?
63-
#
64-
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
65-
def running?
66-
@mutex.synchronize { @state == :running }
67-
end
68-
69-
# Is the thread pool shutdown?
70-
#
71-
# @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
72-
def shutdown?
73-
@mutex.synchronize { @state != :running }
74-
end
75-
76-
# Block until thread pool shutdown is complete or until +timeout+ seconds have
77-
# passed.
78-
#
79-
# @note Does not initiate shutdown or termination. Either +shutdown+ or +kill+
80-
# must be called before this method (or on another thread).
81-
#
82-
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
83-
#
84-
# @return [Boolean] +true+ if shutdown complete or false on +timeout+
85-
def wait_for_termination(timeout)
86-
return @terminator.wait(timeout.to_i)
87-
end
88-
89-
# Submit a task to the thread pool for asynchronous processing.
90-
#
91-
# @param [Array] args zero or more arguments to be passed to the block
92-
#
93-
# @yield the asynchronous task to perform
94-
#
95-
# @return [Boolean] +true+ if the task is queued, +false+ if the thread pool
96-
# is not running
97-
#
98-
# @raise [ArgumentError] if no block is given
99-
def post(*args, &task)
100-
raise ArgumentError.new('no block given') unless block_given?
101-
@mutex.synchronize do
102-
break false unless @state == :running
103-
@scheduled_task_count += 1
104-
105-
if @idle.empty?
106-
if @idle.length + @busy.length < @max_length
107-
worker = create_worker_thread
108-
else
109-
worker = @busy.pop
110-
end
111-
else
112-
worker = @idle.pop
113-
end
114-
115-
@busy.push(worker)
116-
@largest_length = [@idle.length + @busy.length, @largest_length].max
117-
worker.signal(*args, &task)
118-
119-
prune_stale_workers
120-
true
121-
end
122-
end
123-
124-
# Submit a task to the thread pool for asynchronous processing.
125-
#
126-
# @param [Proc] task the asynchronous task to perform
127-
#
128-
# @return [self] returns itself
129-
def <<(task)
130-
self.post(&task)
131-
return self
132-
end
133-
134-
# Begin an orderly shutdown. Tasks already in the queue will be executed,
135-
# but no new tasks will be accepted. Has no additional effect if the
136-
# thread pool is not running.
137-
def shutdown
138-
@mutex.synchronize do
139-
break unless @state == :running
140-
if @idle.empty? && @busy.empty?
141-
@state = :shutdown
142-
@terminator.set
143-
else
144-
@state = :shuttingdown
145-
@idle.each{|worker| worker.stop }
146-
@busy.each{|worker| worker.stop }
147-
end
148-
end
149-
end
150-
151-
# Begin an immediate shutdown. In-progress tasks will be allowed to
152-
# complete but enqueued tasks will be dismissed and no new tasks
153-
# will be accepted. Has no additional effect if the thread pool is
154-
# not running.
155-
def kill
156-
@mutex.synchronize do
157-
break if @state == :shutdown
158-
@state = :shutdown
159-
@idle.each{|worker| worker.kill }
160-
@busy.each{|worker| worker.kill }
161-
@terminator.set
162-
end
163-
end
164-
165-
# The number of threads currently in the pool.
166-
#
167-
# @return [Integer] the number of threads in a running pool,
168-
# zero when the pool is shutdown
169-
def length
170-
@mutex.synchronize do
171-
@state != :shutdown ? @busy.length + @idle.length : 0
172-
end
173-
end
174-
alias_method :current_length, :length
175-
176-
# @!visibility private
177-
def on_worker_exit(worker) # :nodoc:
178-
@mutex.synchronize do
179-
@idle.delete(worker)
180-
@busy.delete(worker)
181-
if @idle.empty? && @busy.empty? && @state != :running
182-
@state = :shutdown
183-
@terminator.set
184-
end
185-
end
186-
end
187-
188-
# @!visibility private
189-
def on_end_task(worker, success) # :nodoc:
190-
@mutex.synchronize do
191-
@completed_task_count += 1 #if success
192-
break unless @state == :running
193-
unless worker.tasks_remaining?
194-
@busy.delete(worker)
195-
@idle.push(worker)
196-
end
197-
end
198-
end
199-
200-
protected
201-
202-
# @!visibility private
203-
def create_worker_thread # :nodoc:
204-
wrkr = Worker.new(self)
205-
Thread.new(wrkr, self) do |worker, parent|
206-
Thread.current.abort_on_exception = false
207-
worker.run
208-
parent.on_worker_exit(worker)
209-
end
210-
return wrkr
211-
end
212-
213-
# @!visibility private
214-
def prune_stale_workers # :nodoc:
215-
@idle.reject! do |worker|
216-
if worker.idletime > @idletime
217-
worker.stop
218-
true
219-
else
220-
worker.dead?
221-
end
222-
end
19+
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
20+
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
21+
22+
raise ArgumentError.new('idletime must be greater than zero') if idletime <= 0
23+
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
24+
25+
opts = opts.merge(
26+
min_threads: 0,
27+
max_threads: max_length,
28+
idletime: idletime
29+
)
30+
super(opts)
22331
end
22432
end
22533
end

lib/concurrent/ruby_cached_thread_pool/worker.rb

Lines changed: 0 additions & 91 deletions
This file was deleted.

0 commit comments

Comments
 (0)