Skip to content

Commit 214ecbf

Browse files
authored
Merge pull request #938 from ruby-concurrency/prune
Expose the prune operation from thread pools
2 parents 1ce854c + 1dc5c1f commit 214ecbf

File tree

5 files changed

+151
-39
lines changed

5 files changed

+151
-39
lines changed

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,16 @@ module Concurrent
7171
# @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting
7272
# new tasks. A value of -1 indicates that the queue may grow without bound.
7373

74-
75-
76-
74+
# @!macro thread_pool_executor_method_prune_pool
75+
# Prune the thread pool of unneeded threads
76+
#
77+
# What is being pruned is controlled by the min_threads and idletime
78+
# parameters passed at pool creation time
79+
#
80+
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
81+
# pool will auto-prune each time a new job is posted. You will need to call
82+
# this method explicitely in case your application post jobs in bursts (a
83+
# lot of jobs and then nothing for long periods)
7784

7885
# @!macro thread_pool_executor_public_api
7986
#
@@ -111,6 +118,9 @@ module Concurrent
111118
#
112119
# @!method can_overflow?
113120
# @!macro executor_service_method_can_overflow_question
121+
#
122+
# @!method prune_pool
123+
# @!macro thread_pool_executor_method_prune_pool
114124

115125

116126

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ def running?
9393
super && !@executor.isTerminating
9494
end
9595

96+
# @!macro thread_pool_executor_method_prune_pool
97+
def prune_pool
98+
end
99+
96100
private
97101

98102
def ns_initialize(opts)

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,8 @@ def remove_busy_worker(worker)
9393
end
9494

9595
# @!visibility private
96-
def ready_worker(worker)
97-
synchronize { ns_ready_worker worker }
98-
end
99-
100-
# @!visibility private
101-
def worker_not_old_enough(worker)
102-
synchronize { ns_worker_not_old_enough worker }
96+
def ready_worker(worker, last_message)
97+
synchronize { ns_ready_worker worker, last_message }
10398
end
10499

105100
# @!visibility private
@@ -112,6 +107,11 @@ def worker_task_completed
112107
synchronize { @completed_task_count += 1 }
113108
end
114109

110+
# @!macro thread_pool_executor_method_prune_pool
111+
def prune_pool
112+
synchronize { ns_prune_pool }
113+
end
114+
115115
private
116116

117117
# @!visibility private
@@ -192,7 +192,7 @@ def ns_kill_execution
192192
# @!visibility private
193193
def ns_assign_worker(*args, &task)
194194
# keep growing if the pool is not at the minimum yet
195-
worker = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
195+
worker, _ = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
196196
if worker
197197
worker << [task, args]
198198
true
@@ -223,7 +223,7 @@ def ns_enqueue(*args, &task)
223223
def ns_worker_died(worker)
224224
ns_remove_busy_worker worker
225225
replacement_worker = ns_add_busy_worker
226-
ns_ready_worker replacement_worker, false if replacement_worker
226+
ns_ready_worker replacement_worker, Concurrent.monotonic_time, false if replacement_worker
227227
end
228228

229229
# creates new worker which has to receive work to do after it's added
@@ -242,29 +242,21 @@ def ns_add_busy_worker
242242
# handle ready worker, giving it new job or assigning back to @ready
243243
#
244244
# @!visibility private
245-
def ns_ready_worker(worker, success = true)
245+
def ns_ready_worker(worker, last_message, success = true)
246246
task_and_args = @queue.shift
247247
if task_and_args
248248
worker << task_and_args
249249
else
250250
# stop workers when !running?, do not return them to @ready
251251
if running?
252-
@ready.push(worker)
252+
raise unless last_message
253+
@ready.push([worker, last_message])
253254
else
254255
worker.stop
255256
end
256257
end
257258
end
258259

259-
# returns back worker to @ready which was not idle for enough time
260-
#
261-
# @!visibility private
262-
def ns_worker_not_old_enough(worker)
263-
# let's put workers coming from idle_test back to the start (as the oldest worker)
264-
@ready.unshift(worker)
265-
true
266-
end
267-
268260
# removes a worker which is not in not tracked in @ready
269261
#
270262
# @!visibility private
@@ -278,10 +270,17 @@ def ns_remove_busy_worker(worker)
278270
#
279271
# @!visibility private
280272
def ns_prune_pool
281-
return if @pool.size <= @min_length
282-
283-
last_used = @ready.shift
284-
last_used << :idle_test if last_used
273+
now = Concurrent.monotonic_time
274+
stopped_workers = 0
275+
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
276+
worker, last_message = @ready.first
277+
if now - last_message > self.idletime
278+
stopped_workers += 1
279+
@ready.shift
280+
worker << :stop
281+
else break
282+
end
283+
end
285284

286285
@next_gc_time = Concurrent.monotonic_time + @gc_interval
287286
end
@@ -330,29 +329,18 @@ def kill
330329

331330
def create_worker(queue, pool, idletime)
332331
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
333-
last_message = Concurrent.monotonic_time
334332
catch(:stop) do
335333
loop do
336334

337335
case message = my_queue.pop
338-
when :idle_test
339-
if (Concurrent.monotonic_time - last_message) > my_idletime
340-
my_pool.remove_busy_worker(self)
341-
throw :stop
342-
else
343-
my_pool.worker_not_old_enough(self)
344-
end
345-
346336
when :stop
347337
my_pool.remove_busy_worker(self)
348338
throw :stop
349339

350340
else
351341
task, args = message
352342
run_task my_pool, task, args
353-
last_message = Concurrent.monotonic_time
354-
355-
my_pool.ready_worker(self)
343+
my_pool.ready_worker(self, Concurrent.monotonic_time)
356344
end
357345
end
358346
end

spec/concurrent/executor/java_thread_pool_executor_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ module Concurrent
2525

2626
it_should_behave_like :thread_pool_executor
2727

28+
context :prune do
29+
it "is a no-op, pruning is handled by the JVM" do
30+
executor = JavaThreadPoolExecutor.new
31+
executor.prune_pool
32+
end
33+
end
34+
2835
context '#overload_policy' do
2936

3037
specify ':abort maps to AbortPolicy' do

spec/concurrent/executor/ruby_thread_pool_executor_spec.rb

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,109 @@ module Concurrent
2323

2424
it_should_behave_like :thread_pool_executor
2525

26+
27+
context :prune do
28+
subject do
29+
RubyThreadPoolExecutor.new(idletime: 5, min_threads: 2, max_threads: 10)
30+
end
31+
32+
Group = Struct.new :waiting_threads, :threads, :mutex, :cond
33+
34+
def prepare_thread_group(size)
35+
cond = ConditionVariable.new
36+
mutex = Mutex.new
37+
threads = []
38+
size.times do
39+
subject.post do
40+
mutex.synchronize do
41+
threads << Thread.current
42+
cond.wait(mutex)
43+
threads.delete(Thread.current)
44+
end
45+
end
46+
end
47+
eventually(mutex: mutex) { expect(threads).to have_attributes(size: size) }
48+
Group.new(threads, threads.dup, mutex, cond)
49+
end
50+
51+
def wakeup_thread_group(group)
52+
group.cond.broadcast
53+
eventually(mutex: group.mutex) do
54+
expect(group.waiting_threads).to have_attributes(size: 0)
55+
end
56+
end
57+
58+
before(:each) do
59+
@now = Concurrent.monotonic_time
60+
allow(Concurrent).to receive(:monotonic_time) { @now }
61+
62+
@group1 = prepare_thread_group(5)
63+
@group2 = prepare_thread_group(5)
64+
end
65+
66+
def eventually(mutex: nil, timeout: 5, &block)
67+
start = Time.now
68+
while Time.now - start < timeout
69+
begin
70+
if mutex
71+
mutex.synchronize do
72+
return yield
73+
end
74+
else
75+
return yield
76+
end
77+
rescue Exception => last_failure
78+
end
79+
Thread.pass
80+
end
81+
raise last_failure
82+
end
83+
84+
it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do
85+
wakeup_thread_group(@group1)
86+
@now += 6
87+
wakeup_thread_group(@group2)
88+
subject.post { }
89+
90+
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
91+
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
92+
end
93+
94+
it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do
95+
wakeup_thread_group(@group1)
96+
@now += 3
97+
subject.prune_pool
98+
@now += 3
99+
wakeup_thread_group(@group2)
100+
subject.post { }
101+
102+
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
103+
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
104+
end
105+
106+
it "reclaims threads that have been idle for more than idletime seconds" do
107+
wakeup_thread_group(@group1)
108+
@now += 6
109+
wakeup_thread_group(@group2)
110+
subject.prune_pool
111+
112+
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
113+
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
114+
end
115+
116+
it "keeps at least min_length workers" do
117+
wakeup_thread_group(@group1)
118+
wakeup_thread_group(@group2)
119+
@now += 12
120+
subject.prune_pool
121+
all_threads = @group1.threads + @group2.threads
122+
eventually do
123+
finished_threads = all_threads.find_all { |t| !t.status }
124+
expect(finished_threads).to have_attributes(size: 8)
125+
end
126+
end
127+
end
128+
26129
context '#remaining_capacity' do
27130

28131
let!(:expected_max){ 100 }

0 commit comments

Comments
 (0)