Skip to content

Commit 66f0755

Browse files
committed
Implemented RubyThreadPoolExecutor.
1 parent 76d6914 commit 66f0755

File tree

6 files changed

+209
-38
lines changed

6 files changed

+209
-38
lines changed

lib/concurrent/java_thread_pool_executor.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,6 @@ def shutdown?
108108
@executor.isShutdown
109109
end
110110

111-
# Were all tasks completed before shutdown?
112-
#
113-
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
114-
def terminated?
115-
@executor.isTerminated
116-
end
117-
118111
# Block until thread pool shutdown is complete or until +timeout+ seconds have
119112
# passed.
120113
#
@@ -175,6 +168,15 @@ def kill
175168
@executor.shutdownNow
176169
return nil
177170
end
171+
172+
protected
173+
174+
# Were all tasks completed before shutdown?
175+
#
176+
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
177+
def terminated?
178+
@executor.isTerminated
179+
end
178180
end
179181
end
180182
end

lib/concurrent/ruby_cached_thread_pool.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ def initialize(opts = {})
6363
#
6464
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
6565
def running?
66-
return @state == :running
66+
@mutex.synchronize { @state == :running }
6767
end
6868

6969
# Is the thread pool shutdown?
7070
#
7171
# @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
7272
def shutdown?
73-
return @state != :running
73+
@mutex.synchronize { @state != :running }
7474
end
7575

7676
# Block until thread pool shutdown is complete or until +timeout+ seconds have

lib/concurrent/ruby_fixed_thread_pool.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ def initialize(num_threads)
4747
#
4848
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
4949
def running?
50-
return @state == :running
50+
@mutex.synchronize { @state == :running }
5151
end
5252

5353
# Is the thread pool shutdown?
5454
#
5555
# @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
5656
def shutdown?
57-
return @state != :running
57+
@mutex.synchronize { @state != :running }
5858
end
5959

6060
# Block until thread pool shutdown is complete or until +timeout+ seconds have

lib/concurrent/ruby_thread_pool_executor.rb

Lines changed: 134 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
require 'thread'
2+
3+
require 'concurrent/event'
4+
require 'concurrent/ruby_thread_pool_executor/worker'
5+
16
module Concurrent
27

38
# @!macro thread_pool_executor
@@ -21,45 +26,51 @@ class RubyThreadPoolExecutor
2126

2227
# The maximum number of threads that may be created in the pool.
2328
attr_reader :max_length
29+
attr_reader :min_length
30+
31+
attr_reader :largest_length
32+
33+
attr_reader :scheduled_task_count
34+
attr_reader :completed_task_count
35+
36+
attr_reader :idletime
2437

2538
attr_reader :max_queue
2639

2740
# Create a new thread pool.
2841
#
2942
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
3043
def initialize(opts = {})
31-
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
44+
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
3245
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
33-
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
46+
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
3447
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
35-
overflow_policy = opts.fetch(:overflow_policy, :abort)
48+
@overflow_policy = opts.fetch(:overflow_policy, :abort)
3649

3750
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
38-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
39-
end
51+
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
4052

41-
def min_length
42-
end
53+
@state = :running
54+
@pool = []
55+
@terminator = Event.new
56+
@queue = Queue.new
57+
@mutex = Mutex.new
58+
@scheduled_task_count = 0
59+
@completed_task_count = 0
60+
@largest_length = 0
4361

44-
def max_length
62+
#@busy = 0
63+
@prune_interval = 1
64+
@last_prune_time = Time.now.to_i - (@prune_interval * 2)
4565
end
4666

4767
def length
68+
@mutex.synchronize do
69+
@state != :shutdown ? @pool.length : 0
70+
end
4871
end
4972
alias_method :current_length, :length
5073

51-
def largest_length
52-
end
53-
54-
def scheduled_task_count
55-
end
56-
57-
def completed_task_count
58-
end
59-
60-
def idletime
61-
end
62-
6374
def queue_length
6475
end
6576

@@ -70,18 +81,14 @@ def remaining_capacity
7081
#
7182
# @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
7283
def running?
84+
@mutex.synchronize { @state == :running }
7385
end
7486

7587
# Is the thread pool shutdown?
7688
#
7789
# @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
7890
def shutdown?
79-
end
80-
81-
# Were all tasks completed before shutdown?
82-
#
83-
# @return [Boolean] +true+ if shutdown and all tasks completed else +false+
84-
def terminated?
91+
@mutex.synchronize { @state != :running }
8592
end
8693

8794
# Block until thread pool shutdown is complete or until +timeout+ seconds have
@@ -94,6 +101,7 @@ def terminated?
94101
#
95102
# @return [Boolean] +true+ if shutdown complete or false on +timeout+
96103
def wait_for_termination(timeout)
104+
return @terminator.wait(timeout.to_i)
97105
end
98106

99107
# Submit a task to the thread pool for asynchronous processing.
@@ -106,7 +114,19 @@ def wait_for_termination(timeout)
106114
# is not running
107115
#
108116
# @raise [ArgumentError] if no task is given
109-
def post(*args)
117+
def post(*args, &task)
118+
raise ArgumentError.new('no block given') unless block_given?
119+
@mutex.synchronize do
120+
break false unless @state == :running
121+
@scheduled_task_count += 1
122+
@queue << [args, task]
123+
#if Time.now.to_i - @prune_interval > @last_prune_time
124+
prune_pool
125+
grow_pool
126+
#@last_prune_time = Time.now.to_i
127+
#end
128+
true
129+
end
110130
end
111131

112132
# Submit a task to the thread pool for asynchronous processing.
@@ -115,19 +135,106 @@ def post(*args)
115135
#
116136
# @return [self] returns itself
117137
def <<(task)
138+
self.post(&task)
139+
return self
118140
end
119141

120142
# Begin an orderly shutdown. Tasks already in the queue will be executed,
121143
# but no new tasks will be accepted. Has no additional effect if the
122144
# thread pool is not running.
123145
def shutdown
146+
@mutex.synchronize do
147+
break unless @state == :running
148+
if @pool.empty?
149+
@state = :shutdown
150+
@terminator.set
151+
else
152+
@state = :shuttingdown
153+
@pool.length.times{ @queue << :stop }
154+
end
155+
end
124156
end
125157

126158
# Begin an immediate shutdown. In-progress tasks will be allowed to
127159
# complete but enqueued tasks will be dismissed and no new tasks
128160
# will be accepted. Has no additional effect if the thread pool is
129161
# not running.
130162
def kill
163+
@mutex.synchronize do
164+
break if @state == :shutdown
165+
@state = :shutdown
166+
@queue.clear
167+
drain_pool
168+
@terminator.set
169+
end
170+
end
171+
172+
## @!visibility private
173+
#def on_start_task # :nodoc:
174+
#@mutex.synchronize do
175+
#@busy += 1
176+
#end
177+
#end
178+
179+
# @!visibility private
180+
def on_end_task # :nodoc:
181+
@mutex.synchronize do
182+
#@busy -= 1
183+
@completed_task_count += 1 #if success
184+
break unless @state == :running
185+
end
186+
end
187+
188+
# @!visibility private
189+
def on_worker_exit(worker) # :nodoc:
190+
@mutex.synchronize do
191+
@pool.delete(worker)
192+
if @pool.empty? && @state != :running
193+
@state = :shutdown
194+
@terminator.set
195+
end
196+
end
197+
end
198+
199+
protected
200+
201+
# @!visibility private
202+
def prune_pool # :nodoc:
203+
@pool.delete_if do |worker|
204+
worker.dead? ||
205+
(@idletime == 0 ? false : Time.now.to_i - @idletime > worker.last_activity)
206+
end
207+
end
208+
209+
# @!visibility private
210+
def grow_pool # :nodoc:
211+
if @min_length > @pool.length
212+
additional = @min_length - @pool.length
213+
elsif @pool.length < @max_length && ! @queue.empty?
214+
# NOTE: does not take into account idle threads
215+
additional = 1
216+
else
217+
additional = 0
218+
end
219+
additional.times{ @pool << create_worker_thread }
220+
@largest_length = [@largest_length, @pool.length].max
221+
end
222+
223+
# @!visibility private
224+
def drain_pool # :nodoc:
225+
@pool.each {|worker| worker.kill }
226+
@pool.clear
227+
end
228+
229+
# @!visibility private
230+
def create_worker_thread # :nodoc:
231+
wrkr = Worker.new(@queue, self)
232+
Thread.new(wrkr, self) do |worker, parent|
233+
Thread.current.abort_on_exception = false
234+
worker.run
235+
parent.on_worker_exit(worker)
236+
end
237+
return wrkr
131238
end
132239
end
133240
end
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
require 'thread'
2+
3+
module Concurrent
4+
5+
class RubyThreadPoolExecutor
6+
7+
class Worker
8+
9+
def initialize(queue, parent)
10+
@queue = queue
11+
@parent = parent
12+
@mutex = Mutex.new
13+
@last_activity = Time.now.to_i
14+
end
15+
16+
def dead?
17+
return @mutex.synchronize do
18+
@thread.nil? ? false : ! @thread.alive?
19+
end
20+
end
21+
22+
def last_activity
23+
@mutex.synchronize { @last_activity }
24+
end
25+
26+
def kill
27+
@mutex.synchronize do
28+
Thread.kill(@thread) unless @thread.nil?
29+
@thread = nil
30+
end
31+
end
32+
33+
def run(thread = Thread.current)
34+
@mutex.synchronize do
35+
raise StandardError.new('already running') unless @thread.nil?
36+
@thread = thread
37+
end
38+
39+
loop do
40+
task = @queue.pop
41+
if task == :stop
42+
@thread = nil
43+
@parent.on_worker_exit(self)
44+
break
45+
end
46+
47+
begin
48+
#@parent.on_start_task
49+
task.last.call(*task.first)
50+
rescue => ex
51+
# let it fail
52+
ensure
53+
@last_activity = Time.now.to_i
54+
@parent.on_end_task
55+
end
56+
end
57+
end
58+
end
59+
end
60+
end

spec/concurrent/cached_thread_pool_shared.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
subject.length.should eq 3
115115
sleep(2)
116116
subject << proc{ nil }
117+
#sleep(0.1)
117118
subject.length.should < 3
118119
end
119120

@@ -123,6 +124,7 @@
123124
subject.length.should eq 3
124125
sleep(2)
125126
subject << proc{ nil }
127+
#sleep(0.1)
126128
subject.length.should < 3
127129
end
128130
end

0 commit comments

Comments
 (0)