1
1
require 'thread'
2
- require_relative 'executor'
3
2
require 'concurrent/options_parser'
4
3
require 'concurrent/atomic/event'
5
4
require 'concurrent/collection/priority_queue'
5
+ require 'concurrent/executor/executor'
6
6
require 'concurrent/executor/single_thread_executor'
7
+ require 'concurrent/utility/monotonic_time'
7
8
8
9
module Concurrent
9
10
10
11
# Executes a collection of tasks at the specified times. A master thread
11
12
# monitors the set and schedules each task for execution at the appropriate
12
13
# time. Tasks are run on the global task pool or on the supplied executor.
14
+ #
15
+ # @!macro monotonic_clock_warning
13
16
class TimerSet
14
17
include RubyExecutor
15
18
@@ -42,17 +45,33 @@ def initialize(opts = {})
42
45
#
43
46
# @raise [ArgumentError] if the intended execution time is not in the future
44
47
# @raise [ArgumentError] if no block is given
48
+ #
49
+ # @!macro [attach] convert_time_to_interval_warning
50
+ #
51
+ # @note Clock times are susceptible to changes in the system clock that
52
+ # occur while the application is running. Timers based on intervals are
53
+ # much more accurate because they can be set based on a monotonic clock.
54
+ # Subsequently, execution intervals based on clock times will be
55
+ # immediately converted to intervals based on a monotonic clock. Under
56
+ # most scenarios this will make no difference. Should the system clock
57
+ # change *after* the interval has been calculated, the interval will *not*
58
+ # change. This is the intended behavior. This timer is not intended for
59
+ # use in realtime operations or as a replacement for `cron` or similar
60
+ # services. This level of accuracy is sufficient for the use cases this
61
+ # timer was intended to solve.
62
+ #
63
+ # @!macro monotonic_clock_warning
45
64
def post ( intended_time , *args , &task )
46
- time = TimerSet . calculate_schedule_time ( intended_time ) . to_f
47
65
raise ArgumentError . new ( 'no block given' ) unless block_given?
66
+ interval = calculate_interval ( intended_time )
48
67
49
68
mutex . synchronize do
50
69
return false unless running?
51
70
52
- if ( time - Time . now . to_f ) <= 0.01
71
+ if ( interval ) <= 0.01
53
72
@task_executor . post ( *args , &task )
54
73
else
55
- @queue . push ( Task . new ( time , args , task ) )
74
+ @queue . push ( Task . new ( Concurrent . monotonic_time + interval , args , task ) )
56
75
@timer_executor . post ( &method ( :process_tasks ) )
57
76
end
58
77
end
@@ -68,31 +87,33 @@ def kill
68
87
shutdown
69
88
end
70
89
71
- # Calculate an Epoch time with milliseconds at which to execute a
90
+ private
91
+
92
+ # Calculate a time interval with milliseconds at which to execute a
72
93
# task. If the given time is a `Time` object it will be converted
73
- # accordingly. If the time is an integer value greater than zero
74
- # it will be understood as a number of seconds in the future and
75
- # will be added to the current time to calculate Epoch.
94
+ # accordingly. If the time is a floating point value greater than
95
+ # zero it will be understood as a number of seconds in the future.
76
96
#
77
- # @param [Object ] intended_time the time (as a `Time` object or an integer)
78
- # to schedule the task for execution
79
- # @param [Time] now (Time.now) the time from which to calculate an interval
97
+ # @param [Time, Float ] intended_time the time to schedule the task for
98
+ # execution, expressed as a `Time` object or a floating point number
99
+ # representing a number of seconds
80
100
#
81
- # @return [Fixnum ] the intended time as seconds/millis from Epoch
101
+ # @return [Float ] the intended time interval as seconds/millis
82
102
#
83
103
# @raise [ArgumentError] if the intended execution time is not in the future
84
- def self . calculate_schedule_time ( intended_time , now = Time . now )
104
+ #
105
+ # @!macro convert_time_to_interval_warning
106
+ def calculate_interval ( intended_time )
85
107
if intended_time . is_a? ( Time )
108
+ now = Time . now
86
109
raise ArgumentError . new ( 'schedule time must be in the future' ) if intended_time <= now
87
- intended_time
110
+ intended_time . to_f - now . to_f
88
111
else
89
112
raise ArgumentError . new ( 'seconds must be greater than zero' ) if intended_time . to_f < 0.0
90
- now + intended_time
113
+ intended_time . to_f
91
114
end
92
115
end
93
116
94
- private
95
-
96
117
# A struct for encapsulating a task and its intended execution time.
97
118
# It facilitates proper prioritization by overriding the comparison
98
119
# (spaceship) operator as a comparison of the intended execution
@@ -126,9 +147,11 @@ def process_tasks
126
147
loop do
127
148
task = mutex . synchronize { @queue . peek }
128
149
break unless task
129
- interval = task . time - Time . now . to_f
130
150
131
- if interval <= 0
151
+ now = Concurrent . monotonic_time
152
+ diff = task . time - now
153
+
154
+ if diff <= 0
132
155
# We need to remove the task from the queue before passing
133
156
# it to the executor, to avoid race conditions where we pass
134
157
# the peek'ed task to the executor and then pop a different
@@ -145,7 +168,7 @@ def process_tasks
145
168
@task_executor . post ( *task . args , &task . op )
146
169
else
147
170
mutex . synchronize do
148
- @condition . wait ( mutex , [ interval , 60 ] . min )
171
+ @condition . wait ( mutex , [ diff , 60 ] . min )
149
172
end
150
173
end
151
174
end
0 commit comments