Skip to content

Commit cf9213e

Browse files
committed
Synchronized initialization of RubyThreadPoolExecutor.
1 parent 41c31ff commit cf9213e

File tree

1 file changed

+28
-26
lines changed

1 file changed

+28
-26
lines changed

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,32 +70,7 @@ class RubyThreadPoolExecutor < RubyExecutorService
7070
#
7171
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
7272
def initialize(opts = {})
73-
super()
74-
75-
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
76-
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
77-
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
78-
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
79-
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
80-
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
81-
warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
82-
83-
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
84-
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
85-
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
86-
87-
self.auto_terminate = opts.fetch(:auto_terminate, true)
88-
89-
@pool = [] # all workers
90-
@ready = [] # used as a stash (most idle worker is at the start)
91-
@queue = [] # used as queue
92-
# @ready or @queue is empty at all times
93-
@scheduled_task_count = 0
94-
@completed_task_count = 0
95-
@largest_length = 0
96-
97-
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
98-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
73+
super(opts)
9974
end
10075

10176
# @!macro executor_module_method_can_overflow_question
@@ -153,6 +128,33 @@ def worker_died(worker)
153128

154129
protected
155130

131+
def ns_initialize(opts = {})
132+
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
133+
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
134+
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
135+
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
136+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
137+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
138+
warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
139+
140+
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
141+
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
142+
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
143+
144+
self.auto_terminate = opts.fetch(:auto_terminate, true)
145+
146+
@pool = [] # all workers
147+
@ready = [] # used as a stash (most idle worker is at the start)
148+
@queue = [] # used as queue
149+
# @ready or @queue is empty at all times
150+
@scheduled_task_count = 0
151+
@completed_task_count = 0
152+
@largest_length = 0
153+
154+
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
155+
@next_gc_time = Concurrent.monotonic_time + @gc_interval
156+
end
157+
156158
def ns_limited_queue?
157159
@max_queue != 0
158160
end

0 commit comments

Comments
 (0)