Skip to content

Commit 880290a

Browse files
committed
Improved performance of Java thread pools
1 parent 30c2098 commit 880290a

File tree

5 files changed

+55
-5
lines changed

5 files changed

+55
-5
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#!/usr/bin/env ruby
2+
3+
$: << File.expand_path('../../lib', __FILE__)
4+
5+
require 'benchmark'
6+
require 'concurrent/executors'
7+
8+
COUNT = 100_000
9+
10+
EXECUTORS = [
11+
Concurrent::JavaThreadPoolExecutor
12+
]
13+
14+
def test_executor(executor_class, count)
15+
executor = executor_class.new
16+
count.times { executor.post{} }
17+
end
18+
19+
Benchmark.bmbm do |x|
20+
EXECUTORS.each do |executor_class|
21+
x.report(executor_class.to_s) { test_executor(executor_class, COUNT) }
22+
end
23+
end

lib/concurrent/executor/executor_service.rb

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,13 @@ class JavaExecutorService < AbstractExecutorService
250250

251251
def initialize(*args, &block)
252252
super
253+
ns_make_executor_runnable
253254
end
254255

255256
def post(*args, &task)
256257
raise ArgumentError.new('no block given') unless block_given?
257258
return handle_fallback(*args, &task) unless running?
258-
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
259-
executor_submit.call { yield(*args) }
259+
@executor.submit_runnable Job.new(args, task)
260260
true
261261
rescue Java::JavaUtilConcurrent::RejectedExecutionException
262262
raise RejectedExecutionError
@@ -304,6 +304,27 @@ def ns_shuttingdown?
304304
def ns_shutdown?
305305
@executor.isShutdown || @executor.isTerminated
306306
end
307+
308+
def ns_make_executor_runnable
309+
if !defined?(@executor.submit_runnable)
310+
@executor.class.class_eval do
311+
java_alias :submit_runnable, :submit, [java.lang.Runnable.java_class]
312+
end
313+
end
314+
end
315+
316+
class Job
317+
include Runnable
318+
def initialize(args, block)
319+
@args = args
320+
@block = block
321+
end
322+
323+
def run
324+
@block.call(*@args)
325+
end
326+
end
327+
private_constant :Job
307328
end
308329
end
309330
end

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1717
#
1818
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
1919
def initialize(opts = {})
20-
super()
20+
super(opts)
21+
end
22+
23+
protected
2124

25+
def ns_initialize(opts)
2226
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
2327
warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
2428
@max_queue = 0

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ def initialize(num_threads, opts = {})
2424
max_threads: num_threads
2525
}.merge(opts)
2626
super(opts)
27-
28-
self.auto_terminate = opts.fetch(:auto_terminate, true)
2927
end
3028
end
3129
end

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ def initialize(opts = {})
2323

2424
protected
2525

26+
def ns_initialize(opts)
27+
super(opts)
28+
end
29+
2630
def ns_initialize(opts)
2731
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
2832
@fallback_policy = opts.fetch(:fallback_policy, :discard)

0 commit comments

Comments
 (0)