Skip to content

Commit e132eef

Browse files
committed
Revert "Rollback java executor implementation"
This reverts commit a6fc08f.
1 parent cb8903a commit e132eef

File tree

6 files changed

+158
-165
lines changed

6 files changed

+158
-165
lines changed

lib/concurrent/executor/cached_thread_pool.rb

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,24 +46,17 @@ def initialize(opts = {})
4646

4747
private
4848

49-
if defined?(JavaThreadPoolExecutor) && self < JavaThreadPoolExecutor
50-
# @!macro cached_thread_pool_method_initialize
51-
# @!visibility private
52-
def ns_initialize(opts)
53-
super(opts)
49+
# @!macro cached_thread_pool_method_initialize
50+
# @!visibility private
51+
def ns_initialize(opts)
52+
super(opts)
53+
if Concurrent.on_jruby?
5454
@max_queue = 0
55-
@executor = java.util.concurrent.Executors.newCachedThreadPool
55+
@executor = java.util.concurrent.Executors.newCachedThreadPool
5656
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
5757
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
5858
self.auto_terminate = opts.fetch(:auto_terminate, true)
5959
end
60-
else
61-
# @!macro cached_thread_pool_method_initialize
62-
# @!visibility private
63-
def ns_initialize(opts)
64-
super(opts)
65-
end
6660
end
67-
6861
end
6962
end

lib/concurrent/executor/single_thread_executor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ module Concurrent
88

99
SingleThreadExecutorImplementation = case
1010
when Concurrent.on_jruby?
11-
RubySingleThreadExecutor
11+
JavaSingleThreadExecutor
1212
else
1313
RubySingleThreadExecutor
1414
end

lib/concurrent/executor/thread_pool_executor.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ module Concurrent
99

1010
ThreadPoolExecutorImplementation = case
1111
when Concurrent.on_jruby?
12-
RubyThreadPoolExecutor
12+
JavaThreadPoolExecutor
1313
else
1414
RubyThreadPoolExecutor
1515
end
@@ -58,9 +58,9 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
5858
# @!macro [new] thread_pool_executor_method_initialize
5959
#
6060
# Create a new thread pool.
61-
#
61+
#
6262
# @param [Hash] opts the options which configure the thread pool.
63-
#
63+
#
6464
# @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
6565
# number of threads to be created
6666
# @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) When a new task is submitted
@@ -73,12 +73,12 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
7373
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
7474
# tasks that are received when the queue size has reached
7575
# `max_queue` or the executor has shut down
76-
#
76+
#
7777
# @raise [ArgumentError] if `:max_threads` is less than one
7878
# @raise [ArgumentError] if `:min_threads` is less than zero
7979
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
8080
# in `FALLBACK_POLICIES`
81-
#
81+
#
8282
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
8383

8484
# @!method initialize(opts = {})

spec/concurrent/executor/cached_thread_pool_spec.rb

Lines changed: 100 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -116,122 +116,122 @@ module Concurrent
116116

117117
context 'runtime-specific implementation' do
118118

119-
# if Concurrent.on_jruby?
120-
#
121-
# context '#initialize' do
122-
#
123-
# it 'sets :fallback_policy correctly' do
124-
# clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
125-
# policy = clazz.new
126-
# expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)
127-
#
128-
# subject = CachedThreadPool.new(fallback_policy: :discard)
129-
# expect(subject.fallback_policy).to eq :discard
130-
# end
131-
#
132-
# it 'defaults :fallback_policy to :abort' do
133-
# subject = CachedThreadPool.new
134-
# expect(subject.fallback_policy).to eq :abort
135-
# end
136-
#
137-
# it 'raises an exception if given an invalid :fallback_policy' do
138-
# expect {
139-
# CachedThreadPool.new(fallback_policy: :bogus)
140-
# }.to raise_error(ArgumentError)
141-
# end
142-
# end
143-
#
144-
# else
145-
146-
context 'garbage collection' do
147-
148-
subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
149-
150-
it 'removes from pool any thread that has been idle too long' do
151-
latch = Concurrent::CountDownLatch.new(4)
152-
4.times { subject.post { sleep 0.1; latch.count_down } }
153-
expect(latch.wait(1)).to be true
154-
sleep 0.2
155-
subject.post {}
156-
sleep 0.2
157-
expect(subject.length).to be < 4
158-
end
119+
if Concurrent.on_jruby?
120+
121+
context '#initialize' do
159122

160-
it 'deals with dead threads' do
161-
expect(subject).to receive(:ns_worker_died).exactly(5).times.and_call_original
123+
it 'sets :fallback_policy correctly' do
124+
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
125+
policy = clazz.new
126+
expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)
162127

163-
dead_threads_queue = Queue.new
164-
5.times { subject.post { sleep 0.1; dead_threads_queue.push Thread.current; raise Exception } }
165-
sleep(0.2)
166-
latch = Concurrent::CountDownLatch.new(5)
167-
5.times { subject.post { sleep 0.1; latch.count_down } }
168-
expect(latch.wait(1)).to be true
128+
subject = CachedThreadPool.new(fallback_policy: :discard)
129+
expect(subject.fallback_policy).to eq :discard
130+
end
131+
132+
it 'defaults :fallback_policy to :abort' do
133+
subject = CachedThreadPool.new
134+
expect(subject.fallback_policy).to eq :abort
135+
end
169136

170-
dead_threads = []
171-
dead_threads << dead_threads_queue.pop until dead_threads_queue.empty?
172-
expect(dead_threads.all? { |t| !t.alive? }).to be true
137+
it 'raises an exception if given an invalid :fallback_policy' do
138+
expect {
139+
CachedThreadPool.new(fallback_policy: :bogus)
140+
}.to raise_error(ArgumentError)
141+
end
173142
end
174-
end
175143

176-
context 'worker creation and caching' do
144+
else
145+
146+
context 'garbage collection' do
147+
148+
subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
149+
150+
it 'removes from pool any thread that has been idle too long' do
151+
latch = Concurrent::CountDownLatch.new(4)
152+
4.times { subject.post { sleep 0.1; latch.count_down } }
153+
expect(latch.wait(1)).to be true
154+
sleep 0.2
155+
subject.post {}
156+
sleep 0.2
157+
expect(subject.length).to be < 4
158+
end
159+
160+
it 'deals with dead threads' do
161+
expect(subject).to receive(:ns_worker_died).exactly(5).times.and_call_original
177162

178-
subject { described_class.new(idletime: 1, max_threads: 5) }
163+
dead_threads_queue = Queue.new
164+
5.times { subject.post { sleep 0.1; dead_threads_queue.push Thread.current; raise Exception } }
165+
sleep(0.2)
166+
latch = Concurrent::CountDownLatch.new(5)
167+
5.times { subject.post { sleep 0.1; latch.count_down } }
168+
expect(latch.wait(1)).to be true
179169

180-
it 'creates new workers when there are none available' do
181-
expect(subject.length).to eq 0
182-
5.times { sleep(0.1); subject << proc { sleep(1) } }
183-
sleep(1)
184-
expect(subject.length).to eq 5
170+
dead_threads = []
171+
dead_threads << dead_threads_queue.pop until dead_threads_queue.empty?
172+
expect(dead_threads.all? { |t| !t.alive? }).to be true
173+
end
185174
end
186175

187-
it 'uses existing idle threads' do
188-
5.times { subject << proc { sleep(0.1) } }
189-
sleep(1)
190-
expect(subject.length).to be >= 5
191-
3.times { subject << proc { sleep(1) } }
192-
sleep(0.1)
193-
expect(subject.length).to be >= 5
176+
context 'worker creation and caching' do
177+
178+
subject { described_class.new(idletime: 1, max_threads: 5) }
179+
180+
it 'creates new workers when there are none available' do
181+
expect(subject.length).to eq 0
182+
5.times { sleep(0.1); subject << proc { sleep(1) } }
183+
sleep(1)
184+
expect(subject.length).to eq 5
185+
end
186+
187+
it 'uses existing idle threads' do
188+
5.times { subject << proc { sleep(0.1) } }
189+
sleep(1)
190+
expect(subject.length).to be >= 5
191+
3.times { subject << proc { sleep(1) } }
192+
sleep(0.1)
193+
expect(subject.length).to be >= 5
194+
end
194195
end
195196
end
196-
end
197197

198-
context 'stress', notravis: true do
199-
configurations = [
200-
{ min_threads: 2,
201-
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
202-
auto_terminate: false,
203-
idletime: 0.1, # 1 minute
204-
max_queue: 0, # unlimited
205-
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
206-
gc_interval: 0.1 },
207-
{ min_threads: 2,
208-
max_threads: 4,
209-
auto_terminate: false,
210-
idletime: 0.1, # 1 minute
211-
max_queue: 0, # unlimited
212-
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
213-
gc_interval: 0.1 }
214-
]
215-
216-
configurations.each do |config|
217-
specify do
218-
pool = RubyThreadPoolExecutor.new(config)
219-
220-
10.times do
221-
count = Concurrent::CountDownLatch.new(100)
222-
100.times do
223-
pool.post { count.count_down }
224-
end
225-
count.wait
226-
sleep 0.01 # let the tasks end after count_down
227-
expect(pool.length).to be <= [200, config[:max_threads]].min
228-
if pool.length > [110, config[:max_threads]].min
229-
puts "ERRORSIZE #{pool.length} max #{config[:max_threads]}"
198+
context 'stress', notravis: true do
199+
configurations = [
200+
{ min_threads: 2,
201+
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
202+
auto_terminate: false,
203+
idletime: 0.1, # 1 minute
204+
max_queue: 0, # unlimited
205+
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
206+
gc_interval: 0.1 },
207+
{ min_threads: 2,
208+
max_threads: 4,
209+
auto_terminate: false,
210+
idletime: 0.1, # 1 minute
211+
max_queue: 0, # unlimited
212+
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
213+
gc_interval: 0.1 }
214+
]
215+
216+
configurations.each do |config|
217+
specify do
218+
pool = RubyThreadPoolExecutor.new(config)
219+
220+
10.times do
221+
count = Concurrent::CountDownLatch.new(100)
222+
100.times do
223+
pool.post { count.count_down }
224+
end
225+
count.wait
226+
sleep 0.01 # let the tasks end after count_down
227+
expect(pool.length).to be <= [200, config[:max_threads]].min
228+
if pool.length > [110, config[:max_threads]].min
229+
puts "ERRORSIZE #{pool.length} max #{config[:max_threads]}"
230+
end
230231
end
231232
end
232233
end
233234
end
234-
# end
235235
end
236236
end
237237
end

spec/concurrent/executor/fixed_thread_pool_spec.rb

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -259,41 +259,41 @@ module Concurrent
259259

260260
context 'runtime-specific implementation' do
261261

262-
# if Concurrent.on_jruby?
263-
#
264-
# it 'sets :fallback_policy correctly' do
265-
# clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
266-
# policy = clazz.new
267-
# expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)
268-
#
269-
# subject = FixedThreadPool.new(5, fallback_policy: :discard)
270-
# expect(subject.fallback_policy).to eq :discard
271-
# end
272-
#
273-
# else
274-
275-
context 'exception handling' do
276-
277-
it 'restarts threads that experience exception' do
278-
count = subject.length
279-
count.times { subject << proc { raise StandardError } }
280-
sleep(1)
281-
expect(subject.length).to eq count
262+
if Concurrent.on_jruby?
263+
264+
it 'sets :fallback_policy correctly' do
265+
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
266+
policy = clazz.new
267+
expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)
268+
269+
subject = FixedThreadPool.new(5, fallback_policy: :discard)
270+
expect(subject.fallback_policy).to eq :discard
282271
end
283-
end
284272

285-
context 'worker creation and caching' do
273+
else
286274

287-
it 'creates new workers when there are none available' do
288-
pool = described_class.new(5)
289-
expect(pool.length).to eq 0
290-
5.times { pool << proc { sleep(1) } }
291-
sleep(0.1)
292-
expect(pool.length).to eq 5
293-
pool.kill
275+
context 'exception handling' do
276+
277+
it 'restarts threads that experience exception' do
278+
count = subject.length
279+
count.times{ subject << proc{ raise StandardError } }
280+
sleep(1)
281+
expect(subject.length).to eq count
282+
end
283+
end
284+
285+
context 'worker creation and caching' do
286+
287+
it 'creates new workers when there are none available' do
288+
pool = described_class.new(5)
289+
expect(pool.length).to eq 0
290+
5.times{ pool << proc{ sleep(1) } }
291+
sleep(0.1)
292+
expect(pool.length).to eq 5
293+
pool.kill
294+
end
294295
end
295296
end
296-
# end
297297
end
298298
end
299299
end

0 commit comments

Comments
 (0)