Skip to content

Commit efa3d36

Browse files
committed
wip: Java executors now share JavaExecutor mixin module.
1 parent 9d5a757 commit efa3d36

File tree

3 files changed

+96
-139
lines changed

3 files changed

+96
-139
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,6 @@ def wait_for_termination(timeout)
4444
def post(*args, &task)
4545
end
4646

47-
# Submit a task to the thread pool for asynchronous processing.
48-
#
49-
# @param [Proc] task the asynchronous task to perform
50-
#
51-
# @return [self] returns itself
52-
def <<(task)
53-
post(&task)
54-
self
55-
end
56-
5747
protected
5848

5949
attr_reader :mutex, :stop_event, :stopped_event
@@ -67,4 +57,93 @@ def init_executor
6757
def execute(*args, &task)
6858
end
6959
end
60+
61+
if RUBY_PLATFORM == 'java'
62+
63+
module JavaExecutor
64+
65+
# Submit a task to the thread pool for asynchronous processing.
66+
#
67+
# @param [Array] args zero or more arguments to be passed to the task
68+
#
69+
# @yield the asynchronous task to perform
70+
#
71+
# @return [Boolean] `true` if the task is queued, `false` if the thread pool
72+
# is not running
73+
#
74+
# @raise [ArgumentError] if no task is given
75+
def post(*args)
76+
raise ArgumentError.new('no block given') unless block_given?
77+
if running?
78+
@executor.submit{ yield(*args) }
79+
true
80+
else
81+
false
82+
end
83+
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
84+
raise RejectedExecutionError
85+
end
86+
87+
# Submit a task to the thread pool for asynchronous processing.
88+
#
89+
# @param [Proc] task the asynchronous task to perform
90+
#
91+
# @return [self] returns itself
92+
def <<(task)
93+
post(&task)
94+
self
95+
end
96+
97+
# Is the thread pool running?
98+
#
99+
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
100+
def running?
101+
! (@executor.isShutdown || @executor.isTerminated)
102+
end
103+
104+
# Is the thread pool shutdown?
105+
#
106+
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
107+
def shutdown?
108+
@executor.isShutdown
109+
end
110+
111+
# Block until thread pool shutdown is complete or until `timeout` seconds have
112+
# passed.
113+
#
114+
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
115+
# must be called before this method (or on another thread).
116+
#
117+
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
118+
#
119+
# @return [Boolean] `true` if shutdown complete or false on `timeout`
120+
def wait_for_termination(timeout)
121+
@executor.awaitTermination(timeout.to_i, java.util.concurrent.TimeUnit::SECONDS)
122+
end
123+
124+
# Begin an orderly shutdown. Tasks already in the queue will be executed,
125+
# but no new tasks will be accepted. Has no additional effect if the
126+
# thread pool is not running.
127+
def shutdown
128+
@executor.shutdown
129+
return nil
130+
end
131+
132+
# Begin an immediate shutdown. In-progress tasks will be allowed to
133+
# complete but enqueued tasks will be dismissed and no new tasks
134+
# will be accepted. Has no additional effect if the thread pool is
135+
# not running.
136+
def kill
137+
@executor.shutdownNow
138+
nil
139+
end
140+
141+
protected
142+
143+
def set_shutdown_hook
144+
# without this the process may fail to exit
145+
at_exit { self.kill }
146+
end
147+
end
148+
end
70149
end

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,85 +5,16 @@ module Concurrent
55

66
# @!macro single_thread_executor
77
class JavaSingleThreadExecutor
8-
include Executor
8+
include JavaExecutor
99

1010
# Create a new thread pool.
1111
#
1212
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1313
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1414
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1515
def initialize(opts = {})
16-
1716
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
18-
19-
# without this the process may fail to exit
20-
at_exit { self.kill }
21-
end
22-
23-
# Is the thread pool running?
24-
#
25-
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
26-
def running?
27-
! (@executor.isShutdown || @executor.isTerminated)
28-
end
29-
30-
# Is the thread pool shutdown?
31-
#
32-
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
33-
def shutdown?
34-
@executor.isShutdown
35-
end
36-
37-
# Block until thread pool shutdown is complete or until `timeout` seconds have
38-
# passed.
39-
#
40-
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
41-
# must be called before this method (or on another thread).
42-
#
43-
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
44-
#
45-
# @return [Boolean] `true` if shutdown complete or false on `timeout`
46-
def wait_for_termination(timeout)
47-
@executor.awaitTermination(timeout.to_i, java.util.concurrent.TimeUnit::SECONDS)
48-
end
49-
50-
# Submit a task to the thread pool for asynchronous processing.
51-
#
52-
# @param [Array] args zero or more arguments to be passed to the task
53-
#
54-
# @yield the asynchronous task to perform
55-
#
56-
# @return [Boolean] `true` if the task is queued, `false` if the thread pool
57-
# is not running
58-
#
59-
# @raise [ArgumentError] if no task is given
60-
def post(*args)
61-
raise ArgumentError.new('no block given') unless block_given?
62-
if running?
63-
@executor.submit{ yield(*args) }
64-
true
65-
else
66-
false
67-
end
68-
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
69-
raise RejectedExecutionError
70-
end
71-
72-
# Begin an orderly shutdown. Tasks already in the queue will be executed,
73-
# but no new tasks will be accepted. Has no additional effect if the
74-
# thread pool is not running.
75-
def shutdown
76-
@executor.shutdown
77-
return nil
78-
end
79-
80-
# Begin an immediate shutdown. In-progress tasks will be allowed to
81-
# complete but enqueued tasks will be dismissed and no new tasks
82-
# will be accepted. Has no additional effect if the thread pool is
83-
# not running.
84-
def kill
85-
@executor.shutdownNow
86-
return nil
17+
set_shutdown_hook
8718
end
8819
end
8920
end

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 5 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module Concurrent
55

66
# @!macro thread_pool_executor
77
class JavaThreadPoolExecutor
8-
include Executor
8+
include JavaExecutor
99

1010
# Default maximum number of threads that will be created in the pool.
1111
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
@@ -86,8 +86,7 @@ def initialize(opts = {})
8686
idletime, java.util.concurrent.TimeUnit::SECONDS,
8787
queue, OVERFLOW_POLICIES[@overflow_policy].new)
8888

89-
# without this the process may fail to exit
90-
at_exit { self.kill }
89+
set_shutdown_hook
9190
end
9291

9392
# The minimum number of threads that may be retained in the pool.
@@ -168,68 +167,16 @@ def status
168167
#
169168
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
170169
def running?
171-
! (@executor.isShutdown || @executor.isTerminated || @executor.isTerminating)
172-
end
173-
174-
# Is the thread pool shutdown?
175-
#
176-
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
177-
def shutdown?
178-
@executor.isShutdown
179-
end
180-
181-
# Block until thread pool shutdown is complete or until `timeout` seconds have
182-
# passed.
183-
#
184-
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
185-
# must be called before this method (or on another thread).
186-
#
187-
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
188-
#
189-
# @return [Boolean] `true` if shutdown complete or false on `timeout`
190-
def wait_for_termination(timeout)
191-
@executor.awaitTermination(timeout.to_i, java.util.concurrent.TimeUnit::SECONDS)
192-
end
193-
194-
# Submit a task to the thread pool for asynchronous processing.
195-
#
196-
# @param [Array] args zero or more arguments to be passed to the task
197-
#
198-
# @yield the asynchronous task to perform
199-
#
200-
# @return [Boolean] `true` if the task is queued, `false` if the thread pool
201-
# is not running
202-
#
203-
# @raise [ArgumentError] if no task is given
204-
def post(*args)
205-
raise ArgumentError.new('no block given') unless block_given?
206-
if running?
207-
@executor.submit{ yield(*args) }
208-
true
209-
else
210-
false
211-
end
212-
rescue Java::JavaUtilConcurrent::RejectedExecutionException => ex
213-
raise RejectedExecutionError
170+
super && ! @executor.isTerminating
214171
end
215172

216173
# Begin an orderly shutdown. Tasks already in the queue will be executed,
217174
# but no new tasks will be accepted. Has no additional effect if the
218175
# thread pool is not running.
219176
def shutdown
220-
@executor.shutdown
221-
@executor.getQueue.clear
222-
return nil
223-
end
224-
225-
# Begin an immediate shutdown. In-progress tasks will be allowed to
226-
# complete but enqueued tasks will be dismissed and no new tasks
227-
# will be accepted. Has no additional effect if the thread pool is
228-
# not running.
229-
def kill
230-
@executor.shutdownNow
177+
super
231178
@executor.getQueue.clear
232-
return nil
179+
nil
233180
end
234181
end
235182
end

0 commit comments

Comments
 (0)