Skip to content

Commit ad23f95

Browse files
committed
Most executors now share behavior through the Executor and JavaExecutor mixin modules.
1 parent 423866d commit ad23f95

8 files changed

+66
-54
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,18 @@ def running?
4444
! stop_event.set?
4545
end
4646

47+
# Is the executor shuttingdown?
48+
#
49+
# @return [Boolean] `true` when not running and not shutdown, else `false`
50+
def shuttingdown?
51+
! (running? || shutdown?)
52+
end
53+
4754
# Is the executor shutdown?
4855
#
4956
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
5057
def shutdown?
51-
stop_event.set?
58+
stopped_event.set?
5259
end
5360

5461
# Begin an orderly shutdown. Tasks already in the queue will be executed,
@@ -58,12 +65,23 @@ def shutdown
5865
mutex.synchronize do
5966
return unless running?
6067
stop_event.set
61-
stop_execution
68+
shutdown_execution
6269
end
6370
true
6471
end
6572

73+
# Begin an immediate shutdown. In-progress tasks will be allowed to
74+
# complete but enqueued tasks will be dismissed and no new tasks
75+
# will be accepted. Has no additional effect if the thread pool is
76+
# not running.
6677
def kill
78+
mutex.synchronize do
79+
return if shutdown?
80+
stop_event.set
81+
kill_execution
82+
stopped_event.set
83+
end
84+
true
6785
end
6886

6987
# Block until executor shutdown is complete or until `timeout` seconds have
@@ -76,7 +94,7 @@ def kill
7694
#
7795
# @return [Boolean] `true` if shutdown complete or false on `timeout`
7896
def wait_for_termination(timeout)
79-
stopped_event.wait(timeout.to_i)
97+
stopped_event.wait(timeout.to_f)
8098
end
8199

82100
protected
@@ -93,9 +111,13 @@ def execute(*args, &task)
93111
raise NotImplementedError
94112
end
95113

96-
def stop_execution
114+
def shutdown_execution
97115
stopped_event.set
98116
end
117+
118+
def kill_execution
119+
# do nothing
120+
end
99121
end
100122

101123
if RUBY_PLATFORM == 'java'
@@ -138,14 +160,25 @@ def <<(task)
138160
#
139161
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
140162
def running?
141-
! (@executor.isShutdown || @executor.isTerminated)
163+
! (shuttingdown? || shutdown?)
164+
end
165+
166+
# Is the executor shuttingdown?
167+
#
168+
# @return [Boolean] `true` when not running and not shutdown, else `false`
169+
def shuttingdown?
170+
if @executor.respond_to? :isTerminating
171+
@executor.isTerminating
172+
else
173+
false
174+
end
142175
end
143176

144177
# Is the executor shutdown?
145178
#
146179
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
147180
def shutdown?
148-
@executor.isShutdown
181+
@executor.isShutdown || @executor.isTerminated
149182
end
150183

151184
# Block until executor shutdown is complete or until `timeout` seconds have
@@ -166,7 +199,7 @@ def wait_for_termination(timeout)
166199
# executor is not running.
167200
def shutdown
168201
@executor.shutdown
169-
return nil
202+
nil
170203
end
171204

172205
# Begin an immediate shutdown. In-progress tasks will be allowed to

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ def initialize(opts = {})
2424
@executor = java.util.concurrent.Executors.newCachedThreadPool
2525
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
2626

27-
# without this the process may fail to exit
28-
at_exit { self.kill }
27+
set_shutdown_hook
2928
end
3029
end
3130
end

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ def initialize(num_threads, opts = {})
2626
@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads)
2727
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
2828

29-
# without this the process may fail to exit
30-
at_exit { self.kill }
29+
set_shutdown_hook
3130
end
3231
end
3332
end

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,6 @@ def initialize(opts = {})
1717
init_executor
1818
end
1919

20-
# Begin an immediate shutdown. In-progress tasks will be allowed to
21-
# complete but enqueued tasks will be dismissed and no new tasks
22-
# will be accepted. Has no additional effect if the thread pool is
23-
# not running.
24-
def kill
25-
mutex.synchronize do
26-
return if shutdown?
27-
stop_event.set
28-
@queue.clear
29-
@thread.kill if alive?
30-
stopped_event.set unless alive?
31-
end
32-
true
33-
end
34-
3520
protected
3621

3722
# @!visibility private
@@ -41,11 +26,17 @@ def execute(*args, &task)
4126
end
4227

4328
# @!visibility private
44-
def stop_execution
29+
def shutdown_execution
4530
@queue << :stop
4631
stopped_event.set unless alive?
4732
end
4833

34+
# @!visibility private
35+
def kill_execution
36+
@queue.clear
37+
@thread.kill if alive?
38+
end
39+
4940
# @!visibility private
5041
def alive?
5142
@thread && @thread.alive?

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,15 @@ def initialize(opts = {})
103103
#
104104
# @return [Integer] the length
105105
def length
106-
mutex.synchronize do
107-
running? ? @pool.length : 0
108-
end
106+
mutex.synchronize{ running? ? @pool.length : 0 }
109107
end
110108
alias_method :current_length, :length
111109

112110
# The number of tasks in the queue awaiting execution.
113111
#
114112
# @return [Integer] the queue_length
115113
def queue_length
116-
@queue.length
114+
mutex.synchronize{ running? ? @queue.length : 0 }
117115
end
118116

119117
# Number of tasks that may be enqueued before reaching `max_queue` and rejecting
@@ -132,20 +130,6 @@ def status
132130
mutex.synchronize { @pool.collect { |worker| worker.status } }
133131
end
134132

135-
# Begin an immediate shutdown. In-progress tasks will be allowed to
136-
# complete but enqueued tasks will be dismissed and no new tasks
137-
# will be accepted. Has no additional effect if the thread pool is
138-
# not running.
139-
def kill
140-
mutex.synchronize do
141-
return if shutdown?
142-
stop_event.set
143-
@queue.clear
144-
drain_pool
145-
stopped_event.set
146-
end
147-
end
148-
149133
# Run on task completion.
150134
#
151135
# @!visibility private
@@ -184,7 +168,7 @@ def execute(*args, &task)
184168
end
185169

186170
# @!visibility private
187-
def stop_execution
171+
def shutdown_execution
188172
@queue.clear
189173
if @pool.empty?
190174
stopped_event.set
@@ -193,6 +177,12 @@ def stop_execution
193177
end
194178
end
195179

180+
# @!visibility private
181+
def kill_execution
182+
@queue.clear
183+
drain_pool
184+
end
185+
196186
# Handler which executes the `overflow_policy` once the queue size
197187
# reaches `max_queue`.
198188
#

lib/concurrent/executor/ruby_thread_pool_worker.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,25 @@
33
module Concurrent
44

55
# @!visibility private
6-
class RubyThreadPoolWorker # :nodoc:
6+
class RubyThreadPoolWorker
77

88
# @!visibility private
9-
def initialize(queue, parent) # :nodoc:
9+
def initialize(queue, parent)
1010
@queue = queue
1111
@parent = parent
1212
@mutex = Mutex.new
1313
@last_activity = Time.now.to_f
1414
end
1515

1616
# @!visibility private
17-
def dead? # :nodoc:
17+
def dead?
1818
return @mutex.synchronize do
1919
@thread.nil? ? false : ! @thread.alive?
2020
end
2121
end
2222

2323
# @!visibility private
24-
def last_activity # :nodoc:
24+
def last_activity
2525
@mutex.synchronize { @last_activity }
2626
end
2727

@@ -33,15 +33,15 @@ def status
3333
end
3434

3535
# @!visibility private
36-
def kill # :nodoc:
36+
def kill
3737
@mutex.synchronize do
3838
Thread.kill(@thread) unless @thread.nil?
3939
@thread = nil
4040
end
4141
end
4242

4343
# @!visibility private
44-
def run(thread = Thread.current) # :nodoc:
44+
def run(thread = Thread.current)
4545
@mutex.synchronize do
4646
raise StandardError.new('already running') unless @thread.nil?
4747
@thread = thread

lib/concurrent/executor/safe_task_executor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ def execute
2525
[success, value, reason]
2626
end
2727
end
28-
end
28+
end

lib/concurrent/executor/timer_set.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def execute(time, &task)
9999
end
100100

101101
# @!visibility private
102-
def stop_execution
102+
def shutdown_execution
103103
@queue.clear
104104
@thread.kill if @thread
105105
stopped_event.set

0 commit comments

Comments
 (0)