Skip to content

Commit 4da0cce

Browse files
committed
RubyThreadPoolExecutor has better algorithm for creating new threads.
1 parent 96fae99 commit 4da0cce

File tree

1 file changed

+52
-31
lines changed

1 file changed

+52
-31
lines changed

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,13 @@ def on_worker_exit(worker)
157157

158158
# @!visibility private
159159
def execute(*args, &task)
160-
return handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
161-
@scheduled_task_count += 1
162-
@queue << [args, task]
163-
if Time.now.to_f - @gc_interval >= @last_gc_time
164-
prune_pool
165-
@last_gc_time = Time.now.to_f
160+
prune_pool
161+
if ensure_capacity?
162+
@scheduled_task_count += 1
163+
@queue << [args, task]
164+
else
165+
handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
166166
end
167-
grow_pool
168167
end
169168

170169
# @!visibility private
@@ -183,6 +182,42 @@ def kill_execution
183182
drain_pool
184183
end
185184

185+
# Check the thread pool configuration and determine if the pool
186+
# has enought capacity to handle the request. Will grow the size
187+
# of the pool if necessary.
188+
#
189+
# @return [Boolean] true if the pool has enough capacity else false
190+
#
191+
# @!visibility private
192+
def ensure_capacity?
193+
additional = 0
194+
capacity = true
195+
196+
if @pool.size < @min_length
197+
additional = @min_length - @pool.size
198+
elsif @queue.empty? && @queue.num_waiting >= 1
199+
additional = 0
200+
elsif @pool.size == 0 && @min_length == 0
201+
additional = 1
202+
elsif @pool.size < @max_length || @max_length == 0
203+
additional = 1
204+
elsif @max_queue == 0 || @queue.size < @max_queue
205+
additional = 0
206+
else
207+
capacity = false
208+
end
209+
210+
additional.times do
211+
@pool << create_worker_thread
212+
end
213+
214+
if additional > 0
215+
@largest_length = [@largest_length, @pool.length].max
216+
end
217+
218+
capacity
219+
end
220+
186221
# Handler which executes the `overflow_policy` once the queue size
187222
# reaches `max_queue`.
188223
#
@@ -205,34 +240,20 @@ def handle_overflow(*args)
205240
end
206241
end
207242

208-
# Scan all threads in the pool and reclaim any that are dead or have been idle
209-
# too long.
243+
# Scan all threads in the pool and reclaim any that are dead or
244+
# have been idle too long. Will check the last time the pool was
245+
# pruned and only run if the configured garbage collection
246+
# interval has passed.
210247
#
211248
# @!visibility private
212249
def prune_pool
213-
@pool.delete_if do |worker|
214-
worker.dead? ||
215-
(@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity)
216-
end
217-
end
218-
219-
# Increase the size of the pool when necessary.
220-
#
221-
# @!visibility private
222-
def grow_pool
223-
if @min_length > @pool.length
224-
additional = @min_length - @pool.length
225-
elsif @pool.length < @max_length && ! @queue.empty?
226-
# NOTE: does not take into account idle threads
227-
additional = 1
228-
else
229-
additional = 0
230-
end
231-
additional.times do
232-
break if @pool.length >= @max_length
233-
@pool << create_worker_thread
250+
if Time.now.to_f - @gc_interval >= @last_gc_time
251+
@pool.delete_if do |worker|
252+
worker.dead? ||
253+
(@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity)
254+
end
255+
@last_gc_time = Time.now.to_f
234256
end
235-
@largest_length = [@largest_length, @pool.length].max
236257
end
237258

238259
# Reclaim all threads in the pool.

0 commit comments

Comments
 (0)