Skip to content

Commit d65d5f0

Browse files
committed
Updated docs for TimerSet, ScheduledTask, and Concurrent#timer.
1 parent e20922d commit d65d5f0

File tree

5 files changed

+117
-46
lines changed

5 files changed

+117
-46
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ module Concurrent
88

99
# Executes a collection of tasks, each after a given delay. A master task
1010
# monitors the set and schedules each task for execution at the appropriate
11-
# time. Tasks are run on the global task pool or on the supplied executor.
11+
# time. Tasks are run on the global thread pool or on the supplied executor.
12+
# Each task is represented as a `ScheduledTask`.
13+
#
14+
# @see Concurrent::ScheduledTask
1215
#
1316
# @!macro monotonic_clock_warning
1417
class TimerSet < RubyExecutorService
@@ -30,15 +33,16 @@ def initialize(opts = {})
3033
# delay is less than 1/100th of a second the task will be immediately post
3134
# to the executor.
3235
#
33-
# @param [Float] delay the number of seconds to wait for before executing the task
36+
# @param [Float] delay the number of seconds to wait for before executing the task.
37+
# @param [Array<Object>] args the arguments passed to the task on execution.
3438
#
35-
# @yield the task to be performed
39+
# @yield the task to be performed.
3640
#
37-
# @return [Concurrent::TimerSet::Task, false] IVar representing the task if the post
38-
# is successful; false after shutdown
41+
# @return [Concurrent::ScheduledTask, false] IVar representing the task if the post
42+
# is successful; false after shutdown.
3943
#
40-
# @raise [ArgumentError] if the intended execution time is not in the future
41-
# @raise [ArgumentError] if no block is given
44+
# @raise [ArgumentError] if the intended execution time is not in the future.
45+
# @raise [ArgumentError] if no block is given.
4246
#
4347
# @!macro deprecated_scheduling_by_clock_time
4448
def post(delay, *args, &task)
@@ -65,6 +69,9 @@ def kill
6569

6670
protected
6771

72+
# Initialize the object.
73+
#
74+
# @param [Hash] opts the options to create the object with.
6875
# @!visibility private
6976
def ns_initialize(opts)
7077
@queue = PriorityQueue.new(order: :min)
@@ -74,14 +81,21 @@ def ns_initialize(opts)
7481
self.auto_terminate = opts.fetch(:auto_terminate, true)
7582
end
7683

84+
# Post the task to the internal queue.
85+
#
86+
# @note This is intended as a callback method from ScheduledTask
87+
# only. It is not intended to be used directly. Post a task
88+
# by using the `SchedulesTask#execute` method.
89+
#
90+
# @!visibility private
7791
def post_task(task)
7892
synchronize{ ns_post_task(task) }
7993
end
8094

8195
# @!visibility private
8296
def ns_post_task(task)
8397
return false unless ns_running?
84-
if (task.original_delay) <= 0.01
98+
if (task.initial_delay) <= 0.01
8599
task.executor.post{ task.process_task }
86100
else
87101
@queue.push(task)
@@ -94,15 +108,17 @@ def ns_post_task(task)
94108

95109
# Remove the given task from the queue.
96110
#
97-
# @note This is intended as a callback method from Task only.
98-
# It is not intended to be used directly. Cancel a task by
99-
# using the `Task#cancel` method.
111+
# @note This is intended as a callback method from `ScheduledTask`
112+
# only. It is not intended to be used directly. Cancel a task
113+
# by using the `ScheduledTask#cancel` method.
100114
#
101115
# @!visibility private
102116
def remove_task(task)
103117
synchronize{ @queue.delete(task) }
104118
end
105119

120+
# `ExecutorServic` callback called during shutdown.
121+
#
106122
# @!visibility private
107123
def shutdown_execution
108124
@queue.clear

lib/concurrent/scheduled_task.rb

Lines changed: 73 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module Concurrent
1212
# whereas a `ScheduledTask` is set to execute after a specified delay. This
1313
# implementation is loosely based on Java's
1414
# [ScheduledExecutorService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html).
15+
# It is a more feature-rich variant of {Concurrent.timer}.
1516
#
1617
# The *intended* schedule time of task execution is set on object construction
1718
# with the `delay` argument. The delay is a numeric (floating point or integer)
@@ -135,19 +136,29 @@ module Concurrent
135136
# #>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?'
136137
#
137138
# @!macro monotonic_clock_warning
139+
#
140+
# @see Concurrent.timer
138141
class ScheduledTask < IVar
139142
include Comparable
140143

144+
# The executor on which to execute the task.
145+
# @!visibility private
141146
attr_reader :executor
142147

143148
# Schedule a task for execution at a specified future time.
144149
#
145-
# @yield the task to be performed
146-
#
147150
# @param [Float] delay the number of seconds to wait for before executing the task
148151
#
152+
# @yield the task to be performed
153+
#
149154
# @!macro executor_and_deref_options
150155
#
156+
# @option opts [object, Array] :args zero or more arguments to be passed the task
157+
# block on execution
158+
#
159+
# @raise [ArgumentError] When no block is given
160+
# @raise [ArgumentError] When given a time that is in the past
161+
#
151162
# @!macro [attach] deprecated_scheduling_by_clock_time
152163
#
153164
# @note Scheduling is now based on a monotonic clock. This makes the timer much
@@ -158,8 +169,7 @@ def initialize(delay, opts = {}, &task)
158169
raise ArgumentError.new('no block given') unless block_given?
159170
super(IVar::NO_VALUE, opts, &nil)
160171
synchronize do
161-
@original_delay = delay
162-
ns_set_delay_and_time!(delay) # may raise exception
172+
@delay = calculate_delay!(delay) # may raise exception
163173
ns_set_state(:unscheduled)
164174
@parent = opts.fetch(:timer_set, Concurrent.global_timer_set)
165175
@args = get_arguments_from(opts)
@@ -170,20 +180,33 @@ def initialize(delay, opts = {}, &task)
170180
end
171181
end
172182

173-
def original_delay
183+
# The `delay` value given at instanciation.
184+
#
185+
# @return [Float] the initial delay.
186+
def initial_delay
174187
synchronize { @delay }
175188
end
176189

177-
# @deprecated
190+
# The `delay` value given at instanciation.
191+
#
192+
# @return [Float] the initial delay.
193+
#
194+
# @deprecated use {#initial_delay} instead
178195
def delay
179-
warn '[DEPRECATED] use #original_delay instead'
180-
original_delay
196+
warn '[DEPRECATED] use #initial_delay instead'
197+
initial_delay
181198
end
182199

200+
# The monotonic time at which the the task is scheduled to be executed.
201+
#
202+
# @return [Float] the schedule time or nil if `unscheduled`
183203
def schedule_time
184204
synchronize { @time }
185205
end
186206

207+
# Comparator which orders by schedule time.
208+
#
209+
# @!visibility private
187210
def <=>(other)
188211
self.schedule_time <=> other.schedule_time
189212
end
@@ -206,7 +229,7 @@ def processing?
206229
#
207230
# @return [Boolean] true if the task is in the given state else false
208231
#
209-
# @deprecated
232+
# @deprecated Use {#processing?} instead.
210233
def in_progress?
211234
warn '[DEPRECATED] use #processing? instead'
212235
processing?
@@ -215,8 +238,7 @@ def in_progress?
215238
# Cancel this task and prevent it from executing. A task can only be
216239
# cancelled if it is pending or unscheduled.
217240
#
218-
# @return [Boolean] true if task execution is successfully cancelled
219-
# else false
241+
# @return [Boolean] true if successfully cancelled else false
220242
def cancel
221243
if compare_and_set_state(:cancelled, :pending, :unscheduled)
222244
complete(false, nil, CancelledOperationError.new)
@@ -229,23 +251,34 @@ def cancel
229251
end
230252

231253
# Cancel this task and prevent it from executing. A task can only be
232-
# cancelled if it is pending or unscheduled.
254+
# cancelled if it is `:pending` or `:unscheduled`.
233255
#
234-
# @return [Boolean] true if task execution is successfully cancelled
235-
# else false
256+
# @return [Boolean] true if successfully cancelled else false
236257
#
237-
# @deprecated
258+
# @deprecated Use {#cancel} instead.
238259
def stop
239260
warn '[DEPRECATED] use #cancel instead'
240261
cancel
241262
end
242263

264+
# Reschedule the task using the original delay and the current time.
265+
# A task can only be reset while it is `:pending`.
266+
#
267+
# @return [Boolean] true if successfully rescheduled else false
243268
def reset
244269
synchronize{ ns_reschedule(@delay) }
245270
end
246271

272+
# Reschedule the task using the given delay and the current time.
273+
# A task can only be reset while it is `:pending`.
274+
#
275+
# @param [Float] delay the number of seconds to wait for before executing the task
276+
#
277+
# @return [Boolean] true if successfully rescheduled else false
278+
#
279+
# @raise [ArgumentError] When given a time that is in the past
247280
def reschedule(delay)
248-
synchronize{ ns_reschedule(delay) }
281+
synchronize{ ns_reschedule(calculate_delay!(delay)) }
249282
end
250283

251284
# Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending`
@@ -255,7 +288,7 @@ def reschedule(delay)
255288
# @return [ScheduledTask] a reference to `self`
256289
def execute
257290
if compare_and_set_state(:pending, :unscheduled)
258-
synchronize{ ns_reschedule(@original_delay, false) }
291+
synchronize{ ns_schedule(@delay) }
259292
end
260293
self
261294
end
@@ -276,6 +309,8 @@ def self.execute(delay, opts = {}, &task)
276309
new(delay, opts, &task).execute
277310
end
278311

312+
# Execute the task.
313+
#
279314
# @!visibility private
280315
def process_task
281316
safe_execute(@task, @args)
@@ -285,20 +320,33 @@ def process_task
285320

286321
protected
287322

288-
def ns_set_delay_and_time!(delay)
289-
@delay = calculate_delay!(delay)
323+
# Schedule the task using the given delay and the current time.
324+
#
325+
# @param [Float] delay the number of seconds to wait for before executing the task
326+
#
327+
# @return [Boolean] true if successfully rescheduled else false
328+
#
329+
# @!visibility private
330+
def ns_schedule(delay)
331+
@delay = delay
290332
@time = Concurrent.monotonic_time + @delay
333+
@parent.send(:post_task, self)
291334
end
292335

293-
def ns_reschedule(delay, fail_if_cannot_remove = true)
336+
# Reschedule the task using the given delay and the current time.
337+
# A task can only be reset while it is `:pending`.
338+
#
339+
# @param [Float] delay the number of seconds to wait for before executing the task
340+
#
341+
# @return [Boolean] true if successfully rescheduled else false
342+
#
343+
# @!visibility private
344+
def ns_reschedule(delay)
294345
return false unless ns_check_state?(:pending)
295-
ns_set_delay_and_time!(delay)
296-
removed = @parent.send(:remove_task, self)
297-
return false if fail_if_cannot_remove && !removed
298-
@parent.send(:post_task, self)
346+
@parent.send(:remove_task, self) && ns_schedule(delay)
299347
end
300348

301-
# Schedule a task to be executed after a given delay (in seconds).
349+
# Calculate the actual delay in seconds based on the given delay.
302350
#
303351
# @param [Float] delay the number of seconds to wait for before executing the task
304352
#

lib/concurrent/utility/timer.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,18 @@ module Concurrent
55

66
# Perform the given operation asynchronously after the given number of seconds.
77
#
8+
# This is a convenience method for posting tasks to the global timer set.
9+
# It is intended to be simple and easy to use. For greater control use
10+
# either `TimerSet` or `ScheduledTask` directly.
11+
#
812
# @param [Fixnum] seconds the interval in seconds to wait before executing the task
913
#
1014
# @yield the task to execute
1115
#
12-
# @return [Concurrent::TimerSet::Task] IVar representing the task
16+
# @return [Concurrent::ScheduledTask] IVar representing the task
17+
#
18+
# @see Concurrent::ScheduledTask
19+
# @see Concurrent::TimerSet
1320
#
1421
# @!macro monotonic_clock_warning
1522
def timer(seconds, *args, &block)

spec/concurrent/executor/timer_set_spec.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,14 +252,14 @@ module Concurrent
252252
end
253253

254254
it 'reschdules a pending and unpost task when given a valid time' do
255-
original_delay = 10
255+
initial_delay = 10
256256
rescheduled_delay = 20
257257
expect(queue).to receive(:push).twice.with(any_args).and_call_original
258-
task = subject.post(original_delay){ nil }
258+
task = subject.post(initial_delay){ nil }
259259
original_schedule = task.schedule_time
260260
success = task.reschedule(rescheduled_delay)
261261
expect(success).to be true
262-
expect(task.original_delay).to be_within(0.01).of(rescheduled_delay)
262+
expect(task.initial_delay).to be_within(0.01).of(rescheduled_delay)
263263
expect(task.schedule_time).to be > original_schedule
264264
end
265265

@@ -323,9 +323,9 @@ module Concurrent
323323
context 'task resetting' do
324324

325325
it 'calls #reschedule with the original delay' do
326-
original_delay = 10
327-
task = subject.post(original_delay){ nil }
328-
expect(task).to receive(:ns_reschedule).with(original_delay)
326+
initial_delay = 10
327+
task = subject.post(initial_delay){ nil }
328+
expect(task).to receive(:ns_reschedule).with(initial_delay)
329329
task.reset
330330
end
331331
end

spec/concurrent/scheduled_task_spec.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def trigger_observable(observable)
6666
Timecop.freeze do
6767
now = Time.now
6868
task = ScheduledTask.new(expected){ nil }.execute
69-
expect(task.original_delay).to be_within(0.1).of(expected)
69+
expect(task.initial_delay).to be_within(0.1).of(expected)
7070
end
7171
end
7272

@@ -75,7 +75,7 @@ def trigger_observable(observable)
7575
expected = 60 * 10
7676
schedule = Time.now + expected
7777
task = ScheduledTask.new(schedule){ nil }.execute
78-
expect(task.original_delay).to be_within(0.1).of(expected)
78+
expect(task.initial_delay).to be_within(0.1).of(expected)
7979
end
8080

8181
it 'raises an exception when seconds is less than zero' do
@@ -165,7 +165,7 @@ def trigger_observable(observable)
165165

166166
it 'uses the :executor from the options' do
167167
latch = Concurrent::CountDownLatch.new
168-
executor = Concurrent::ImmediateExecutor.new
168+
executor = Concurrent::SingleThreadExecutor.new
169169
expect(executor).to receive(:post).once.with(any_args).and_call_original
170170
task = ScheduledTask.execute(0.1, executor: executor) do
171171
latch.count_down

0 commit comments

Comments
 (0)