Skip to content

Commit 863c63f

Browse files
committed
Implemented queue length functionality for RubyThreadPoolExecutor.
1 parent d650f03 commit 863c63f

File tree

4 files changed

+25
-13
lines changed

4 files changed

+25
-13
lines changed

lib/concurrent/java_thread_pool_executor.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ def <<(task)
158158
# but no new tasks will be accepted. Has no additional effect if the
159159
# thread pool is not running.
160160
def shutdown
161-
@executor.shutdown
162161
@executor.getQueue.clear
162+
@executor.shutdown
163163
return nil
164164
end
165165

@@ -168,8 +168,8 @@ def shutdown
168168
# will be accepted. Has no additional effect if the thread pool is
169169
# not running.
170170
def kill
171-
@executor.shutdownNow
172171
@executor.getQueue.clear
172+
@executor.shutdownNow
173173
return nil
174174
end
175175

lib/concurrent/ruby_thread_pool_executor.rb

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class RubyThreadPoolExecutor
3737

3838
attr_reader :max_queue
3939

40+
attr_reader :overflow_policy
41+
4042
# Create a new thread pool.
4143
#
4244
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
@@ -59,7 +61,6 @@ def initialize(opts = {})
5961
@completed_task_count = 0
6062
@largest_length = 0
6163

62-
#@busy = 0
6364
@gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
6465
@last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
6566
end
@@ -72,9 +73,11 @@ def length
7273
alias_method :current_length, :length
7374

7475
def queue_length
76+
@queue.length
7577
end
7678

7779
def remaining_capacity
80+
@mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
7881
end
7982

8083
# Is the thread pool running?
@@ -118,6 +121,7 @@ def post(*args, &task)
118121
raise ArgumentError.new('no block given') unless block_given?
119122
@mutex.synchronize do
120123
break false unless @state == :running
124+
return false if @max_queue != 0 && @queue.length >= @max_queue
121125
@scheduled_task_count += 1
122126
@queue << [args, task]
123127
if Time.now.to_f - @gc_interval >= @last_gc_time
@@ -145,6 +149,7 @@ def <<(task)
145149
def shutdown
146150
@mutex.synchronize do
147151
break unless @state == :running
152+
@queue.clear
148153
if @pool.empty?
149154
@state = :shutdown
150155
@terminator.set
@@ -162,24 +167,16 @@ def shutdown
162167
def kill
163168
@mutex.synchronize do
164169
break if @state == :shutdown
165-
@state = :shutdown
166170
@queue.clear
171+
@state = :shutdown
167172
drain_pool
168173
@terminator.set
169174
end
170175
end
171176

172-
## @!visibility private
173-
#def on_start_task # :nodoc:
174-
#@mutex.synchronize do
175-
#@busy += 1
176-
#end
177-
#end
178-
179177
# @!visibility private
180178
def on_end_task # :nodoc:
181179
@mutex.synchronize do
182-
#@busy -= 1
183180
@completed_task_count += 1 #if success
184181
break unless @state == :running
185182
end

lib/concurrent/ruby_thread_pool_worker.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ def run(thread = Thread.current) # :nodoc:
4949
end
5050

5151
begin
52-
#@parent.on_start_task
5352
task.last.call(*task.first)
5453
rescue => ex
5554
# let it fail

spec/concurrent/thread_pool_executor_shared.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@
6464

6565
context '#queue_length' do
6666

67+
let!(:expected_max){ 10 }
68+
subject do
69+
described_class.new(
70+
min_threads: 2,
71+
max_threads: 5,
72+
max_queue: expected_max
73+
)
74+
end
75+
6776
it 'returns zero on creation' do
6877
subject.queue_length.should eq 0
6978
end
@@ -87,6 +96,12 @@
8796
subject.wait_for_termination(1)
8897
subject.queue_length.should eq 0
8998
end
99+
100+
it 'can never be greater than :max_queue' do
101+
100.times{ subject.post{ sleep(0.5) } }
102+
sleep(0.1)
103+
subject.queue_length.should <= expected_max
104+
end
90105
end
91106

92107
context '#remaining_capacity' do
@@ -116,6 +131,7 @@
116131

117132
it 'returns :max_length when stopped' do
118133
100.times{ subject.post{ sleep(0.5) } }
134+
sleep(0.1)
119135
subject.shutdown
120136
subject.wait_for_termination(1)
121137
subject.remaining_capacity.should eq expected_max

0 commit comments

Comments
 (0)