Skip to content

Commit 3c32888

Browse files
committed
Better synchronization and documentation for ExecutorService.
1 parent 39a4e3b commit 3c32888

File tree

2 files changed

+128
-341
lines changed

2 files changed

+128
-341
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 19 additions & 305 deletions
Original file line numberDiff line numberDiff line change
@@ -65,33 +65,34 @@ def self.executor(executor_identifier)
6565
end
6666
end
6767

68+
# @!macro [attach] executor_method_post
69+
#
70+
# Submit a task to the executor for asynchronous processing.
71+
#
72+
# @param [Array] args zero or more arguments to be passed to the task
73+
#
74+
# @yield the asynchronous task to perform
75+
#
76+
# @return [Boolean] `true` if the task is queued, `false` if the executor
77+
# is not running
78+
#
79+
# @raise [ArgumentError] if no task is given
6880
def post(*args, &task)
6981
raise NotImplementedError
7082
end
7183

84+
# @!macro [attach] executor_method_left_shift
85+
#
86+
# Submit a task to the executor for asynchronous processing.
87+
#
88+
# @param [Proc] task the asynchronous task to perform
89+
#
90+
# @return [self] returns itself
7291
def <<(task)
7392
post(&task)
7493
self
7594
end
7695

77-
## The policy defining how rejected tasks (tasks received once the
78-
## queue size reaches the configured `max_queue`, or after the
79-
## executor has shut down) are handled. Must be one of the values
80-
## specified in `FALLBACK_POLICIES`.
81-
#attr_reader :fallback_policy
82-
83-
## def initialize(opts)
84-
## @auto_terminate = opts.fetch(:auto_terminate, true)
85-
## end
86-
87-
#def auto_terminate?
88-
#synchronize { ns_auto_terminate? }
89-
#end
90-
91-
#def auto_terminate=(value)
92-
#synchronize { self.ns_auto_terminate = value }
93-
#end
94-
9596
# @!macro [attach] executor_module_method_can_overflow_question
9697
#
9798
# Does the task queue have a maximum size?
@@ -103,31 +104,6 @@ def can_overflow?
103104
false
104105
end
105106

106-
## Handler which executes the `fallback_policy` once the queue size
107-
## reaches `max_queue`.
108-
##
109-
## @param [Array] args the arguments to the task which is being handled.
110-
##
111-
## @!visibility private
112-
#def handle_fallback(*args)
113-
#case @fallback_policy
114-
#when :abort
115-
#raise RejectedExecutionError
116-
#when :discard
117-
#false
118-
#when :caller_runs
119-
#begin
120-
#yield(*args)
121-
#rescue => ex
122-
## let it fail
123-
#log DEBUG, ex
124-
#end
125-
#true
126-
#else
127-
#fail "Unknown fallback policy #{@fallback_policy}"
128-
#end
129-
#end
130-
131107
# @!macro [attach] executor_module_method_serialized_question
132108
#
133109
# Does this executor guarantee serialization of its operations?
@@ -140,30 +116,6 @@ def can_overflow?
140116
def serialized?
141117
false
142118
end
143-
144-
#private
145-
146-
#def ns_auto_terminate?
147-
#!!@auto_terminate
148-
#end
149-
150-
#def ns_auto_terminate=(value)
151-
#case value
152-
#when true
153-
#AtExit.add(self) { terminate_at_exit }
154-
#@auto_terminate = true
155-
#when false
156-
#AtExit.delete(self)
157-
#@auto_terminate = false
158-
#else
159-
#raise ArgumentError
160-
#end
161-
#end
162-
163-
#def terminate_at_exit
164-
#kill # TODO be gentle first
165-
#wait_for_termination(10)
166-
#end
167119
end
168120

169121
# Indicates that the including `Executor` or `ExecutorService` guarantees
@@ -193,242 +145,4 @@ def serialized?
193145
true
194146
end
195147
end
196-
197-
#module RubyExecutor
198-
#include Executor
199-
#include Logging
200-
201-
## The set of possible fallback policies that may be set at thread pool creation.
202-
#FALLBACK_POLICIES = [:abort, :discard, :caller_runs]
203-
204-
## @!macro [attach] executor_method_post
205-
##
206-
## Submit a task to the executor for asynchronous processing.
207-
##
208-
## @param [Array] args zero or more arguments to be passed to the task
209-
##
210-
## @yield the asynchronous task to perform
211-
##
212-
## @return [Boolean] `true` if the task is queued, `false` if the executor
213-
## is not running
214-
##
215-
## @raise [ArgumentError] if no task is given
216-
#def post(*args, &task)
217-
#raise ArgumentError.new('no block given') unless block_given?
218-
#mutex.synchronize do
219-
## If the executor is shut down, reject this task
220-
#return handle_fallback(*args, &task) unless running?
221-
#execute(*args, &task)
222-
#true
223-
#end
224-
#end
225-
226-
## @!macro [attach] executor_method_left_shift
227-
##
228-
## Submit a task to the executor for asynchronous processing.
229-
##
230-
## @param [Proc] task the asynchronous task to perform
231-
##
232-
## @return [self] returns itself
233-
#def <<(task)
234-
#post(&task)
235-
#self
236-
#end
237-
238-
## @!macro [attach] executor_method_running_question
239-
##
240-
## Is the executor running?
241-
##
242-
## @return [Boolean] `true` when running, `false` when shutting down or shutdown
243-
#def running?
244-
#!stop_event.set?
245-
#end
246-
247-
## @!macro [attach] executor_method_shuttingdown_question
248-
##
249-
## Is the executor shuttingdown?
250-
##
251-
## @return [Boolean] `true` when not running and not shutdown, else `false`
252-
#def shuttingdown?
253-
#!(running? || shutdown?)
254-
#end
255-
256-
## @!macro [attach] executor_method_shutdown_question
257-
##
258-
## Is the executor shutdown?
259-
##
260-
## @return [Boolean] `true` when shutdown, `false` when shutting down or running
261-
#def shutdown?
262-
#stopped_event.set?
263-
#end
264-
265-
## @!macro [attach] executor_method_shutdown
266-
##
267-
## Begin an orderly shutdown. Tasks already in the queue will be executed,
268-
## but no new tasks will be accepted. Has no additional effect if the
269-
## thread pool is not running.
270-
#def shutdown
271-
#mutex.synchronize do
272-
#break unless running?
273-
#self.ns_auto_terminate = false
274-
#stop_event.set
275-
#shutdown_execution
276-
#end
277-
#true
278-
#end
279-
280-
## @!macro [attach] executor_method_kill
281-
##
282-
## Begin an immediate shutdown. In-progress tasks will be allowed to
283-
## complete but enqueued tasks will be dismissed and no new tasks
284-
## will be accepted. Has no additional effect if the thread pool is
285-
## not running.
286-
#def kill
287-
#mutex.synchronize do
288-
#break if shutdown?
289-
#self.ns_auto_terminate = false
290-
#stop_event.set
291-
#kill_execution
292-
#stopped_event.set
293-
#end
294-
#true
295-
#end
296-
297-
## @!macro [attach] executor_method_wait_for_termination
298-
##
299-
## Block until executor shutdown is complete or until `timeout` seconds have
300-
## passed.
301-
##
302-
## @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
303-
## must be called before this method (or on another thread).
304-
##
305-
## @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
306-
##
307-
## @return [Boolean] `true` if shutdown complete or false on `timeout`
308-
#def wait_for_termination(timeout = nil)
309-
#stopped_event.wait(timeout)
310-
#end
311-
312-
#protected
313-
314-
#attr_reader :mutex, :stop_event, :stopped_event
315-
316-
## @!macro [attach] executor_method_init_executor
317-
##
318-
## Initialize the executor by creating and initializing all the
319-
## internal synchronization objects.
320-
#def init_executor
321-
#@mutex = Mutex.new
322-
#@stop_event = Event.new
323-
#@stopped_event = Event.new
324-
#end
325-
326-
## @!macro [attach] executor_method_execute
327-
#def execute(*args, &task)
328-
#raise NotImplementedError
329-
#end
330-
331-
## @!macro [attach] executor_method_shutdown_execution
332-
##
333-
## Callback method called when an orderly shutdown has completed.
334-
## The default behavior is to signal all waiting threads.
335-
#def shutdown_execution
336-
#stopped_event.set
337-
#end
338-
339-
## @!macro [attach] executor_method_kill_execution
340-
##
341-
## Callback method called when the executor has been killed.
342-
## The default behavior is to do nothing.
343-
#def kill_execution
344-
## do nothing
345-
#end
346-
#end
347-
348-
#if Concurrent.on_jruby?
349-
350-
#module JavaExecutor
351-
#include Executor
352-
#java_import 'java.lang.Runnable'
353-
354-
## The set of possible fallback policies that may be set at thread pool creation.
355-
#FALLBACK_POLICIES = {
356-
#abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
357-
#discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
358-
#caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
359-
#}.freeze
360-
361-
## @!macro executor_method_post
362-
#def post(*args, &task)
363-
#raise ArgumentError.new('no block given') unless block_given?
364-
#return handle_fallback(*args, &task) unless running?
365-
#executor_submit = @executor.java_method(:submit, [Runnable.java_class])
366-
#executor_submit.call { yield(*args) }
367-
#true
368-
#rescue Java::JavaUtilConcurrent::RejectedExecutionException
369-
#raise RejectedExecutionError
370-
#end
371-
372-
## @!macro executor_method_left_shift
373-
#def <<(task)
374-
#post(&task)
375-
#self
376-
#end
377-
378-
## @!macro executor_method_running_question
379-
#def running?
380-
#!(shuttingdown? || shutdown?)
381-
#end
382-
383-
## @!macro executor_method_shuttingdown_question
384-
#def shuttingdown?
385-
#if @executor.respond_to? :isTerminating
386-
#@executor.isTerminating
387-
#else
388-
#false
389-
#end
390-
#end
391-
392-
## @!macro executor_method_shutdown_question
393-
#def shutdown?
394-
#@executor.isShutdown || @executor.isTerminated
395-
#end
396-
397-
## @!macro executor_method_wait_for_termination
398-
#def wait_for_termination(timeout = nil)
399-
#if timeout.nil?
400-
#ok = @executor.awaitTermination(60, java.util.concurrent.TimeUnit::SECONDS) until ok
401-
#true
402-
#else
403-
#@executor.awaitTermination(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
404-
#end
405-
#end
406-
407-
## @!macro executor_method_shutdown
408-
#def shutdown
409-
#self.ns_auto_terminate = false
410-
#@executor.shutdown
411-
#nil
412-
#end
413-
414-
## @!macro executor_method_kill
415-
#def kill
416-
#self.ns_auto_terminate = false
417-
#@executor.shutdownNow
418-
#nil
419-
#end
420-
421-
#protected
422-
423-
## FIXME: it's here just for synchronization in auto_terminate methods, should be replaced and solved
424-
## by the synchronization layer
425-
#def mutex
426-
#self
427-
#end
428-
429-
#def synchronize
430-
#JRuby.reference0(self).synchronized { yield }
431-
#end
432-
#end
433-
#end
434148
end

0 commit comments

Comments
 (0)