Skip to content

Commit 94cefa8

Browse files
committed
Refactored thread pools.
1 parent 41a9fc7 commit 94cefa8

12 files changed

+213
-208
lines changed

lib/concurrent/java_cached_thread_pool.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1616
# number of seconds a thread may be idle before it is reclaimed
1717
#
1818
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
19-
# @raise [ArgumentError] if +thread_idletime+ is less than or equal to zero
19+
# @raise [ArgumentError] if +idletime+ is less than or equal to zero
2020
#
2121
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
2222
def initialize(opts = {})

lib/concurrent/ruby_cached_thread_pool.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
1414
# number of seconds a thread may be idle before it is reclaimed
1515
#
1616
# @raise [ArgumentError] if +max_threads+ is less than or equal to zero
17-
# @raise [ArgumentError] if +thread_idletime+ is less than or equal to zero
17+
# @raise [ArgumentError] if +idletime+ is less than or equal to zero
1818
def initialize(opts = {})
1919
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
2020
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i

lib/concurrent/ruby_thread_pool_executor.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
require 'thread'
22

33
require 'concurrent/event'
4-
require 'concurrent/ruby_thread_pool_executor/worker'
4+
require 'concurrent/ruby_thread_pool_worker'
55

66
module Concurrent
77

@@ -60,8 +60,8 @@ def initialize(opts = {})
6060
@largest_length = 0
6161

6262
#@busy = 0
63-
@prune_interval = 1
64-
@last_prune_time = Time.now.to_i - (@prune_interval * 2)
63+
@gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
64+
@last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
6565
end
6666

6767
def length
@@ -120,11 +120,11 @@ def post(*args, &task)
120120
break false unless @state == :running
121121
@scheduled_task_count += 1
122122
@queue << [args, task]
123-
#if Time.now.to_i - @prune_interval > @last_prune_time
123+
if Time.now.to_f - @gc_interval >= @last_gc_time
124124
prune_pool
125125
grow_pool
126-
#@last_prune_time = Time.now.to_i
127-
#end
126+
@last_gc_time = Time.now.to_f
127+
end
128128
true
129129
end
130130
end
@@ -202,7 +202,7 @@ def on_worker_exit(worker) # :nodoc:
202202
def prune_pool # :nodoc:
203203
@pool.delete_if do |worker|
204204
worker.dead? ||
205-
(@idletime == 0 ? false : Time.now.to_i - @idletime > worker.last_activity)
205+
(@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity)
206206
end
207207
end
208208

@@ -228,7 +228,7 @@ def drain_pool # :nodoc:
228228

229229
# @!visibility private
230230
def create_worker_thread # :nodoc:
231-
wrkr = Worker.new(@queue, self)
231+
wrkr = RubyThreadPoolWorker.new(@queue, self)
232232
Thread.new(wrkr, self) do |worker, parent|
233233
Thread.current.abort_on_exception = false
234234
worker.run

lib/concurrent/ruby_thread_pool_executor/worker.rb

Lines changed: 0 additions & 60 deletions
This file was deleted.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
require 'thread'
2+
3+
module Concurrent
4+
5+
# @!visibility private
6+
class RubyThreadPoolWorker # :nodoc:
7+
8+
# @!visibility private
9+
def initialize(queue, parent) # :nodoc:
10+
@queue = queue
11+
@parent = parent
12+
@mutex = Mutex.new
13+
@last_activity = Time.now.to_f
14+
end
15+
16+
# @!visibility private
17+
def dead? # :nodoc:
18+
return @mutex.synchronize do
19+
@thread.nil? ? false : ! @thread.alive?
20+
end
21+
end
22+
23+
# @!visibility private
24+
def last_activity # :nodoc:
25+
@mutex.synchronize { @last_activity }
26+
end
27+
28+
# @!visibility private
29+
def kill # :nodoc:
30+
@mutex.synchronize do
31+
Thread.kill(@thread) unless @thread.nil?
32+
@thread = nil
33+
end
34+
end
35+
36+
# @!visibility private
37+
def run(thread = Thread.current) # :nodoc:
38+
@mutex.synchronize do
39+
raise StandardError.new('already running') unless @thread.nil?
40+
@thread = thread
41+
end
42+
43+
loop do
44+
task = @queue.pop
45+
if task == :stop
46+
@thread = nil
47+
@parent.on_worker_exit(self)
48+
break
49+
end
50+
51+
begin
52+
#@parent.on_start_task
53+
task.last.call(*task.first)
54+
rescue => ex
55+
# let it fail
56+
ensure
57+
@last_activity = Time.now.to_f
58+
@parent.on_end_task
59+
end
60+
end
61+
end
62+
end
63+
end

spec/concurrent/cached_thread_pool_shared.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,15 @@
106106

107107
context 'garbage collection' do
108108

109-
subject{ described_class.new(idletime: 1, max_threads: 5) }
109+
subject{ described_class.new(idletime: 1, max_threads: 5, gc_interval: 0) }
110110

111111
it 'removes from pool any thread that has been idle too long' do
112112
3.times { subject << proc{ sleep(0.1) } }
113113
sleep(0.1)
114114
subject.length.should eq 3
115115
sleep(2)
116116
subject << proc{ nil }
117-
#sleep(0.1)
117+
sleep(0.1)
118118
subject.length.should < 3
119119
end
120120

@@ -124,13 +124,15 @@
124124
subject.length.should eq 3
125125
sleep(2)
126126
subject << proc{ nil }
127-
#sleep(0.1)
127+
sleep(0.1)
128128
subject.length.should < 3
129129
end
130130
end
131131

132132
context 'worker creation and caching' do
133133

134+
subject{ described_class.new(idletime: 1, max_threads: 5, gc_interval: 0) }
135+
134136
it 'creates new workers when there are none available' do
135137
subject.length.should eq 0
136138
5.times{ sleep(0.1); subject << proc{ sleep(1) } }
Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
require 'spec_helper'
2-
require_relative 'cached_thread_pool_shared'
3-
4-
module Concurrent
5-
6-
describe CachedThreadPool do
7-
8-
after(:each) do
9-
subject.kill
10-
sleep(0.1)
11-
end
12-
13-
subject { described_class.new(5) }
14-
15-
it_should_behave_like :cached_thread_pool
16-
end
17-
end
1+
#require 'spec_helper'
2+
#require_relative 'cached_thread_pool_shared'
3+
#
4+
#module Concurrent
5+
#
6+
# describe CachedThreadPool do
7+
#
8+
# after(:each) do
9+
# subject.kill
10+
# sleep(0.1)
11+
# end
12+
#
13+
# subject { described_class.new(5) }
14+
#
15+
# it_should_behave_like :cached_thread_pool
16+
# end
17+
#end
Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
require 'spec_helper'
2-
require_relative 'fixed_thread_pool_shared'
3-
4-
module Concurrent
5-
6-
describe FixedThreadPool do
7-
8-
after(:each) do
9-
subject.kill
10-
sleep(0.1)
11-
end
12-
13-
subject { described_class.new(5) }
14-
15-
it_should_behave_like :fixed_thread_pool
16-
end
17-
end
1+
#require 'spec_helper'
2+
#require_relative 'fixed_thread_pool_shared'
3+
#
4+
#module Concurrent
5+
#
6+
# describe FixedThreadPool do
7+
#
8+
# after(:each) do
9+
# subject.kill
10+
# sleep(0.1)
11+
# end
12+
#
13+
# subject { described_class.new(5) }
14+
#
15+
# it_should_behave_like :fixed_thread_pool
16+
# end
17+
#end

0 commit comments

Comments
 (0)