Skip to content

Commit e20922d

Browse files
committed
TimerSet now creates ScheduledTasks.
1 parent 9174a1d commit e20922d

File tree

2 files changed

+147
-153
lines changed

2 files changed

+147
-153
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 9 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
require 'concurrent/errors'
2-
require 'concurrent/ivar'
1+
require 'concurrent/scheduled_task'
32
require 'concurrent/atomic/event'
43
require 'concurrent/collection/priority_queue'
5-
require 'concurrent/executor/executor'
64
require 'concurrent/executor/executor_service'
75
require 'concurrent/executor/single_thread_executor'
8-
require 'concurrent/utility/monotonic_time'
96

107
module Concurrent
118

@@ -16,97 +13,6 @@ module Concurrent
1613
# @!macro monotonic_clock_warning
1714
class TimerSet < RubyExecutorService
1815

19-
# An `IVar` representing a tasked queued for execution in a `TimerSet`.
20-
class Task < Concurrent::IVar
21-
include Comparable
22-
23-
attr_reader :executor
24-
25-
def initialize(parent, delay, args, task, opts = {})
26-
super(IVar::NO_VALUE, opts, &nil)
27-
synchronize do
28-
ns_set_delay_and_time!(delay)
29-
@parent = parent
30-
@args = args
31-
@task = task
32-
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
33-
self.observers = CopyOnNotifyObserverSet.new
34-
end
35-
end
36-
37-
def original_delay
38-
synchronize { @delay }
39-
end
40-
41-
def schedule_time
42-
synchronize { @time }
43-
end
44-
45-
def <=>(other)
46-
self.schedule_time <=> other.schedule_time
47-
end
48-
49-
# Has the task been cancelled?
50-
#
51-
# @return [Boolean] true if the task is in the given state else false
52-
def cancelled?
53-
synchronize { ns_check_state?(:cancelled) }
54-
end
55-
56-
# In the task execution in progress?
57-
#
58-
# @return [Boolean] true if the task is in the given state else false
59-
def processing?
60-
synchronize { ns_check_state?(:processing) }
61-
end
62-
63-
# Cancel this task and prevent it from executing. A task can only be
64-
# cancelled if it is pending or unscheduled.
65-
#
66-
# @return [Boolean] true if task execution is successfully cancelled
67-
# else false
68-
def cancel
69-
if compare_and_set_state(:cancelled, :pending, :unscheduled)
70-
complete(false, nil, CancelledOperationError.new)
71-
# To avoid deadlocks this call must occur outside of #synchronize
72-
# Changing the state above should prevent redundant calls
73-
@parent.send(:remove_task, self)
74-
else
75-
false
76-
end
77-
end
78-
79-
def reset
80-
synchronize{ ns_reschedule(@delay) }
81-
end
82-
83-
def reschedule(delay)
84-
synchronize{ ns_reschedule(delay) }
85-
end
86-
87-
# @!visibility private
88-
def process_task
89-
safe_execute(@task, @args)
90-
end
91-
92-
protected :set, :try_set, :fail, :complete
93-
94-
protected
95-
96-
def ns_set_delay_and_time!(delay)
97-
@delay = TimerSet.calculate_delay!(delay)
98-
@time = Concurrent.monotonic_time + @delay
99-
end
100-
101-
def ns_reschedule(delay, fail_if_cannot_remove = true)
102-
return false unless ns_check_state?(:pending)
103-
ns_set_delay_and_time!(delay)
104-
removed = @parent.send(:remove_task, self)
105-
return false if fail_if_cannot_remove && !removed
106-
@parent.send(:post_task, self)
107-
end
108-
end
109-
11016
# Create a new set of timed tasks.
11117
#
11218
# @!macro [attach] executor_options
@@ -137,9 +43,14 @@ def initialize(opts = {})
13743
# @!macro deprecated_scheduling_by_clock_time
13844
def post(delay, *args, &task)
13945
raise ArgumentError.new('no block given') unless block_given?
140-
task = Task.new(self, delay, args, task, {executor: @task_executor}) # may raise exception
141-
ok = synchronize{ ns_post_task(task) }
142-
ok ? task : false
46+
return false unless running?
47+
opts = {
48+
executor: @task_executor,
49+
args: args,
50+
timer_set: self
51+
}
52+
task = ScheduledTask.execute(delay, opts, &task) # may raise exception
53+
task.unscheduled? ? false : task
14354
end
14455

14556
# Begin an immediate shutdown. In-progress tasks will be allowed to
@@ -150,30 +61,6 @@ def kill
15061
shutdown
15162
end
15263

153-
# Schedule a task to be executed after a given delay (in seconds).
154-
#
155-
# @param [Float] delay the number of seconds to wait for before executing the task
156-
#
157-
# @return [Float] the number of seconds to delay
158-
#
159-
# @raise [ArgumentError] if the intended execution time is not in the future
160-
# @raise [ArgumentError] if no block is given
161-
#
162-
# @!macro deprecated_scheduling_by_clock_time
163-
#
164-
# @!visibility private
165-
def self.calculate_delay!(delay)
166-
if delay.is_a?(Time)
167-
warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock'
168-
now = Time.now
169-
raise ArgumentError.new('schedule time must be in the future') if delay <= now
170-
delay.to_f - now.to_f
171-
else
172-
raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0
173-
delay.to_f
174-
end
175-
end
176-
17764
private :<<
17865

17966
protected

lib/concurrent/scheduled_task.rb

Lines changed: 138 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
require 'concurrent/errors'
2+
require 'concurrent/ivar'
13
require 'concurrent/configuration'
4+
require 'concurrent/executor/executor'
25
require 'concurrent/executor/timer_set'
6+
require 'concurrent/utility/monotonic_time'
37

48
module Concurrent
59

@@ -131,7 +135,10 @@ module Concurrent
131135
# #>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?'
132136
#
133137
# @!macro monotonic_clock_warning
134-
class ScheduledTask < TimerSet::Task
138+
class ScheduledTask < IVar
139+
include Comparable
140+
141+
attr_reader :executor
135142

136143
# Schedule a task for execution at a specified future time.
137144
#
@@ -147,26 +154,108 @@ class ScheduledTask < TimerSet::Task
147154
# more accurate, but only when scheduling based on a delay interval.
148155
# Scheduling a task based on a clock time is deprecated. It will still work
149156
# but will not be supported in the 1.0 release.
150-
def initialize(delay, opts = {}, &block)
157+
def initialize(delay, opts = {}, &task)
151158
raise ArgumentError.new('no block given') unless block_given?
152-
timer_set = opts.fetch(:timer_set, Concurrent.global_timer_set)
153-
args = get_arguments_from(opts)
154-
super(timer_set, delay, args, block, opts, &nil)
159+
super(IVar::NO_VALUE, opts, &nil)
155160
synchronize do
161+
@original_delay = delay
162+
ns_set_delay_and_time!(delay) # may raise exception
156163
ns_set_state(:unscheduled)
157-
@__original_delay__ = delay
164+
@parent = opts.fetch(:timer_set, Concurrent.global_timer_set)
165+
@args = get_arguments_from(opts)
166+
@task = task
158167
@time = nil
168+
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
169+
self.observers = CopyOnNotifyObserverSet.new
159170
end
160171
end
161172

173+
def original_delay
174+
synchronize { @delay }
175+
end
176+
177+
# @deprecated
178+
def delay
179+
warn '[DEPRECATED] use #original_delay instead'
180+
original_delay
181+
end
182+
183+
def schedule_time
184+
synchronize { @time }
185+
end
186+
187+
def <=>(other)
188+
self.schedule_time <=> other.schedule_time
189+
end
190+
191+
# Has the task been cancelled?
192+
#
193+
# @return [Boolean] true if the task is in the given state else false
194+
def cancelled?
195+
synchronize { ns_check_state?(:cancelled) }
196+
end
197+
198+
# In the task execution in progress?
199+
#
200+
# @return [Boolean] true if the task is in the given state else false
201+
def processing?
202+
synchronize { ns_check_state?(:processing) }
203+
end
204+
205+
# In the task execution in progress?
206+
#
207+
# @return [Boolean] true if the task is in the given state else false
208+
#
209+
# @deprecated
210+
def in_progress?
211+
warn '[DEPRECATED] use #processing? instead'
212+
processing?
213+
end
214+
215+
# Cancel this task and prevent it from executing. A task can only be
216+
# cancelled if it is pending or unscheduled.
217+
#
218+
# @return [Boolean] true if task execution is successfully cancelled
219+
# else false
220+
def cancel
221+
if compare_and_set_state(:cancelled, :pending, :unscheduled)
222+
complete(false, nil, CancelledOperationError.new)
223+
# To avoid deadlocks this call must occur outside of #synchronize
224+
# Changing the state above should prevent redundant calls
225+
@parent.send(:remove_task, self)
226+
else
227+
false
228+
end
229+
end
230+
231+
# Cancel this task and prevent it from executing. A task can only be
232+
# cancelled if it is pending or unscheduled.
233+
#
234+
# @return [Boolean] true if task execution is successfully cancelled
235+
# else false
236+
#
237+
# @deprecated
238+
def stop
239+
warn '[DEPRECATED] use #cancel instead'
240+
cancel
241+
end
242+
243+
def reset
244+
synchronize{ ns_reschedule(@delay) }
245+
end
246+
247+
def reschedule(delay)
248+
synchronize{ ns_reschedule(delay) }
249+
end
250+
162251
# Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending`
163252
# and starts counting down toward execution. Does nothing if the `ScheduledTask` is
164253
# in any state other than `:unscheduled`.
165254
#
166255
# @return [ScheduledTask] a reference to `self`
167256
def execute
168257
if compare_and_set_state(:pending, :unscheduled)
169-
synchronize{ ns_reschedule(@__original_delay__, false) }
258+
synchronize{ ns_reschedule(@original_delay, false) }
170259
end
171260
self
172261
end
@@ -183,36 +272,54 @@ def execute
183272
# @raise [ArgumentError] if no block is given
184273
#
185274
# @!macro deprecated_scheduling_by_clock_time
186-
def self.execute(delay, opts = {}, &block)
187-
new(delay, opts, &block).execute
275+
def self.execute(delay, opts = {}, &task)
276+
new(delay, opts, &task).execute
188277
end
189278

190-
# In the task execution in progress?
191-
#
192-
# @return [Boolean] true if the task is in the given state else false
193-
#
194-
# @deprecated
195-
def in_progress?
196-
warn '[DEPRECATED] use #processing? instead'
197-
processing?
279+
# @!visibility private
280+
def process_task
281+
safe_execute(@task, @args)
198282
end
199283

200-
# Cancel this task and prevent it from executing. A task can only be
201-
# cancelled if it is pending or unscheduled.
202-
#
203-
# @return [Boolean] true if task execution is successfully cancelled
204-
# else false
205-
#
206-
# @deprecated
207-
def stop
208-
warn '[DEPRECATED] use #processing? instead'
209-
cancel
284+
protected :set, :try_set, :fail, :complete
285+
286+
protected
287+
288+
def ns_set_delay_and_time!(delay)
289+
@delay = calculate_delay!(delay)
290+
@time = Concurrent.monotonic_time + @delay
210291
end
211292

212-
# @deprecated
213-
def delay
214-
warn '[DEPRECATED] use #original_delay instead'
215-
original_delay
293+
def ns_reschedule(delay, fail_if_cannot_remove = true)
294+
return false unless ns_check_state?(:pending)
295+
ns_set_delay_and_time!(delay)
296+
removed = @parent.send(:remove_task, self)
297+
return false if fail_if_cannot_remove && !removed
298+
@parent.send(:post_task, self)
299+
end
300+
301+
# Schedule a task to be executed after a given delay (in seconds).
302+
#
303+
# @param [Float] delay the number of seconds to wait for before executing the task
304+
#
305+
# @return [Float] the number of seconds to delay
306+
#
307+
# @raise [ArgumentError] if the intended execution time is not in the future
308+
# @raise [ArgumentError] if no block is given
309+
#
310+
# @!macro deprecated_scheduling_by_clock_time
311+
#
312+
# @!visibility private
313+
def calculate_delay!(delay)
314+
if delay.is_a?(Time)
315+
warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock'
316+
now = Time.now
317+
raise ArgumentError.new('schedule time must be in the future') if delay <= now
318+
delay.to_f - now.to_f
319+
else
320+
raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0
321+
delay.to_f
322+
end
216323
end
217324
end
218325
end

0 commit comments

Comments
 (0)