Skip to content

Commit 3a054ef

Browse files
committed
Improved Supervisor locking policy and small refactorings
1 parent a64aa02 commit 3a054ef

File tree

1 file changed

+36
-29
lines changed

1 file changed

+36
-29
lines changed

lib/concurrent/supervisor.rb

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ class Supervisor
2020
WorkerContext = Struct.new(:worker, :type, :restart) do
2121
attr_accessor :thread
2222
attr_accessor :terminated
23+
2324
def alive?() return thread && thread.alive?; end
25+
2426
def needs_restart?
2527
return false if thread && thread.alive?
26-
return false if terminated == true
28+
return false if terminated
2729
case self.restart
2830
when :permanent
2931
return true
@@ -42,12 +44,12 @@ def add(context)
4244
self.supervisors += 1 if context.type == :supervisor
4345
self.workers += 1 if context.type == :worker
4446
end
45-
def active() sleeping + running + aborting end;
46-
def sleeping() @status.reduce(0){|x, s| x += (s == 'sleep' ? 1 : 0) } end;
47-
def running() @status.reduce(0){|x, s| x += (s == 'run' ? 1 : 0) } end;
48-
def aborting() @status.reduce(0){|x, s| x += (s == 'aborting' ? 1 : 0) } end;
49-
def stopped() @status.reduce(0){|x, s| x += (s == false ? 1 : 0) } end;
50-
def abend() @status.reduce(0){|x, s| x += (s.nil? ? 1 : 0) } end;
47+
def active() sleeping + running + aborting end
48+
def sleeping() @status.reduce(0){|x, s| x += (s == 'sleep' ? 1 : 0) } end
49+
def running() @status.reduce(0){|x, s| x += (s == 'run' ? 1 : 0) } end
50+
def aborting() @status.reduce(0){|x, s| x += (s == 'aborting' ? 1 : 0) } end
51+
def stopped() @status.reduce(0){|x, s| x += (s == false ? 1 : 0) } end
52+
def abend() @status.reduce(0){|x, s| x += (s.nil? ? 1 : 0) } end
5153
end
5254

5355
attr_reader :monitor_interval
@@ -100,12 +102,13 @@ def run
100102
@running = true
101103
end
102104
monitor
103-
return true
105+
true
104106
end
105107

106108
def stop
107-
return true unless @running
108109
@mutex.synchronize do
110+
return true unless @running
111+
109112
@running = false
110113
unless @monitor.nil?
111114
@monitor.run if @monitor.status == 'sleep'
@@ -122,54 +125,55 @@ def stop
122125
end
123126
prune_workers
124127
end
125-
return true
128+
129+
true
126130
end
127131

128132
def running?
129-
return @running == true
133+
@mutex.synchronize { @running }
130134
end
131135

132136
def length
133-
return @workers.length
137+
@mutex.synchronize { @workers.length }
134138
end
135139
alias_method :size, :length
136140

137141
def current_restart_count
138-
return @restart_times.length
142+
@restart_times.length
139143
end
140144

141145
def count
142-
return @mutex.synchronize do
146+
@mutex.synchronize do
143147
@count.status = @workers.collect{|w| w.thread ? w.thread.status : false }
144148
@count.dup.freeze
145149
end
146150
end
147151

148152
def add_worker(worker, opts = {})
149153
return nil if worker.nil? || ! behaves_as_worker?(worker)
150-
return @mutex.synchronize {
154+
@mutex.synchronize {
151155
restart = opts[:restart] || :permanent
152156
type = opts[:type] || (worker.is_a?(Supervisor) ? :supervisor : nil) || :worker
153157
raise ArgumentError.new(":#{restart} is not a valid restart option") unless CHILD_RESTART_OPTIONS.include?(restart)
154158
raise ArgumentError.new(":#{type} is not a valid child type") unless CHILD_TYPES.include?(type)
155159
context = WorkerContext.new(worker, type, restart)
156160
@workers << context
157161
@count.add(context)
158-
worker.run if running?
162+
worker.run if @running
159163
context.object_id
160164
}
161165
end
162166
alias_method :add_child, :add_worker
163167

164168
def add_workers(workers, opts = {})
165-
return workers.collect do |worker|
169+
workers.collect do |worker|
166170
add_worker(worker, opts)
167171
end
168172
end
169173
alias_method :add_children, :add_workers
170174

171175
def remove_worker(worker_id)
172-
return @mutex.synchronize do
176+
@mutex.synchronize do
173177
index, context = find_worker(worker_id)
174178
break(nil) if context.nil?
175179
break(false) if context.alive?
@@ -180,8 +184,9 @@ def remove_worker(worker_id)
180184
alias_method :remove_child, :remove_worker
181185

182186
def stop_worker(worker_id)
183-
return true unless running?
184-
return @mutex.synchronize do
187+
@mutex.synchronize do
188+
return true unless @running
189+
185190
index, context = find_worker(worker_id)
186191
break(nil) if index.nil?
187192
context.terminated = true
@@ -193,8 +198,9 @@ def stop_worker(worker_id)
193198
alias_method :stop_child, :stop_worker
194199

195200
def start_worker(worker_id)
196-
return false unless running?
197-
return @mutex.synchronize do
201+
@mutex.synchronize do
202+
return false unless @running
203+
198204
index, context = find_worker(worker_id)
199205
break(nil) if context.nil?
200206
context.terminated = false
@@ -205,8 +211,9 @@ def start_worker(worker_id)
205211
alias_method :start_child, :start_worker
206212

207213
def restart_worker(worker_id)
208-
return false unless running?
209-
return @mutex.synchronize do
214+
@mutex.synchronize do
215+
return false unless @running
216+
210217
index, context = find_worker(worker_id)
211218
break(nil) if context.nil?
212219
break(false) if context.restart == :temporary
@@ -221,7 +228,7 @@ def restart_worker(worker_id)
221228
private
222229

223230
def behaves_as_worker?(obj)
224-
return WORKER_API.each do |method, arity|
231+
WORKER_API.each do |method, arity|
225232
break(false) unless obj.respond_to?(method) && obj.method(method).arity == arity
226233
true
227234
end
@@ -247,7 +254,7 @@ def run_worker(context)
247254
Thread.current.abort_on_exception = false
248255
context.worker.run
249256
end
250-
return context
257+
context
251258
end
252259

253260
def terminate_worker(context)
@@ -272,9 +279,9 @@ def prune_workers
272279
def find_worker(worker_id)
273280
index = @workers.find_index{|worker| worker.object_id == worker_id}
274281
if index.nil?
275-
return [nil, nil]
282+
[nil, nil]
276283
else
277-
return [index, @workers[index]]
284+
[index, @workers[index]]
278285
end
279286
end
280287

@@ -286,7 +293,7 @@ def exceeded_max_restart_frequency?
286293
elsif diff >= @max_time
287294
@restart_times.pop
288295
end
289-
return false
296+
false
290297
end
291298

292299
#----------------------------------------------------------------

0 commit comments

Comments
 (0)