Skip to content

Commit 7e83fbf

Browse files
committed
make the pruning of thread pools a public API
This makes the pruning of thread pools a public API, which may or may not be a no-op, depending on the underlying thread pool implementation. It also reworks the Ruby implementation to be more aggressive in the pruning - it will kill all to-be-pruned threads in one call instead of doing them one by one. Moreover, workers do not get woken up to check for their idleness anymore, the thread of the caller does it instead.
1 parent 322c6bb commit 7e83fbf

File tree

5 files changed

+151
-39
lines changed

5 files changed

+151
-39
lines changed

lib/concurrent/executor/fixed_thread_pool.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,16 @@ module Concurrent
6464
# @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting
6565
# new tasks. A value of -1 indicates that the queue may grow without bound.
6666

67-
68-
69-
67+
# @!macro thread_pool_executor_method_prune_pool
68+
# Prune the thread pool of unneeded threads
69+
#
70+
# What is being pruned is controlled by the min_threads and idletime
71+
# parameters passed at pool creation time
72+
#
73+
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
74+
# pool will auto-prune each time a new job is posted. You will need to call
75+
# this method explicitely in case your application post jobs in bursts (a
76+
# lot of jobs and then nothing for long periods)
7077

7178
# @!macro thread_pool_executor_public_api
7279
#
@@ -104,6 +111,9 @@ module Concurrent
104111
#
105112
# @!method can_overflow?
106113
# @!macro executor_service_method_can_overflow_question
114+
#
115+
# @!method prune_pool
116+
# @!macro thread_pool_executor_method_prune_pool
107117

108118

109119

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ def running?
8787
super && !@executor.isTerminating
8888
end
8989

90+
# @!macro thread_pool_executor_method_prune_pool
91+
def prune_pool
92+
end
93+
9094
private
9195

9296
def ns_initialize(opts)

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,8 @@ def remove_busy_worker(worker)
8787
end
8888

8989
# @!visibility private
90-
def ready_worker(worker)
91-
synchronize { ns_ready_worker worker }
92-
end
93-
94-
# @!visibility private
95-
def worker_not_old_enough(worker)
96-
synchronize { ns_worker_not_old_enough worker }
90+
def ready_worker(worker, last_message)
91+
synchronize { ns_ready_worker worker, last_message }
9792
end
9893

9994
# @!visibility private
@@ -106,6 +101,11 @@ def worker_task_completed
106101
synchronize { @completed_task_count += 1 }
107102
end
108103

104+
# @!macro thread_pool_executor_method_prune_pool
105+
def prune_pool
106+
synchronize { ns_prune_pool }
107+
end
108+
109109
private
110110

111111
# @!visibility private
@@ -185,7 +185,7 @@ def ns_kill_execution
185185
# @!visibility private
186186
def ns_assign_worker(*args, &task)
187187
# keep growing if the pool is not at the minimum yet
188-
worker = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
188+
worker, _ = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
189189
if worker
190190
worker << [task, args]
191191
true
@@ -214,7 +214,7 @@ def ns_enqueue(*args, &task)
214214
def ns_worker_died(worker)
215215
ns_remove_busy_worker worker
216216
replacement_worker = ns_add_busy_worker
217-
ns_ready_worker replacement_worker, false if replacement_worker
217+
ns_ready_worker replacement_worker, Concurrent.monotonic_time, false if replacement_worker
218218
end
219219

220220
# creates new worker which has to receive work to do after it's added
@@ -232,29 +232,21 @@ def ns_add_busy_worker
232232
# handle ready worker, giving it new job or assigning back to @ready
233233
#
234234
# @!visibility private
235-
def ns_ready_worker(worker, success = true)
235+
def ns_ready_worker(worker, last_message, success = true)
236236
task_and_args = @queue.shift
237237
if task_and_args
238238
worker << task_and_args
239239
else
240240
# stop workers when !running?, do not return them to @ready
241241
if running?
242-
@ready.push(worker)
242+
raise unless last_message
243+
@ready.push([worker, last_message])
243244
else
244245
worker.stop
245246
end
246247
end
247248
end
248249

249-
# returns back worker to @ready which was not idle for enough time
250-
#
251-
# @!visibility private
252-
def ns_worker_not_old_enough(worker)
253-
# let's put workers coming from idle_test back to the start (as the oldest worker)
254-
@ready.unshift(worker)
255-
true
256-
end
257-
258250
# removes a worker which is not in not tracked in @ready
259251
#
260252
# @!visibility private
@@ -268,10 +260,17 @@ def ns_remove_busy_worker(worker)
268260
#
269261
# @!visibility private
270262
def ns_prune_pool
271-
return if @pool.size <= @min_length
272-
273-
last_used = @ready.shift
274-
last_used << :idle_test if last_used
263+
now = Concurrent.monotonic_time
264+
stopped_workers = 0
265+
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
266+
worker, last_message = @ready.first
267+
if now - last_message > self.idletime
268+
stopped_workers += 1
269+
@ready.shift
270+
worker << :stop
271+
else break
272+
end
273+
end
275274

276275
@next_gc_time = Concurrent.monotonic_time + @gc_interval
277276
end
@@ -315,29 +314,18 @@ def kill
315314

316315
def create_worker(queue, pool, idletime)
317316
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
318-
last_message = Concurrent.monotonic_time
319317
catch(:stop) do
320318
loop do
321319

322320
case message = my_queue.pop
323-
when :idle_test
324-
if (Concurrent.monotonic_time - last_message) > my_idletime
325-
my_pool.remove_busy_worker(self)
326-
throw :stop
327-
else
328-
my_pool.worker_not_old_enough(self)
329-
end
330-
331321
when :stop
332322
my_pool.remove_busy_worker(self)
333323
throw :stop
334324

335325
else
336326
task, args = message
337327
run_task my_pool, task, args
338-
last_message = Concurrent.monotonic_time
339-
340-
my_pool.ready_worker(self)
328+
my_pool.ready_worker(self, Concurrent.monotonic_time)
341329
end
342330
end
343331
end

spec/concurrent/executor/java_thread_pool_executor_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ module Concurrent
2525

2626
it_should_behave_like :thread_pool_executor
2727

28+
context :prune do
29+
it "is a no-op, pruning is handled by the JVM" do
30+
executor = JavaThreadPoolExecutor.new
31+
executor.prune_pool
32+
end
33+
end
34+
2835
context '#overload_policy' do
2936

3037
specify ':abort maps to AbortPolicy' do

spec/concurrent/executor/ruby_thread_pool_executor_spec.rb

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,109 @@ module Concurrent
2323

2424
it_should_behave_like :thread_pool_executor
2525

26+
27+
context :prune do
28+
subject do
29+
RubyThreadPoolExecutor.new(idletime: 5, min_threads: 2, max_threads: 10)
30+
end
31+
32+
Group = Struct.new :waiting_threads, :threads, :mutex, :cond
33+
34+
def prepare_thread_group(size)
35+
cond = ConditionVariable.new
36+
mutex = Mutex.new
37+
threads = []
38+
size.times do
39+
subject.post do
40+
mutex.synchronize do
41+
threads << Thread.current
42+
cond.wait(mutex)
43+
threads.delete(Thread.current)
44+
end
45+
end
46+
end
47+
eventually(mutex: mutex) { expect(threads).to have_attributes(size: size) }
48+
Group.new(threads, threads.dup, mutex, cond)
49+
end
50+
51+
def wakeup_thread_group(group)
52+
group.cond.broadcast
53+
eventually(mutex: group.mutex) do
54+
expect(group.waiting_threads).to have_attributes(size: 0)
55+
end
56+
end
57+
58+
before(:each) do
59+
@now = Concurrent.monotonic_time
60+
allow(Concurrent).to receive(:monotonic_time) { @now }
61+
62+
@group1 = prepare_thread_group(5)
63+
@group2 = prepare_thread_group(5)
64+
end
65+
66+
def eventually(mutex: nil, timeout: 5, &block)
67+
start = Time.now
68+
while Time.now - start < timeout
69+
begin
70+
if mutex
71+
mutex.synchronize do
72+
return yield
73+
end
74+
else
75+
return yield
76+
end
77+
rescue Exception => last_failure
78+
end
79+
Thread.pass
80+
end
81+
raise last_failure
82+
end
83+
84+
it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do
85+
wakeup_thread_group(@group1)
86+
@now += 6
87+
wakeup_thread_group(@group2)
88+
subject.post { }
89+
90+
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
91+
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
92+
end
93+
94+
it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do
95+
wakeup_thread_group(@group1)
96+
@now += 3
97+
subject.prune_pool
98+
@now += 3
99+
wakeup_thread_group(@group2)
100+
subject.post { }
101+
102+
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
103+
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
104+
end
105+
106+
it "reclaims threads that have been idle for more than idletime seconds" do
107+
wakeup_thread_group(@group1)
108+
@now += 6
109+
wakeup_thread_group(@group2)
110+
subject.prune_pool
111+
112+
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
113+
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
114+
end
115+
116+
it "keeps at least min_length workers" do
117+
wakeup_thread_group(@group1)
118+
wakeup_thread_group(@group2)
119+
@now += 12
120+
subject.prune_pool
121+
all_threads = @group1.threads + @group2.threads
122+
eventually do
123+
finished_threads = all_threads.find_all { |t| !t.status }
124+
expect(finished_threads).to have_attributes(size: 8)
125+
end
126+
end
127+
end
128+
26129
context '#remaining_capacity' do
27130

28131
let!(:expected_max){ 100 }

0 commit comments

Comments
 (0)