Skip to content

Commit 6f8ea67

Browse files
committed
wip: RubyThreadPoolExecutor now uses post/execute from Exector.
1 parent 87fb2a3 commit 6f8ea67

File tree

1 file changed

+19
-33
lines changed

1 file changed

+19
-33
lines changed

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -146,32 +146,6 @@ def wait_for_termination(timeout)
146146
return @stopped_event.wait(timeout.to_i)
147147
end
148148

149-
# Submit a task to the thread pool for asynchronous processing.
150-
#
151-
# @param [Array] args zero or more arguments to be passed to the task
152-
#
153-
# @yield the asynchronous task to perform
154-
#
155-
# @return [Boolean] `true` if the task is queued, `false` if the thread pool
156-
# is not running
157-
#
158-
# @raise [ArgumentError] if no task is given
159-
def post(*args, &task)
160-
raise ArgumentError.new('no block given') unless block_given?
161-
mutex.synchronize do
162-
break false unless running?
163-
return handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
164-
@scheduled_task_count += 1
165-
@queue << [args, task]
166-
if Time.now.to_f - @gc_interval >= @last_gc_time
167-
prune_pool
168-
@last_gc_time = Time.now.to_f
169-
end
170-
grow_pool
171-
true
172-
end
173-
end
174-
175149
# Begin an orderly shutdown. Tasks already in the queue will be executed,
176150
# but no new tasks will be accepted. Has no additional effect if the
177151
# thread pool is not running.
@@ -205,7 +179,7 @@ def kill
205179
# Run on task completion.
206180
#
207181
# @!visibility private
208-
def on_end_task # :nodoc:
182+
def on_end_task
209183
mutex.synchronize do
210184
@completed_task_count += 1 #if success
211185
break unless running?
@@ -215,7 +189,7 @@ def on_end_task # :nodoc:
215189
# Run when a thread worker exits.
216190
#
217191
# @!visibility private
218-
def on_worker_exit(worker) # :nodoc:
192+
def on_worker_exit(worker)
219193
mutex.synchronize do
220194
@pool.delete(worker)
221195
if @pool.empty? && ! running?
@@ -227,13 +201,25 @@ def on_worker_exit(worker) # :nodoc:
227201

228202
protected
229203

204+
# @!visibility private
205+
def execute(*args, &task)
206+
return handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
207+
@scheduled_task_count += 1
208+
@queue << [args, task]
209+
if Time.now.to_f - @gc_interval >= @last_gc_time
210+
prune_pool
211+
@last_gc_time = Time.now.to_f
212+
end
213+
grow_pool
214+
end
215+
230216
# Handler which executes the `overflow_policy` once the queue size
231217
# reaches `max_queue`.
232218
#
233219
# @param [Array] args the arguments to the task which is being handled.
234220
#
235221
# @!visibility private
236-
def handle_overflow(*args) # :nodoc:
222+
def handle_overflow(*args)
237223
case @overflow_policy
238224
when :abort
239225
raise RejectedExecutionError
@@ -253,7 +239,7 @@ def handle_overflow(*args) # :nodoc:
253239
# too long.
254240
#
255241
# @!visibility private
256-
def prune_pool # :nodoc:
242+
def prune_pool
257243
@pool.delete_if do |worker|
258244
worker.dead? ||
259245
(@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity)
@@ -263,7 +249,7 @@ def prune_pool # :nodoc:
263249
# Increase the size of the pool when necessary.
264250
#
265251
# @!visibility private
266-
def grow_pool # :nodoc:
252+
def grow_pool
267253
if @min_length > @pool.length
268254
additional = @min_length - @pool.length
269255
elsif @pool.length < @max_length && ! @queue.empty?
@@ -282,7 +268,7 @@ def grow_pool # :nodoc:
282268
# Reclaim all threads in the pool.
283269
#
284270
# @!visibility private
285-
def drain_pool # :nodoc:
271+
def drain_pool
286272
@pool.each {|worker| worker.kill }
287273
@pool.clear
288274
end
@@ -292,7 +278,7 @@ def drain_pool # :nodoc:
292278
# @return [Thread] the new thread.
293279
#
294280
# @!visibility private
295-
def create_worker_thread # :nodoc:
281+
def create_worker_thread
296282
wrkr = RubyThreadPoolWorker.new(@queue, self)
297283
Thread.new(wrkr, self) do |worker, parent|
298284
Thread.current.abort_on_exception = false

0 commit comments

Comments
 (0)