Skip to content

Commit 317c7fe

Browse files
committed
More consistent shutdown/kill behavior for executors.
1 parent 2de76a7 commit 317c7fe

File tree

4 files changed

+19
-58
lines changed

4 files changed

+19
-58
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ module Executor
2121
def post(*args, &task)
2222
raise ArgumentError.new('no block given') unless block_given?
2323
mutex.synchronize do
24-
break false unless running?
24+
return false unless running?
2525
execute(*args, &task)
2626
true
2727
end
@@ -41,14 +41,14 @@ def <<(task)
4141
#
4242
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
4343
def running?
44-
! @stop_event.set?
44+
! stop_event.set?
4545
end
4646

4747
# Is the executor shutdown?
4848
#
4949
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
5050
def shutdown?
51-
@stop_event.set?
51+
stop_event.set?
5252
end
5353

5454
def shutdown
@@ -67,7 +67,7 @@ def kill
6767
#
6868
# @return [Boolean] `true` if shutdown complete or false on `timeout`
6969
def wait_for_termination(timeout)
70-
@stopped_event.wait(timeout.to_i)
70+
stopped_event.wait(timeout.to_i)
7171
end
7272

7373
protected

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def kill
4141
@thread.kill if alive?
4242
stopped_event.set unless alive?
4343
end
44+
true
4445
end
4546

4647
protected

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ def initialize(opts = {})
9090
init_executor
9191

9292
@pool = []
93-
@stopped_event = Event.new
9493
@queue = Queue.new
9594
@scheduled_task_count = 0
9695
@completed_task_count = 0
@@ -133,19 +132,6 @@ def status
133132
mutex.synchronize { @pool.collect { |worker| worker.status } }
134133
end
135134

136-
# Block until thread pool shutdown is complete or until `timeout` seconds have
137-
# passed.
138-
#
139-
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
140-
# must be called before this method (or on another thread).
141-
#
142-
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
143-
#
144-
# @return [Boolean] `true` if shutdown complete or false on `timeout`
145-
def wait_for_termination(timeout)
146-
return @stopped_event.wait(timeout.to_i)
147-
end
148-
149135
# Begin an orderly shutdown. Tasks already in the queue will be executed,
150136
# but no new tasks will be accepted. Has no additional effect if the
151137
# thread pool is not running.
@@ -155,11 +141,12 @@ def shutdown
155141
@queue.clear
156142
stop_event.set
157143
if @pool.empty?
158-
@stopped_event.set
144+
stopped_event.set
159145
else
160146
@pool.length.times{ @queue << :stop }
161147
end
162148
end
149+
true
163150
end
164151

165152
# Begin an immediate shutdown. In-progress tasks will be allowed to

lib/concurrent/executor/timer_set.rb

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,10 @@ class TimerSet
2222
# @option opts [object] :executor when provided will run all operations on
2323
# this executor rather than the global thread pool (overrides :operation)
2424
def initialize(opts = {})
25-
@mutex = Mutex.new
26-
@shutdown = Event.new
2725
@queue = PriorityQueue.new(order: :min)
2826
@executor = get_executor_from(opts)
2927
@thread = nil
30-
end
31-
32-
# Am I running?
33-
#
34-
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
35-
def running?
36-
! @shutdown.set?
37-
end
38-
39-
# Am I shutdown?
40-
#
41-
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
42-
def shutdown?
43-
@shutdown.set?
44-
end
45-
46-
# Block until shutdown is complete or until `timeout` seconds have passed.
47-
#
48-
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
49-
# must be called before this method (or on another thread).
50-
#
51-
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
52-
#
53-
# @return [Boolean] `true` if shutdown complete or false on `timeout`
54-
def wait_for_termination(timeout)
55-
@shutdown.wait(timeout.to_f)
28+
init_executor
5629
end
5730

5831
# Post a task to be execute at the specified time. The given time may be either
@@ -69,8 +42,8 @@ def wait_for_termination(timeout)
6942
# @raise [ArgumentError] if the intended execution time is not in the future
7043
# @raise [ArgumentError] if no block is given
7144
def post(intended_time, &block)
72-
@mutex.synchronize do
73-
return false if shutdown?
45+
mutex.synchronize do
46+
return false unless running?
7447
raise ArgumentError.new('no block given') unless block_given?
7548
time = TimerSet.calculate_schedule_time(intended_time).to_f
7649

@@ -88,12 +61,12 @@ def post(intended_time, &block)
8861
# but no new tasks will be accepted. Has no additional effect if the
8962
# thread pool is not running.
9063
def shutdown
91-
@mutex.synchronize do
92-
unless @shutdown.set?
93-
@queue.clear
94-
@thread.kill if @thread
95-
@shutdown.set
96-
end
64+
mutex.synchronize do
65+
break unless running?
66+
stop_event.set
67+
@queue.clear
68+
@thread.kill if @thread
69+
stopped_event.set
9770
end
9871
true
9972
end
@@ -146,7 +119,7 @@ def <=>(other)
146119
#
147120
# @!visibility private
148121
def check_processing_thread!
149-
@mutex.synchronize do
122+
mutex.synchronize do
150123
return if shutdown? || @queue.empty?
151124
if @thread && @thread.status == 'sleep'
152125
@thread.wakeup
@@ -165,7 +138,7 @@ def check_processing_thread!
165138
#
166139
# @!visibility private
167140
def next_task
168-
@mutex.synchronize do
141+
mutex.synchronize do
169142
unless @queue.empty? || @queue.peek.time > Time.now.to_f
170143
@queue.pop
171144
else
@@ -182,7 +155,7 @@ def next_task
182155
#
183156
# @!visibility private
184157
def next_sleep_interval
185-
@mutex.synchronize do
158+
mutex.synchronize do
186159
if @queue.empty?
187160
nil
188161
else

0 commit comments

Comments
 (0)