Skip to content

Commit 42320af

Browse files
committed
Improved initialization synchronization of ExecutorService subclasses.
1 parent 55ce8f4 commit 42320af

8 files changed

+61
-42
lines changed

lib/concurrent/executor/executor_service.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def kill_execution
145145
# do nothing
146146
end
147147

148-
private
148+
protected
149149

150150
def ns_auto_terminate?
151151
!!@auto_terminate

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ class JavaSingleThreadExecutor < JavaExecutorService
1818
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1919
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
2020
def initialize(opts = {})
21-
super()
21+
super(opts)
22+
end
23+
24+
protected
25+
26+
def ns_initialize(opts)
2227
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
2328
@fallback_policy = opts.fetch(:fallback_policy, :discard)
2429
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy)

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -52,32 +52,7 @@ class JavaThreadPoolExecutor < JavaExecutorService
5252
#
5353
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
5454
def initialize(opts = {})
55-
super()
56-
57-
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
58-
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
59-
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
60-
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
61-
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
62-
warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
63-
64-
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
65-
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
66-
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
67-
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
68-
69-
if @max_queue == 0
70-
queue = java.util.concurrent.LinkedBlockingQueue.new
71-
else
72-
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
73-
end
74-
75-
@executor = java.util.concurrent.ThreadPoolExecutor.new(
76-
min_length, max_length,
77-
idletime, java.util.concurrent.TimeUnit::SECONDS,
78-
queue, FALLBACK_POLICY_CLASSES[@fallback_policy].new)
79-
80-
self.auto_terminate = opts.fetch(:auto_terminate, true)
55+
super(opts)
8156
end
8257

8358
# @!macro executor_module_method_can_overflow_question
@@ -155,6 +130,35 @@ def remaining_capacity
155130
def running?
156131
super && !@executor.isTerminating
157132
end
133+
134+
protected
135+
136+
def ns_initialize(opts)
137+
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
138+
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
139+
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
140+
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
141+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
142+
warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
143+
144+
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
145+
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
146+
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
147+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
148+
149+
if @max_queue == 0
150+
queue = java.util.concurrent.LinkedBlockingQueue.new
151+
else
152+
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
153+
end
154+
155+
@executor = java.util.concurrent.ThreadPoolExecutor.new(
156+
min_length, max_length,
157+
idletime, java.util.concurrent.TimeUnit::SECONDS,
158+
queue, FALLBACK_POLICY_CLASSES[@fallback_policy].new)
159+
160+
self.auto_terminate = opts.fetch(:auto_terminate, true)
161+
end
158162
end
159163
end
160164
end

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@ class RubySingleThreadExecutor < RubyExecutorService
1818
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1919
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
2020
def initialize(opts = {})
21-
super()
21+
super
22+
end
23+
24+
protected
25+
26+
def ns_initialize(opts)
2227
@queue = Queue.new
2328
@thread = nil
2429
@fallback_policy = opts.fetch(:fallback_policy, :discard)
2530
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
2631
self.auto_terminate = opts.fetch(:auto_terminate, true)
2732
end
2833

29-
protected
30-
3134
# @!visibility private
3235
def execute(*args, &task)
3336
supervise

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def worker_died(worker)
128128

129129
protected
130130

131-
def ns_initialize(opts = {})
131+
def ns_initialize(opts)
132132
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
133133
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
134134
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@@ -345,6 +345,5 @@ def run_task(pool, task, args)
345345
throw :stop
346346
end
347347
end
348-
349348
end
350349
end

lib/concurrent/executor/serialized_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def posts(posts)
6161
true
6262
end
6363

64-
private
64+
protected
6565

6666
def ns_initialize
6767
@being_executed = false

lib/concurrent/executor/timer_set.rb

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,7 @@ class TimerSet < RubyExecutorService
2424
# `:operation` returns the global operation pool, and `:immediate` returns a new
2525
# `ImmediateExecutor` object.
2626
def initialize(opts = {})
27-
super()
28-
@queue = PriorityQueue.new(order: :min)
29-
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
30-
@timer_executor = SingleThreadExecutor.new
31-
@condition = Event.new
32-
self.auto_terminate = opts.fetch(:auto_terminate, true)
27+
super(opts)
3328
end
3429

3530
# Post a task to be execute run after a given delay (in seconds). If the
@@ -99,7 +94,7 @@ def self.calculate_delay!(delay)
9994
end
10095
end
10196

102-
private
97+
protected
10398

10499
# A struct for encapsulating a task and its intended execution time.
105100
# It facilitates proper prioritization by overriding the comparison
@@ -117,6 +112,14 @@ def <=>(other)
117112

118113
private_constant :Task
119114

115+
def ns_initialize(opts)
116+
@queue = PriorityQueue.new(order: :min)
117+
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
118+
@timer_executor = SingleThreadExecutor.new
119+
@condition = Event.new
120+
self.auto_terminate = opts.fetch(:auto_terminate, true)
121+
end
122+
120123
# @!visibility private
121124
def shutdown_execution
122125
@queue.clear

lib/concurrent/timer_task.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,16 @@ def timeout_interval=(value)
267267
end
268268
end
269269

270+
if Concurrent.on_jruby?
271+
#FIXME: JRuby seems to handle privacy different here
272+
public :synchronize
273+
end
274+
270275
private :post, :<<
271276

272277
protected
273278

274-
def ns_initialize(opts = {}, &task)
279+
def ns_initialize(opts, &task)
275280
init_mutex(self)
276281
set_deref_options(opts)
277282

0 commit comments

Comments
 (0)