Skip to content

Commit d2d160b

Browse files
committed
Merge pull request #53 from jdantonio/refactor/with-ste
Extracted into Executor and JavaExecutor mixin modules
2 parents f4bea8d + 07afabb commit d2d160b

12 files changed

+343
-432
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
require 'concurrent/atomic/event'
2+
3+
module Concurrent
4+
5+
# An exception class raised when the maximum queue size is reached and the
6+
# `overflow_policy` is set to `:abort`.
7+
RejectedExecutionError = Class.new(StandardError)
8+
9+
module Executor
10+
11+
# Submit a task to the executor for asynchronous processing.
12+
#
13+
# @param [Array] args zero or more arguments to be passed to the task
14+
#
15+
# @yield the asynchronous task to perform
16+
#
17+
# @return [Boolean] `true` if the task is queued, `false` if the executor
18+
# is not running
19+
#
20+
# @raise [ArgumentError] if no task is given
21+
def post(*args, &task)
22+
raise ArgumentError.new('no block given') unless block_given?
23+
mutex.synchronize do
24+
return false unless running?
25+
execute(*args, &task)
26+
true
27+
end
28+
end
29+
30+
# Submit a task to the executor for asynchronous processing.
31+
#
32+
# @param [Proc] task the asynchronous task to perform
33+
#
34+
# @return [self] returns itself
35+
def <<(task)
36+
post(&task)
37+
self
38+
end
39+
40+
# Is the executor running?
41+
#
42+
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
43+
def running?
44+
! stop_event.set?
45+
end
46+
47+
# Is the executor shuttingdown?
48+
#
49+
# @return [Boolean] `true` when not running and not shutdown, else `false`
50+
def shuttingdown?
51+
! (running? || shutdown?)
52+
end
53+
54+
# Is the executor shutdown?
55+
#
56+
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
57+
def shutdown?
58+
stopped_event.set?
59+
end
60+
61+
# Begin an orderly shutdown. Tasks already in the queue will be executed,
62+
# but no new tasks will be accepted. Has no additional effect if the
63+
# thread pool is not running.
64+
def shutdown
65+
mutex.synchronize do
66+
return unless running?
67+
stop_event.set
68+
shutdown_execution
69+
end
70+
true
71+
end
72+
73+
# Begin an immediate shutdown. In-progress tasks will be allowed to
74+
# complete but enqueued tasks will be dismissed and no new tasks
75+
# will be accepted. Has no additional effect if the thread pool is
76+
# not running.
77+
def kill
78+
mutex.synchronize do
79+
return if shutdown?
80+
stop_event.set
81+
kill_execution
82+
stopped_event.set
83+
end
84+
true
85+
end
86+
87+
# Block until executor shutdown is complete or until `timeout` seconds have
88+
# passed.
89+
#
90+
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
91+
# must be called before this method (or on another thread).
92+
#
93+
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
94+
#
95+
# @return [Boolean] `true` if shutdown complete or false on `timeout`
96+
def wait_for_termination(timeout)
97+
stopped_event.wait(timeout.to_f)
98+
end
99+
100+
protected
101+
102+
attr_reader :mutex, :stop_event, :stopped_event
103+
104+
def init_executor
105+
@mutex = Mutex.new
106+
@stop_event = Event.new
107+
@stopped_event = Event.new
108+
end
109+
110+
def execute(*args, &task)
111+
raise NotImplementedError
112+
end
113+
114+
def shutdown_execution
115+
stopped_event.set
116+
end
117+
118+
def kill_execution
119+
# do nothing
120+
end
121+
end
122+
123+
if RUBY_PLATFORM == 'java'
124+
125+
module JavaExecutor
126+
127+
# Submit a task to the executor for asynchronous processing.
128+
#
129+
# @param [Array] args zero or more arguments to be passed to the task
130+
#
131+
# @yield the asynchronous task to perform
132+
#
133+
# @return [Boolean] `true` if the task is queued, `false` if the executor
134+
# is not running
135+
#
136+
# @raise [ArgumentError] if no task is given
137+
def post(*args)
138+
raise ArgumentError.new('no block given') unless block_given?
139+
if running?
140+
@executor.submit{ yield(*args) }
141+
true
142+
else
143+
false
144+
end
145+
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
146+
raise RejectedExecutionError
147+
end
148+
149+
# Submit a task to the executor for asynchronous processing.
150+
#
151+
# @param [Proc] task the asynchronous task to perform
152+
#
153+
# @return [self] returns itself
154+
def <<(task)
155+
post(&task)
156+
self
157+
end
158+
159+
# Is the executor running?
160+
#
161+
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
162+
def running?
163+
! (shuttingdown? || shutdown?)
164+
end
165+
166+
# Is the executor shuttingdown?
167+
#
168+
# @return [Boolean] `true` when not running and not shutdown, else `false`
169+
def shuttingdown?
170+
if @executor.respond_to? :isTerminating
171+
@executor.isTerminating
172+
else
173+
false
174+
end
175+
end
176+
177+
# Is the executor shutdown?
178+
#
179+
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
180+
def shutdown?
181+
@executor.isShutdown || @executor.isTerminated
182+
end
183+
184+
# Block until executor shutdown is complete or until `timeout` seconds have
185+
# passed.
186+
#
187+
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
188+
# must be called before this method (or on another thread).
189+
#
190+
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
191+
#
192+
# @return [Boolean] `true` if shutdown complete or false on `timeout`
193+
def wait_for_termination(timeout)
194+
@executor.awaitTermination(timeout.to_i, java.util.concurrent.TimeUnit::SECONDS)
195+
end
196+
197+
# Begin an orderly shutdown. Tasks already in the queue will be executed,
198+
# but no new tasks will be accepted. Has no additional effect if the
199+
# executor is not running.
200+
def shutdown
201+
@executor.shutdown
202+
nil
203+
end
204+
205+
# Begin an immediate shutdown. In-progress tasks will be allowed to
206+
# complete but enqueued tasks will be dismissed and no new tasks
207+
# will be accepted. Has no additional effect if the executor is
208+
# not running.
209+
def kill
210+
@executor.shutdownNow
211+
nil
212+
end
213+
214+
protected
215+
216+
def set_shutdown_hook
217+
# without this the process may fail to exit
218+
at_exit { self.kill }
219+
end
220+
end
221+
end
222+
end

lib/concurrent/executor/immediate_executor.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
module Concurrent
22
class ImmediateExecutor
33

4-
def post(*args, &block)
4+
def post(*args, &task)
55
raise ArgumentError.new('no block given') unless block_given?
6-
block.call(*args)
6+
task.call(*args)
77
return true
88
end
99

10-
def <<(block)
11-
post(&block)
10+
def <<(task)
11+
post(&task)
1212
self
1313
end
1414
end

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ def initialize(opts = {})
2424
@executor = java.util.concurrent.Executors.newCachedThreadPool
2525
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
2626

27-
# without this the process may fail to exit
28-
at_exit { self.kill }
27+
set_shutdown_hook
2928
end
3029
end
3130
end

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ def initialize(num_threads, opts = {})
2626
@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads)
2727
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
2828

29-
# without this the process may fail to exit
30-
at_exit { self.kill }
29+
set_shutdown_hook
3130
end
3231
end
3332
end

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 3 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,98 +1,20 @@
11
if RUBY_PLATFORM == 'java'
2+
require_relative 'executor'
23

34
module Concurrent
45

56
# @!macro single_thread_executor
67
class JavaSingleThreadExecutor
8+
include JavaExecutor
79

810
# Create a new thread pool.
911
#
1012
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1113
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1214
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1315
def initialize(opts = {})
14-
1516
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
16-
17-
# without this the process may fail to exit
18-
at_exit { self.kill }
19-
end
20-
21-
# Is the thread pool running?
22-
#
23-
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
24-
def running?
25-
! (@executor.isShutdown || @executor.isTerminated)
26-
end
27-
28-
# Is the thread pool shutdown?
29-
#
30-
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
31-
def shutdown?
32-
@executor.isShutdown
33-
end
34-
35-
# Block until thread pool shutdown is complete or until `timeout` seconds have
36-
# passed.
37-
#
38-
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
39-
# must be called before this method (or on another thread).
40-
#
41-
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
42-
#
43-
# @return [Boolean] `true` if shutdown complete or false on `timeout`
44-
def wait_for_termination(timeout)
45-
@executor.awaitTermination(timeout.to_i, java.util.concurrent.TimeUnit::SECONDS)
46-
end
47-
48-
# Submit a task to the thread pool for asynchronous processing.
49-
#
50-
# @param [Array] args zero or more arguments to be passed to the task
51-
#
52-
# @yield the asynchronous task to perform
53-
#
54-
# @return [Boolean] `true` if the task is queued, `false` if the thread pool
55-
# is not running
56-
#
57-
# @raise [ArgumentError] if no task is given
58-
def post(*args)
59-
raise ArgumentError.new('no block given') unless block_given?
60-
if running?
61-
@executor.submit{ yield(*args) }
62-
true
63-
else
64-
false
65-
end
66-
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
67-
raise RejectedExecutionError
68-
end
69-
70-
# Submit a task to the thread pool for asynchronous processing.
71-
#
72-
# @param [Proc] task the asynchronous task to perform
73-
#
74-
# @return [self] returns itself
75-
def <<(task)
76-
@executor.submit(&task)
77-
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
78-
raise RejectedExecutionError
79-
end
80-
81-
# Begin an orderly shutdown. Tasks already in the queue will be executed,
82-
# but no new tasks will be accepted. Has no additional effect if the
83-
# thread pool is not running.
84-
def shutdown
85-
@executor.shutdown
86-
return nil
87-
end
88-
89-
# Begin an immediate shutdown. In-progress tasks will be allowed to
90-
# complete but enqueued tasks will be dismissed and no new tasks
91-
# will be accepted. Has no additional effect if the thread pool is
92-
# not running.
93-
def kill
94-
@executor.shutdownNow
95-
return nil
17+
set_shutdown_hook
9618
end
9719
end
9820
end

0 commit comments

Comments
 (0)