Skip to content

Commit 217c46e

Browse files
committed
ScheduledTask is now a TimerSet::Task.
1 parent a1f3d4f commit 217c46e

File tree

4 files changed

+59
-98
lines changed

4 files changed

+59
-98
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ class TimerSet < RubyExecutorService
2020
class Task < Concurrent::IVar
2121
include Comparable
2222

23-
def initialize(parent, delay, args, task)
24-
super()
23+
def initialize(parent, delay, args, task, opts = {})
24+
super(IVar::NO_VALUE, opts, &nil)
2525
synchronize do
2626
ns_set_delay_and_time!(delay)
2727
@parent = parent
2828
@args = args
2929
@task = task
30+
self.observers = CopyOnNotifyObserverSet.new
3031
end
3132
end
3233

@@ -56,8 +57,13 @@ def processing?
5657
synchronize { ns_check_state?(:processing) }
5758
end
5859

60+
# Cancel this task and prevent it from executing. A task can only be
61+
# cancelled if it is pending or unscheduled.
62+
#
63+
# @return [Boolean] true if task execution is successfully cancelled
64+
# else false
5965
def cancel
60-
if compare_and_set_state(:cancelled, :pending)
66+
if compare_and_set_state(:cancelled, :pending, :unscheduled)
6167
complete(false, nil, CancelledOperationError.new)
6268
# To avoid deadlocks this call must occur outside of #synchronize
6369
# Changing the state above should prevent redundant calls
@@ -76,11 +82,11 @@ def reschedule(delay)
7682
end
7783

7884
# @!visibility private
79-
def execute
85+
def process_task
8086
safe_execute(@task, @args)
8187
end
8288

83-
protected :set, :try_set
89+
protected :set, :try_set, :fail, :complete
8490

8591
protected
8692

@@ -89,11 +95,12 @@ def ns_set_delay_and_time!(delay)
8995
@time = Concurrent.monotonic_time + @delay
9096
end
9197

92-
def ns_reschedule(delay)
98+
def ns_reschedule(delay, fail_if_cannot_remove = true)
9399
return false unless ns_check_state?(:pending)
94100
ns_set_delay_and_time!(delay)
95-
return false unless @parent.send(:remove_task, self)
96-
@parent.send(:ns_post_task, self)
101+
removed = @parent.send(:remove_task, self)
102+
return false if fail_if_cannot_remove && !removed
103+
@parent.send(:post_task, self)
97104
end
98105
end
99106

@@ -177,15 +184,19 @@ def ns_initialize(opts)
177184
self.auto_terminate = opts.fetch(:auto_terminate, true)
178185
end
179186

187+
def post_task(task)
188+
synchronize{ ns_post_task(task) }
189+
end
190+
180191
# @!visibility private
181192
def ns_post_task(task)
182193
return false unless ns_running?
183194
if (task.original_delay) <= 0.01
184-
@task_executor.post{ task.execute }
195+
@task_executor.post{ task.process_task }
185196
else
186197
@queue.push(task)
187198
# only post the process method when the queue is empty
188-
@timer_executor.post(&method(:process_tasks)) if @queue.size == 1
199+
@timer_executor.post(&method(:process_tasks)) if @queue.length == 1
189200
@condition.set
190201
end
191202
true
@@ -202,15 +213,6 @@ def remove_task(task)
202213
synchronize{ @queue.delete(task) }
203214
end
204215

205-
# @note This is intended as a callback method from Task only.
206-
# It is not intended to be used directly. Cancel a task by
207-
# using the `Task#cancel` method.
208-
#
209-
# @!visibility private
210-
def reschedule_task(task, delay)
211-
212-
end
213-
214216
# @!visibility private
215217
def shutdown_execution
216218
@queue.clear
@@ -246,7 +248,7 @@ def process_tasks
246248
# queue now must have the same pop time, or a closer one, as
247249
# when we peeked).
248250
task = synchronize { @queue.pop }
249-
@task_executor.post{ task.execute }
251+
@task_executor.post{ task.process_task }
250252
else
251253
@condition.wait([diff, 60].min)
252254
end

lib/concurrent/obligation.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def set_state(success, value, reason)
164164

165165
# @!visibility private
166166
def state=(value)
167-
mutex.synchronize { @state = value }
167+
mutex.synchronize { ns_set_state(value) }
168168
end
169169

170170
# Atomic compare and set operation
@@ -176,9 +176,9 @@ def state=(value)
176176
# @return [Boolean] true is state is changed, false otherwise
177177
#
178178
# @!visibility private
179-
def compare_and_set_state(next_state, expected_current)
179+
def compare_and_set_state(next_state, *expected_current)
180180
mutex.synchronize do
181-
if @state == expected_current
181+
if expected_current.include? @state
182182
@state = next_state
183183
true
184184
else
@@ -215,5 +215,10 @@ def if_state(*expected_states)
215215
def ns_check_state?(expected)
216216
@state == expected
217217
end
218+
219+
# @!visibility private
220+
def ns_set_state(value)
221+
@state = value
222+
end
218223
end
219224
end

lib/concurrent/scheduled_task.rb

Lines changed: 23 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
require 'concurrent/ivar'
2-
require 'concurrent/utility/timer'
3-
require 'concurrent/executor/executor'
4-
require 'concurrent/executor/safe_task_executor'
1+
require 'concurrent/configuration'
2+
require 'concurrent/executor/timer_set'
53

64
module Concurrent
75

@@ -133,22 +131,15 @@ module Concurrent
133131
# #>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?'
134132
#
135133
# @!macro monotonic_clock_warning
136-
class ScheduledTask < IVar
137-
138-
attr_reader :delay
134+
class ScheduledTask < TimerSet::Task
139135

140136
# Schedule a task for execution at a specified future time.
141137
#
142138
# @yield the task to be performed
143139
#
144140
# @param [Float] delay the number of seconds to wait for before executing the task
145141
#
146-
# @param [Hash] opts the options controlling how the future will be processed
147-
# @option opts [Boolean] :operation (false) when `true` will execute the future on the global
148-
# operation pool (for long-running operations), when `false` will execute the future on the
149-
# global task pool (for short-running tasks)
150-
# @option opts [object] :executor when provided will run all operations on
151-
# this executor rather than the global thread pool (overrides :operation)
142+
# @!macro executor_and_deref_options
152143
#
153144
# @!macro [attach] deprecated_scheduling_by_clock_time
154145
#
@@ -158,16 +149,12 @@ class ScheduledTask < IVar
158149
# but will not be supported in the 1.0 release.
159150
def initialize(delay, opts = {}, &block)
160151
raise ArgumentError.new('no block given') unless block_given?
161-
@delay = TimerSet.calculate_delay!(delay)
162-
super(IVar::NO_VALUE, opts.merge(__task_from_block__: block), &nil)
163-
end
164-
165-
def ns_initialize(value, opts)
166-
super
167-
self.observers = CopyOnNotifyObserverSet.new
168-
@state = :unscheduled
169-
@task = opts[:__task_from_block__]
170-
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
152+
super(Concurrent.global_timer_set, delay, [], block, &nil)
153+
synchronize do
154+
ns_set_state(:unscheduled)
155+
@__original_delay__ = delay
156+
@time = nil
157+
end
171158
end
172159

173160
# Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending`
@@ -177,51 +164,25 @@ def ns_initialize(value, opts)
177164
# @return [ScheduledTask] a reference to `self`
178165
def execute
179166
if compare_and_set_state(:pending, :unscheduled)
180-
@schedule_time = Time.now + @delay
181-
Concurrent::timer(@delay) { @executor.post(&method(:process_task)) }
182-
self
167+
synchronize{ ns_reschedule(@__original_delay__, false) }
183168
end
169+
self
184170
end
185171

186172
# Create a new `ScheduledTask` object with the given block, execute it, and return the
187173
# `:pending` object.
188174
#
189175
# @param [Float] delay the number of seconds to wait for before executing the task
190176
#
191-
# @param [Hash] opts the options controlling how the future will be processed
192-
# @option opts [Boolean] :operation (false) when `true` will execute the future on the global
193-
# operation pool (for long-running operations), when `false` will execute the future on the
194-
# global task pool (for short-running tasks)
195-
# @option opts [object] :executor when provided will run all operations on
196-
# this executor rather than the global thread pool (overrides :operation)
177+
# @!macro executor_and_deref_options
197178
#
198179
# @return [ScheduledTask] the newly created `ScheduledTask` in the `:pending` state
199180
#
200181
# @raise [ArgumentError] if no block is given
201182
#
202183
# @!macro deprecated_scheduling_by_clock_time
203184
def self.execute(delay, opts = {}, &block)
204-
return ScheduledTask.new(delay, opts, &block).execute
205-
end
206-
207-
# @deprecated
208-
def schedule_time
209-
warn '[DEPRECATED] time is now based on a monotonic clock'
210-
@schedule_time
211-
end
212-
213-
# Has the task been cancelled?
214-
#
215-
# @return [Boolean] true if the task is in the given state else false
216-
def cancelled?
217-
state == :cancelled
218-
end
219-
220-
# In the task execution in progress?
221-
#
222-
# @return [Boolean] true if the task is in the given state else false
223-
def processing?
224-
state == :processing
185+
new(delay, &block).execute
225186
end
226187

227188
# In the task execution in progress?
@@ -239,26 +200,17 @@ def in_progress?
239200
#
240201
# @return [Boolean] true if task execution is successfully cancelled
241202
# else false
242-
def cancel
243-
if_state(:unscheduled, :pending) do
244-
@state = :cancelled
245-
event.set
246-
true
247-
end
203+
#
204+
# @deprecated
205+
def stop
206+
warn '[DEPRECATED] use #processing? instead'
207+
cancel
248208
end
249-
alias_method :stop, :cancel
250-
251-
protected :set, :fail, :complete
252209

253-
private
254-
255-
# @!visibility private
256-
def process_task
257-
safe_execute(@task) do |success, val, reason|
258-
event.set
259-
time = Time.now
260-
observers.notify_and_delete_observers { [time, self.value, reason] }
261-
end
210+
# @deprecated
211+
def delay
212+
warn '[DEPRECATED] use #original_delay instead'
213+
original_delay
262214
end
263215
end
264216
end

spec/concurrent/scheduled_task_spec.rb

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
module Concurrent
66

77
describe ScheduledTask do
8+
89
context 'behavior' do
910

1011
# obligation
@@ -57,7 +58,7 @@ def trigger_observable(observable)
5758
Timecop.freeze do
5859
now = Time.now
5960
task = ScheduledTask.new(expected){ nil }.execute
60-
expect(task.delay).to be_within(0.1).of(expected)
61+
expect(task.original_delay).to be_within(0.1).of(expected)
6162
end
6263
end
6364

@@ -66,7 +67,7 @@ def trigger_observable(observable)
6667
expected = 60 * 10
6768
schedule = Time.now + expected
6869
task = ScheduledTask.new(schedule){ nil }.execute
69-
expect(task.delay).to be_within(0.1).of(expected)
70+
expect(task.original_delay).to be_within(0.1).of(expected)
7071
end
7172

7273
it 'raises an exception when seconds is less than zero' do
@@ -185,11 +186,12 @@ def trigger_observable(observable)
185186
expect(task.cancel).to be_truthy
186187
end
187188

188-
it 'sets the state to :cancelled when cancelled' do
189+
it 'sets the reason to CancelledOperationError when cancelled' do
189190
task = ScheduledTask.new(10){ 42 }.execute
190191
sleep(0.1)
191192
task.cancel
192-
expect(task).to be_cancelled
193+
expect(task).to be_rejected
194+
expect(task.reason).to be_a CancelledOperationError
193195
end
194196
end
195197

0 commit comments

Comments
 (0)