Skip to content

Commit 423866d

Browse files
committed
Shared #shutdown for RubySingleThreadExecutor, RubyThreadPoolExecutor, and TimerSet.
1 parent 0518797 commit 423866d

File tree

4 files changed

+42
-42
lines changed

4 files changed

+42
-42
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,16 @@ def shutdown?
5151
stop_event.set?
5252
end
5353

54+
# Begin an orderly shutdown. Tasks already in the queue will be executed,
55+
# but no new tasks will be accepted. Has no additional effect if the
56+
# thread pool is not running.
5457
def shutdown
58+
mutex.synchronize do
59+
return unless running?
60+
stop_event.set
61+
stop_execution
62+
end
63+
true
5564
end
5665

5766
def kill
@@ -83,6 +92,10 @@ def init_executor
8392
def execute(*args, &task)
8493
raise NotImplementedError
8594
end
95+
96+
def stop_execution
97+
stopped_event.set
98+
end
8699
end
87100

88101
if RUBY_PLATFORM == 'java'

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,6 @@ def initialize(opts = {})
1717
init_executor
1818
end
1919

20-
# Begin an orderly shutdown. Tasks already in the queue will be executed,
21-
# but no new tasks will be accepted. Has no additional effect if the
22-
# thread pool is not running.
23-
def shutdown
24-
mutex.synchronize do
25-
return unless running?
26-
stop_event.set
27-
@queue << :stop
28-
stopped_event.set unless alive?
29-
end
30-
end
31-
3220
# Begin an immediate shutdown. In-progress tasks will be allowed to
3321
# complete but enqueued tasks will be dismissed and no new tasks
3422
# will be accepted. Has no additional effect if the thread pool is
@@ -46,26 +34,37 @@ def kill
4634

4735
protected
4836

37+
# @!visibility private
4938
def execute(*args, &task)
5039
supervise
5140
@queue << [args, task]
5241
end
5342

43+
# @!visibility private
44+
def stop_execution
45+
@queue << :stop
46+
stopped_event.set unless alive?
47+
end
48+
49+
# @!visibility private
5450
def alive?
5551
@thread && @thread.alive?
5652
end
5753

54+
# @!visibility private
5855
def supervise
5956
@thread = new_worker_thread unless alive?
6057
end
6158

59+
# @!visibility private
6260
def new_worker_thread
6361
Thread.new do
6462
Thread.current.abort_on_exception = false
6563
work
6664
end
6765
end
6866

67+
# @!visibility private
6968
def work
7069
loop do
7170
task = @queue.pop

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -132,23 +132,6 @@ def status
132132
mutex.synchronize { @pool.collect { |worker| worker.status } }
133133
end
134134

135-
# Begin an orderly shutdown. Tasks already in the queue will be executed,
136-
# but no new tasks will be accepted. Has no additional effect if the
137-
# thread pool is not running.
138-
def shutdown
139-
mutex.synchronize do
140-
break unless running?
141-
@queue.clear
142-
stop_event.set
143-
if @pool.empty?
144-
stopped_event.set
145-
else
146-
@pool.length.times{ @queue << :stop }
147-
end
148-
end
149-
true
150-
end
151-
152135
# Begin an immediate shutdown. In-progress tasks will be allowed to
153136
# complete but enqueued tasks will be dismissed and no new tasks
154137
# will be accepted. Has no additional effect if the thread pool is
@@ -200,6 +183,16 @@ def execute(*args, &task)
200183
grow_pool
201184
end
202185

186+
# @!visibility private
187+
def stop_execution
188+
@queue.clear
189+
if @pool.empty?
190+
stopped_event.set
191+
else
192+
@pool.length.times{ @queue << :stop }
193+
end
194+
end
195+
203196
# Handler which executes the `overflow_policy` once the queue size
204197
# reaches `max_queue`.
205198
#

lib/concurrent/executor/timer_set.rb

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,6 @@ def post(intended_time, &task)
4949
end
5050
end
5151

52-
# Begin an orderly shutdown. Tasks already in the queue will be executed,
53-
# but no new tasks will be accepted. Has no additional effect if the
54-
# thread pool is not running.
55-
def shutdown
56-
mutex.synchronize do
57-
break unless running?
58-
stop_event.set
59-
@queue.clear
60-
@thread.kill if @thread
61-
stopped_event.set
62-
end
63-
true
64-
end
6552
alias_method :kill, :shutdown
6653

6754
# Calculate an Epoch time with milliseconds at which to execute a
@@ -102,6 +89,7 @@ def <=>(other)
10289
end
10390
end
10491

92+
# @!visibility private
10593
def execute(time, &task)
10694
if (time - Time.now.to_f) <= 0.01
10795
@executor.post(&task)
@@ -110,6 +98,13 @@ def execute(time, &task)
11098
end
11199
end
112100

101+
# @!visibility private
102+
def stop_execution
103+
@queue.clear
104+
@thread.kill if @thread
105+
stopped_event.set
106+
end
107+
113108
# Check the status of the processing thread. This thread is responsible
114109
# for monitoring the internal task queue and sending tasks to the
115110
# executor when it is time for them to be processed. If there is no

0 commit comments

Comments
 (0)