Skip to content

Commit 1cdd308

Browse files
committed
Merge pull request #299 from ruby-concurrency/cancel-future
Future, TimerSet, ScheduleTask cancel, reset, and reschedule
2 parents d28112a + d65d5f0 commit 1cdd308

File tree

13 files changed

+941
-375
lines changed

13 files changed

+941
-375
lines changed

.rspec

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
--require spec_helper
22
--color
3-
--backtrace
43
--format documentation

.yardopts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
--protected
21
--no-private
32
--embed-mixins
43
--output-dir ./yardoc

lib/concurrent/collection/priority_queue.rb

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ module Concurrent
2121
# When running under JRuby the class `PriorityQueue` extends `JavaPriorityQueue`.
2222
# When running under all other interpreters it extends `MutexPriorityQueue`.
2323
#
24-
# @note This implementation is *not* thread safe and performs no blocking.
24+
# @note This implementation is *not* thread safe.
2525
#
2626
# @see http://en.wikipedia.org/wiki/Priority_queue
2727
# @see http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html
@@ -292,14 +292,57 @@ def self.from_list(list, opts = {})
292292
queue
293293
end
294294
end
295+
end
295296

296-
# @!macro priority_queue
297-
class PriorityQueue < JavaPriorityQueue
298-
end
299-
else
297+
PriorityQueueImplementation = case
298+
when Concurrent.on_jruby?
299+
JavaPriorityQueue
300+
else
301+
MutexPriorityQueue
302+
end
303+
private_constant :PriorityQueueImplementation
300304

301-
# @!macro priority_queue
302-
class PriorityQueue < MutexPriorityQueue
303-
end
305+
# @!macro priority_queue
306+
class PriorityQueue < PriorityQueueImplementation
307+
308+
alias_method :has_priority?, :include?
309+
310+
alias_method :size, :length
311+
312+
alias_method :deq, :pop
313+
alias_method :shift, :pop
314+
315+
alias_method :<<, :push
316+
alias_method :enq, :push
317+
318+
# @!method initialize(opts = {})
319+
# @!macro priority_queue_method_initialize
320+
321+
# @!method clear
322+
# @!macro priority_queue_method_clear
323+
324+
# @!method delete(item)
325+
# @!macro priority_queue_method_delete
326+
327+
# @!method empty?
328+
# @!macro priority_queue_method_empty
329+
330+
# @!method include?(item)
331+
# @!macro priority_queue_method_include
332+
333+
# @!method length
334+
# @!macro priority_queue_method_length
335+
336+
# @!method peek
337+
# @!macro priority_queue_method_peek
338+
339+
# @!method pop
340+
# @!macro priority_queue_method_pop
341+
342+
# @!method push(item)
343+
# @!macro priority_queue_method_push
344+
345+
# @!method self.from_list(list, opts = {})
346+
# @!macro priority_queue_method_from_list
304347
end
305348
end

lib/concurrent/errors.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ module Concurrent
33
# Raised when errors occur during configuration.
44
ConfigurationError = Class.new(StandardError)
55

6+
# Raised when an asynchronous operation is cancelled before execution.
7+
CancelledOperationError = Class.new(StandardError)
8+
69
# Raised when a lifecycle method (such as `stop`) is called in an improper
710
# sequence or when the object is in an inappropriate state.
811
LifecycleError = Class.new(StandardError)

lib/concurrent/executor/timer_set.rb

Lines changed: 63 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1-
require 'thread'
1+
require 'concurrent/scheduled_task'
22
require 'concurrent/atomic/event'
33
require 'concurrent/collection/priority_queue'
4-
require 'concurrent/executor/executor'
54
require 'concurrent/executor/executor_service'
65
require 'concurrent/executor/single_thread_executor'
7-
require 'concurrent/utility/monotonic_time'
86

97
module Concurrent
108

119
# Executes a collection of tasks, each after a given delay. A master task
1210
# monitors the set and schedules each task for execution at the appropriate
13-
# time. Tasks are run on the global task pool or on the supplied executor.
11+
# time. Tasks are run on the global thread pool or on the supplied executor.
12+
# Each task is represented as a `ScheduledTask`.
13+
#
14+
# @see Concurrent::ScheduledTask
1415
#
1516
# @!macro monotonic_clock_warning
1617
class TimerSet < RubyExecutorService
@@ -32,32 +33,28 @@ def initialize(opts = {})
3233
# delay is less than 1/100th of a second the task will be immediately post
3334
# to the executor.
3435
#
35-
# @param [Float] delay the number of seconds to wait for before executing the task
36+
# @param [Float] delay the number of seconds to wait for before executing the task.
37+
# @param [Array<Object>] args the arguments passed to the task on execution.
3638
#
37-
# @yield the task to be performed
39+
# @yield the task to be performed.
3840
#
39-
# @return [Boolean] true if the message is post, false after shutdown
41+
# @return [Concurrent::ScheduledTask, false] IVar representing the task if the post
42+
# is successful; false after shutdown.
4043
#
41-
# @raise [ArgumentError] if the intended execution time is not in the future
42-
# @raise [ArgumentError] if no block is given
44+
# @raise [ArgumentError] if the intended execution time is not in the future.
45+
# @raise [ArgumentError] if no block is given.
4346
#
4447
# @!macro deprecated_scheduling_by_clock_time
4548
def post(delay, *args, &task)
4649
raise ArgumentError.new('no block given') unless block_given?
47-
delay = TimerSet.calculate_delay!(delay) # raises exceptions
48-
49-
synchronize do
50-
return false unless running?
51-
52-
if (delay) <= 0.01
53-
@task_executor.post(*args, &task)
54-
else
55-
@queue.push(Task.new(Concurrent.monotonic_time + delay, args, task))
56-
@timer_executor.post(&method(:process_tasks))
57-
end
58-
@condition.set
59-
end
60-
true
50+
return false unless running?
51+
opts = {
52+
executor: @task_executor,
53+
args: args,
54+
timer_set: self
55+
}
56+
task = ScheduledTask.execute(delay, opts, &task) # may raise exception
57+
task.unscheduled? ? false : task
6158
end
6259

6360
# Begin an immediate shutdown. In-progress tasks will be allowed to
@@ -68,48 +65,14 @@ def kill
6865
shutdown
6966
end
7067

71-
# Schedule a task to be executed after a given delay (in seconds).
72-
#
73-
# @param [Float] delay the number of seconds to wait for before executing the task
74-
#
75-
# @return [Float] the number of seconds to delay
76-
#
77-
# @raise [ArgumentError] if the intended execution time is not in the future
78-
# @raise [ArgumentError] if no block is given
79-
#
80-
# @!macro deprecated_scheduling_by_clock_time
81-
#
82-
# @!visibility private
83-
def self.calculate_delay!(delay)
84-
if delay.is_a?(Time)
85-
warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock'
86-
now = Time.now
87-
raise ArgumentError.new('schedule time must be in the future') if delay <= now
88-
delay.to_f - now.to_f
89-
else
90-
raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0
91-
delay.to_f
92-
end
93-
end
68+
private :<<
9469

9570
protected
9671

97-
# A struct for encapsulating a task and its intended execution time.
98-
# It facilitates proper prioritization by overriding the comparison
99-
# (spaceship) operator as a comparison of the intended execution
100-
# times.
72+
# Initialize the object.
10173
#
74+
# @param [Hash] opts the options to create the object with.
10275
# @!visibility private
103-
Task = Struct.new(:time, :args, :op) do
104-
include Comparable
105-
106-
def <=>(other)
107-
self.time <=> other.time
108-
end
109-
end
110-
111-
private_constant :Task
112-
11376
def ns_initialize(opts)
11477
@queue = PriorityQueue.new(order: :min)
11578
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@@ -118,6 +81,44 @@ def ns_initialize(opts)
11881
self.auto_terminate = opts.fetch(:auto_terminate, true)
11982
end
12083

84+
# Post the task to the internal queue.
85+
#
86+
# @note This is intended as a callback method from ScheduledTask
87+
# only. It is not intended to be used directly. Post a task
88+
# by using the `SchedulesTask#execute` method.
89+
#
90+
# @!visibility private
91+
def post_task(task)
92+
synchronize{ ns_post_task(task) }
93+
end
94+
95+
# @!visibility private
96+
def ns_post_task(task)
97+
return false unless ns_running?
98+
if (task.initial_delay) <= 0.01
99+
task.executor.post{ task.process_task }
100+
else
101+
@queue.push(task)
102+
# only post the process method when the queue is empty
103+
@timer_executor.post(&method(:process_tasks)) if @queue.length == 1
104+
@condition.set
105+
end
106+
true
107+
end
108+
109+
# Remove the given task from the queue.
110+
#
111+
# @note This is intended as a callback method from `ScheduledTask`
112+
# only. It is not intended to be used directly. Cancel a task
113+
# by using the `ScheduledTask#cancel` method.
114+
#
115+
# @!visibility private
116+
def remove_task(task)
117+
synchronize{ @queue.delete(task) }
118+
end
119+
120+
# `ExecutorServic` callback called during shutdown.
121+
#
121122
# @!visibility private
122123
def shutdown_execution
123124
@queue.clear
@@ -137,7 +138,7 @@ def process_tasks
137138
break unless task
138139

139140
now = Concurrent.monotonic_time
140-
diff = task.time - now
141+
diff = task.schedule_time - now
141142

142143
if diff <= 0
143144
# We need to remove the task from the queue before passing
@@ -153,18 +154,11 @@ def process_tasks
153154
# queue now must have the same pop time, or a closer one, as
154155
# when we peeked).
155156
task = synchronize { @queue.pop }
156-
@task_executor.post(*task.args, &task.op)
157+
task.executor.post{ task.process_task }
157158
else
158159
@condition.wait([diff, 60].min)
159160
end
160161
end
161162
end
162-
163-
private
164-
165-
def <<(task)
166-
post(0.0, &task)
167-
self
168-
end
169163
end
170164
end

lib/concurrent/future.rb

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def initialize(opts = {}, &block)
4545
# future.state #=> :pending
4646
def execute
4747
if compare_and_set_state(:pending, :unscheduled)
48-
@executor.post(@args){ work }
48+
@executor.post{ safe_execute(@task, @args) }
4949
self
5050
end
5151
end
@@ -84,6 +84,43 @@ def set(value = IVar::NO_VALUE, &block)
8484
execute
8585
end
8686

87+
# Attempt to cancel the operation if it has not already processed.
88+
# The operation can only be cancelled while still `pending`. It cannot
89+
# be cancelled once it has begun processing or has completed.
90+
#
91+
# @return [Boolean] was the operation successfully cancelled.
92+
def cancel
93+
if compare_and_set_state(:cancelled, :pending)
94+
complete(false, nil, CancelledOperationError.new)
95+
true
96+
else
97+
false
98+
end
99+
end
100+
101+
# Has the operation been successfully cancelled?
102+
#
103+
# @return [Boolean]
104+
def cancelled?
105+
state == :cancelled
106+
end
107+
108+
# Wait the given number of seconds for the operation to complete.
109+
# On timeout attempt to cancel the operation.
110+
#
111+
# @param [Numeric] timeout the maximum time in seconds to wait.
112+
# @return [Boolean] true if the operation completed before the timeout
113+
# else false
114+
def wait_or_cancel(timeout)
115+
wait(timeout)
116+
if complete?
117+
true
118+
else
119+
cancel
120+
false
121+
end
122+
end
123+
87124
protected
88125

89126
def ns_initialize(value, opts)
@@ -93,13 +130,5 @@ def ns_initialize(value, opts)
93130
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
94131
@args = get_arguments_from(opts)
95132
end
96-
97-
private
98-
99-
# @!visibility private
100-
def work # :nodoc:
101-
success, val, reason = SafeTaskExecutor.new(@task, rescue_exception: true).execute(*@args)
102-
complete(success, val, reason)
103-
end
104133
end
105134
end

lib/concurrent/ivar.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ def try_set(value = NO_VALUE, &block)
148148

149149
protected
150150

151+
# @!visibility private
151152
def ns_initialize(value, opts)
152153
init_obligation(self)
153154
self.observers = CopyOnWriteObserverSet.new
@@ -160,21 +161,34 @@ def ns_initialize(value, opts)
160161
end
161162
end
162163

164+
# @!visibility private
165+
def safe_execute(task, args = [])
166+
if compare_and_set_state(:processing, :pending)
167+
success, val, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args)
168+
complete(success, val, reason)
169+
yield(success, val, reason) if block_given?
170+
end
171+
end
172+
173+
# @!visibility private
163174
def complete(success, value, reason)
164175
complete_without_notification(success, value, reason)
165176
notify_observers(self.value, reason)
166177
self
167178
end
168179

180+
# @!visibility private
169181
def complete_without_notification(success, value, reason)
170182
synchronize { ns_complete_without_notification(success, value, reason) }
171183
self
172184
end
173185

186+
# @!visibility private
174187
def notify_observers(value, reason)
175188
observers.notify_and_delete_observers{ [Time.now, value, reason] }
176189
end
177190

191+
# @!visibility private
178192
def ns_complete_without_notification(success, value, reason)
179193
raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state
180194
set_state(success, value, reason)

0 commit comments

Comments
 (0)