Skip to content

Commit 260aea7

Browse files
committed
Updated ScheduledTask to use new monotinic clock.
1 parent 95ebd9f commit 260aea7

File tree

4 files changed

+187
-131
lines changed

4 files changed

+187
-131
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
module Concurrent
1010

11-
# Executes a collection of tasks at the specified times. A master thread
11+
# Executes a collection of tasks, each after a given delay. A master thread
1212
# monitors the set and schedules each task for execution at the appropriate
1313
# time. Tasks are run on the global task pool or on the supplied executor.
1414
#
@@ -32,12 +32,11 @@ def initialize(opts = {})
3232
init_executor
3333
end
3434

35-
# Post a task to be execute at the specified time. The given time may be either
36-
# a `Time` object or the number of seconds to wait. If the intended execution
37-
# time is within 1/100th of a second of the current time the task will be
38-
# immediately post to the executor.
35+
# Post a task to be execute run after a given delay (in seconds). If the
36+
# delay is less than 1/100th of a second the task will be immediately post
37+
# to the executor.
3938
#
40-
# @param [Object] intended_time the time to schedule the task for execution
39+
# @param [Float] delay the number of seconds to wait for before executing the task
4140
#
4241
# @yield the task to be performed
4342
#
@@ -46,24 +45,10 @@ def initialize(opts = {})
4645
# @raise [ArgumentError] if the intended execution time is not in the future
4746
# @raise [ArgumentError] if no block is given
4847
#
49-
# @!macro [attach] convert_time_to_interval_warning
50-
#
51-
# @note Clock times are susceptible to changes in the system clock that
52-
# occur while the application is running. Timers based on intervals are
53-
# much more accurate because they can be set based on a monotonic clock.
54-
# Subsequently, execution intervals based on clock times will be
55-
# immediately converted to intervals based on a monotonic clock. Under
56-
# most scenarios this will make no difference. Should the system clock
57-
# change *after* the interval has been calculated, the interval will *not*
58-
# change. This is the intended behavior. This timer is not intended for
59-
# use in realtime operations or as a replacement for `cron` or similar
60-
# services. This level of accuracy is sufficient for the use cases this
61-
# timer was intended to solve.
62-
#
63-
# @!macro monotonic_clock_warning
64-
def post(intended_time, *args, &task)
48+
# @!macro deprecated_scheduling_by_clock_time
49+
def post(delay, *args, &task)
6550
raise ArgumentError.new('no block given') unless block_given?
66-
interval = calculate_interval(intended_time)
51+
interval = TimerSet.calculate_interval(delay)
6752

6853
mutex.synchronize do
6954
return false unless running?
@@ -80,40 +65,45 @@ def post(intended_time, *args, &task)
8065
true
8166
end
8267

68+
# @!visibility private
69+
def <<(task)
70+
post(0.0, &task)
71+
self
72+
end
73+
8374
# For a timer, #kill is like an orderly shutdown, except we need to manually
8475
# (and destructively) clear the queue first
8576
def kill
8677
mutex.synchronize { @queue.clear }
8778
shutdown
8879
end
8980

90-
private
91-
92-
# Calculate a time interval with milliseconds at which to execute a
93-
# task. If the given time is a `Time` object it will be converted
94-
# accordingly. If the time is a floating point value greater than
95-
# zero it will be understood as a number of seconds in the future.
81+
# Schedule a task to be execute run after a given delay (in seconds).
9682
#
97-
# @param [Time, Float] intended_time the time to schedule the task for
98-
# execution, expressed as a `Time` object or a floating point number
99-
# representing a number of seconds
83+
# @param [Float] delay the number of seconds to wait for before executing the task
10084
#
101-
# @return [Float] the intended time interval as seconds/millis
85+
# @return [Float] the number of seconds to delay
10286
#
10387
# @raise [ArgumentError] if the intended execution time is not in the future
88+
# @raise [ArgumentError] if no block is given
89+
#
90+
# @!macro deprecated_scheduling_by_clock_time
10491
#
105-
# @!macro convert_time_to_interval_warning
106-
def calculate_interval(intended_time)
107-
if intended_time.is_a?(Time)
92+
# @!visibility private
93+
def self.calculate_interval(delay)
94+
if delay.is_a?(Time)
95+
warn '[DEPRECATED] Use an interval not a clock time, schedule is now based on a monotonic clock'
10896
now = Time.now
109-
raise ArgumentError.new('schedule time must be in the future') if intended_time <= now
110-
intended_time.to_f - now.to_f
97+
raise ArgumentError.new('schedule time must be in the future') if delay <= now
98+
delay.to_f - now.to_f
11199
else
112-
raise ArgumentError.new('seconds must be greater than zero') if intended_time.to_f < 0.0
113-
intended_time.to_f
100+
raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0
101+
delay.to_f
114102
end
115103
end
116104

105+
private
106+
117107
# A struct for encapsulating a task and its intended execution time.
118108
# It facilitates proper prioritization by overriding the comparison
119109
# (spaceship) operator as a comparison of the intended execution

lib/concurrent/scheduled_task.rb

Lines changed: 147 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,172 @@
44

55
module Concurrent
66

7-
# {include:file:doc/scheduled_task.md}
7+
# `ScheduledTask` is a close relative of `Concurrent::Future` but with one
8+
# important difference. A `Future` is set to execute as soon as possible
9+
# whereas a `ScheduledTask` is set to execute at a specific time. This
10+
# implementation is loosely based on Java's
11+
# [ScheduledExecutorService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html).
12+
#
13+
# The *intended* schedule time of task execution is set on object construction
14+
# with first argument, `delay`. The delay is a numeric (floating point or integer)
15+
# representing a number of seconds in the future. Any other value or a numeric
16+
# equal to or less than zero will result in an exception. The *actual* schedule
17+
# time of task execution is set when the `execute` method is called.
18+
#
19+
# The constructor can also be given zero or more processing options. Currently
20+
# the only supported options are those recognized by the
21+
# [Dereferenceable](Dereferenceable) module.
22+
#
23+
# The final constructor argument is a block representing the task to be performed.
24+
# If no block is given an `ArgumentError` will be raised.
25+
#
26+
# **States**
27+
#
28+
# `ScheduledTask` mixes in the [Obligation](Obligation) module thus giving it
29+
# "future" behavior. This includes the expected lifecycle states. `ScheduledTask`
30+
# has one additional state, however. While the task (block) is being executed the
31+
# state of the object will be `:in_progress`. This additional state is necessary
32+
# because it has implications for task cancellation.
33+
#
34+
# **Cancellation**
35+
#
36+
# A `:pending` task can be cancelled using the `#cancel` method. A task in any
37+
# other state, including `:in_progress`, cannot be cancelled. The `#cancel`
38+
# method returns a boolean indicating the success of the cancellation attempt.
39+
# A cancelled `ScheduledTask` cannot be restarted. It is immutable.
40+
#
41+
# **Obligation and Observation**
42+
#
43+
# The result of a `ScheduledTask` can be obtained either synchronously or
44+
# asynchronously. `ScheduledTask` mixes in both the [Obligation](Obligation)
45+
# module and the
46+
# [Observable](http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html)
47+
# module from the Ruby standard library. With one exception `ScheduledTask`
48+
# behaves identically to [Future](Observable) with regard to these modules.
49+
#
50+
# @example Basic usage
51+
#
52+
# require 'concurrent'
53+
# require 'thread' # for Queue
54+
# require 'open-uri' # for open(uri)
55+
#
56+
# class Ticker
57+
# def get_year_end_closing(symbol, year)
58+
# uri = "http://ichart.finance.yahoo.com/table.csv?s=#{symbol}&a=11&b=01&c=#{year}&d=11&e=31&f=#{year}&g=m"
59+
# data = open(uri) {|f| f.collect{|line| line.strip } }
60+
# data[1].split(',')[4].to_f
61+
# end
62+
# end
63+
#
64+
# # Future
65+
# price = Concurrent::Future.execute{ Ticker.new.get_year_end_closing('TWTR', 2013) }
66+
# price.state #=> :pending
67+
# sleep(1) # do other stuff
68+
# price.value #=> 63.65
69+
# price.state #=> :fulfilled
70+
#
71+
# # ScheduledTask
72+
# task = Concurrent::ScheduledTask.execute(2){ Ticker.new.get_year_end_closing('INTC', 2013) }
73+
# task.state #=> :pending
74+
# sleep(3) # do other stuff
75+
# task.value #=> 25.96
76+
#
77+
# @example Successful task execution
78+
#
79+
# task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }
80+
# task.state #=> :unscheduled
81+
# task.execute
82+
# task.state #=> pending
83+
#
84+
# # wait for it...
85+
# sleep(3)
86+
#
87+
# task.unscheduled? #=> false
88+
# task.pending? #=> false
89+
# task.fulfilled? #=> true
90+
# task.rejected? #=> false
91+
# task.value #=> 'What does the fox say?'
92+
#
93+
# @example One line creation and execution
94+
#
95+
# task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }.execute
96+
# task.state #=> pending
97+
#
98+
# @example Failed task execution
99+
#
100+
# task = Concurrent::ScheduledTask.execute(2){ raise StandardError.new('Call me maybe?') }
101+
# task.pending? #=> true
102+
#
103+
# # wait for it...
104+
# sleep(3)
105+
#
106+
# task.unscheduled? #=> false
107+
# task.pending? #=> false
108+
# task.fulfilled? #=> false
109+
# task.rejected? #=> true
110+
# task.value #=> nil
111+
# task.reason #=> #<StandardError: Call me maybe?>
112+
#
113+
# @example Task execution with observation
114+
#
115+
# observer = Class.new{
116+
# def update(time, value, reason)
117+
# puts "The task completed at #{time} with value '#{value}'"
118+
# end
119+
# }.new
120+
#
121+
# task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }
122+
# task.add_observer(observer)
123+
# task.execute
124+
# task.pending? #=> true
125+
#
126+
# # wait for it...
127+
# sleep(3)
128+
#
129+
# #>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?'
130+
#
131+
# @!macro monotonic_clock_warning
132+
#
133+
# @!macro [attach] deprecated_scheduling_by_clock_time
134+
#
135+
# @note Scheduling is now based on a monotonic clock. This makes the timer much
136+
# more accurate, but only when scheduling by passing a delay in seconds.
137+
# Scheduling a task based on a clock time is deprecated. It will still work
138+
# but will not be supported in the 1.0 release.
8139
class ScheduledTask < IVar
9140

10-
attr_reader :schedule_time
141+
attr_reader :delay
11142

12-
def initialize(intended_time, opts = {}, &block)
143+
# @!macro deprecated_scheduling_by_clock_time
144+
def initialize(delay, opts = {}, &block)
13145
raise ArgumentError.new('no block given') unless block_given?
14-
TimerSet.calculate_schedule_time(intended_time) # raises exceptons
146+
@delay = TimerSet.calculate_interval(delay)
15147

16148
super(NO_VALUE, opts)
17149

18150
self.observers = CopyOnNotifyObserverSet.new
19-
@intended_time = intended_time
20151
@state = :unscheduled
21152
@task = block
22153
@executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
23154
end
24155

25-
# @since 0.5.0
26156
def execute
27157
if compare_and_set_state(:pending, :unscheduled)
28-
now = Time.now
29-
@schedule_time = TimerSet.calculate_schedule_time(@intended_time, now)
30-
Concurrent::timer(@schedule_time.to_f - now.to_f) { @executor.post(&method(:process_task)) }
158+
@schedule_time = Time.now + @delay
159+
Concurrent::timer(@delay) { @executor.post(&method(:process_task)) }
31160
self
32161
end
33162
end
34163

35-
# @since 0.5.0
36-
def self.execute(intended_time, opts = {}, &block)
37-
return ScheduledTask.new(intended_time, opts, &block).execute
164+
# @!macro deprecated_scheduling_by_clock_time
165+
def self.execute(delay, opts = {}, &block)
166+
return ScheduledTask.new(delay, opts, &block).execute
167+
end
168+
169+
# @deprecated
170+
def schedule_time
171+
warn '[DEPRECATED] time is now based on a monotonic clock'
172+
@schedule_time
38173
end
39174

40175
def cancelled?
@@ -54,12 +189,6 @@ def cancel
54189
end
55190
alias_method :stop, :cancel
56191

57-
def add_observer(*args, &block)
58-
if_state(:unscheduled, :pending, :in_progress) do
59-
observers.add_observer(*args, &block)
60-
end
61-
end
62-
63192
protected :set, :fail, :complete
64193

65194
private

spec/concurrent/executor/timer_set_spec.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module Concurrent
2222
end
2323

2424
it 'executes a given task when given a Time' do
25+
warn 'deprecated syntax'
2526
latch = CountDownLatch.new(1)
2627
subject.post(Time.now + 0.1){ latch.count_down }
2728
expect(latch.wait(0.2)).to be_truthy
@@ -70,6 +71,7 @@ module Concurrent
7071
end
7172

7273
it 'raises an exception when given a task with a past Time value' do
74+
warn 'deprecated syntax'
7375
expect {
7476
subject.post(Time.now - 10){ nil }
7577
}.to raise_error(ArgumentError)

0 commit comments

Comments
 (0)