Skip to content

Commit 39a4e3b

Browse files
committed
ExecutorService extends Synchronization::Object
1 parent 37edfe3 commit 39a4e3b

12 files changed

+596
-369
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 307 additions & 300 deletions
Large diffs are not rendered by default.
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
require 'concurrent/logging'
2+
require 'concurrent/synchronization'
3+
require 'concurrent/executor/executor'
4+
5+
module Concurrent
6+
7+
class AbstractExecutorService < Synchronization::Object
8+
include Executor
9+
include Logging
10+
11+
FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze
12+
13+
attr_reader :fallback_policy
14+
15+
def initialize
16+
super()
17+
end
18+
19+
def running?
20+
raise NotImplementedError
21+
end
22+
23+
def shuttingdown?
24+
raise NotImplementedError
25+
end
26+
27+
def shutdown?
28+
raise NotImplementedError
29+
end
30+
31+
def shutdown
32+
raise NotImplementedError
33+
end
34+
35+
def kill
36+
raise NotImplementedError
37+
end
38+
39+
def wait_for_termination(timeout = nil)
40+
raise NotImplementedError
41+
end
42+
43+
def auto_terminate?
44+
synchronize { ns_auto_terminate? }
45+
end
46+
47+
def auto_terminate=(value)
48+
synchronize { self.ns_auto_terminate = value }
49+
end
50+
51+
protected
52+
53+
def handle_fallback(*args)
54+
case fallback_policy
55+
when :abort
56+
raise RejectedExecutionError
57+
when :discard
58+
false
59+
when :caller_runs
60+
begin
61+
yield(*args)
62+
rescue => ex
63+
# let it fail
64+
log DEBUG, ex
65+
end
66+
true
67+
else
68+
fail "Unknown fallback policy #{fallback_policy}"
69+
end
70+
end
71+
72+
def execute(*args, &task)
73+
raise NotImplementedError
74+
end
75+
76+
def shutdown_execution
77+
end
78+
79+
def kill_execution
80+
# do nothing
81+
end
82+
83+
private
84+
85+
def ns_auto_terminate?
86+
!!@auto_terminate
87+
end
88+
89+
def ns_auto_terminate=(value)
90+
case value
91+
when true
92+
AtExit.add(self) { terminate_at_exit }
93+
@auto_terminate = true
94+
when false
95+
AtExit.delete(self)
96+
@auto_terminate = false
97+
else
98+
raise ArgumentError
99+
end
100+
end
101+
102+
def terminate_at_exit
103+
kill # TODO be gentle first
104+
wait_for_termination(10)
105+
end
106+
end
107+
108+
class RubyExecutorService < AbstractExecutorService
109+
110+
def initialize
111+
super()
112+
@stop_event = Event.new
113+
@stopped_event = Event.new
114+
end
115+
116+
def post(*args, &task)
117+
raise ArgumentError.new('no block given') unless block_given?
118+
synchronize do
119+
# If the executor is shut down, reject this task
120+
return handle_fallback(*args, &task) unless running?
121+
execute(*args, &task)
122+
true
123+
end
124+
end
125+
126+
def running?
127+
!stop_event.set?
128+
end
129+
130+
def shuttingdown?
131+
!(running? || shutdown?)
132+
end
133+
134+
def shutdown?
135+
stopped_event.set?
136+
end
137+
138+
def shutdown
139+
synchronize do
140+
break unless running?
141+
self.ns_auto_terminate = false
142+
stop_event.set
143+
shutdown_execution
144+
end
145+
true
146+
end
147+
148+
def kill
149+
synchronize do
150+
break if shutdown?
151+
self.ns_auto_terminate = false
152+
stop_event.set
153+
kill_execution
154+
stopped_event.set
155+
end
156+
true
157+
end
158+
159+
def wait_for_termination(timeout = nil)
160+
stopped_event.wait(timeout)
161+
end
162+
163+
protected
164+
165+
attr_reader :stop_event, :stopped_event
166+
167+
def shutdown_execution
168+
stopped_event.set
169+
end
170+
end
171+
172+
if Concurrent.on_jruby?
173+
174+
class JavaExecutorService < AbstractExecutorService
175+
include Executor
176+
java_import 'java.lang.Runnable'
177+
178+
FALLBACK_POLICY_CLASSES = {
179+
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
180+
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
181+
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
182+
}.freeze
183+
private_constant :FALLBACK_POLICY_CLASSES
184+
185+
def initialize
186+
super()
187+
end
188+
189+
def post(*args, &task)
190+
raise ArgumentError.new('no block given') unless block_given?
191+
return handle_fallback(*args, &task) unless running?
192+
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
193+
executor_submit.call { yield(*args) }
194+
true
195+
rescue Java::JavaUtilConcurrent::RejectedExecutionException
196+
raise RejectedExecutionError
197+
end
198+
199+
def running?
200+
!(shuttingdown? || shutdown?)
201+
end
202+
203+
def shuttingdown?
204+
if @executor.respond_to? :isTerminating
205+
@executor.isTerminating
206+
else
207+
false
208+
end
209+
end
210+
211+
def shutdown?
212+
@executor.isShutdown || @executor.isTerminated
213+
end
214+
215+
def wait_for_termination(timeout = nil)
216+
if timeout.nil?
217+
ok = @executor.awaitTermination(60, java.util.concurrent.TimeUnit::SECONDS) until ok
218+
true
219+
else
220+
@executor.awaitTermination(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
221+
end
222+
end
223+
224+
def shutdown
225+
self.ns_auto_terminate = false
226+
@executor.shutdown
227+
nil
228+
end
229+
230+
def kill
231+
self.ns_auto_terminate = false
232+
@executor.shutdownNow
233+
nil
234+
end
235+
end
236+
end
237+
end

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1717
#
1818
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
1919
def initialize(opts = {})
20+
super()
21+
2022
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
2123
warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
2224
@max_queue = 0
2325

24-
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
26+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy)
2527

2628
@executor = java.util.concurrent.Executors.newCachedThreadPool
27-
@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@fallback_policy].new)
29+
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
2830
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
2931

3032
self.auto_terminate = opts.fetch(:auto_terminate, true)

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
if Concurrent.on_jruby?
2-
require_relative 'executor'
2+
require 'concurrent/executor/executor_service'
33

44
module Concurrent
55

66
# @!macro single_thread_executor
77
# @!macro thread_pool_options
8-
class JavaSingleThreadExecutor
9-
include JavaExecutor
8+
class JavaSingleThreadExecutor < JavaExecutorService
109
include SerialExecutor
1110

1211
# Create a new thread pool.
@@ -19,9 +18,10 @@ class JavaSingleThreadExecutor
1918
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
2019
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
2120
def initialize(opts = {})
21+
super()
2222
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
2323
@fallback_policy = opts.fetch(:fallback_policy, :discard)
24-
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
24+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy)
2525
self.auto_terminate = opts.fetch(:auto_terminate, true)
2626
end
2727
end

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
if Concurrent.on_jruby?
2-
require_relative 'executor'
2+
require 'concurrent/executor/executor_service'
33

44
module Concurrent
55

66
# @!macro thread_pool_executor
77
# @!macro thread_pool_options
8-
class JavaThreadPoolExecutor
9-
include JavaExecutor
8+
class JavaThreadPoolExecutor < JavaExecutorService
109

1110
# Default maximum number of threads that will be created in the pool.
1211
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
@@ -53,6 +52,8 @@ class JavaThreadPoolExecutor
5352
#
5453
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
5554
def initialize(opts = {})
55+
super()
56+
5657
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
5758
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
5859
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@@ -63,7 +64,7 @@ def initialize(opts = {})
6364
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
6465
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
6566
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
66-
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
67+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
6768

6869
if @max_queue == 0
6970
queue = java.util.concurrent.LinkedBlockingQueue.new
@@ -74,7 +75,7 @@ def initialize(opts = {})
7475
@executor = java.util.concurrent.ThreadPoolExecutor.new(
7576
min_length, max_length,
7677
idletime, java.util.concurrent.TimeUnit::SECONDS,
77-
queue, FALLBACK_POLICIES[@fallback_policy].new)
78+
queue, FALLBACK_POLICY_CLASSES[@fallback_policy].new)
7879

7980
self.auto_terminate = opts.fetch(:auto_terminate, true)
8081
end

lib/concurrent/executor/per_thread_executor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'concurrent/atomics'
2+
require 'concurrent/synchronization'
23
require 'concurrent/executor/executor'
34

45
module Concurrent
@@ -15,7 +16,7 @@ module Concurrent
1516
# lead to suboptimal performance.
1617
#
1718
# @note Intended for use primarily in testing and debugging.
18-
class PerThreadExecutor
19+
class PerThreadExecutor < Synchronization::Object
1920
include Executor
2021

2122
# Creates a new executor

lib/concurrent/executor/ruby_cached_thread_pool.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
1414
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1515
def initialize(opts = {})
1616
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
17-
1817
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
1918

2019
defaults = { idletime: DEFAULT_THREAD_IDLETIMEOUT }

lib/concurrent/executor/ruby_fixed_thread_pool.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor
1616
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1717
def initialize(num_threads, opts = {})
1818
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
19+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
1920

2021
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
21-
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
2222

2323
opts = {
2424
min_threads: num_threads,

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
require_relative 'executor'
1+
require 'concurrent/executor/executor'
2+
require 'concurrent/executor/executor_service'
23

34
module Concurrent
45

56
# @!macro single_thread_executor
67
# @!macro thread_pool_options
7-
class RubySingleThreadExecutor
8-
include RubyExecutor
8+
class RubySingleThreadExecutor < RubyExecutorService
99
include SerialExecutor
1010

1111
# Create a new thread pool.
@@ -18,11 +18,11 @@ class RubySingleThreadExecutor
1818
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1919
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
2020
def initialize(opts = {})
21+
super()
2122
@queue = Queue.new
2223
@thread = nil
2324
@fallback_policy = opts.fetch(:fallback_policy, :discard)
2425
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
25-
init_executor
2626
self.auto_terminate = opts.fetch(:auto_terminate, true)
2727
end
2828

0 commit comments

Comments
 (0)