Skip to content

Commit ebde179

Browse files
committed
ScheduledTask noe uses global timer pool.
1 parent 569194b commit ebde179

File tree

4 files changed

+42
-60
lines changed

4 files changed

+42
-60
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def post(intended_time, &block)
7070
@mutex.synchronize do
7171
return false if shutdown?
7272
raise ArgumentError.new('no block given') unless block_given?
73-
time = calculate_schedule_time(intended_time)
73+
time = TimerSet.calculate_schedule_time(intended_time).to_f
7474

7575
if (time - Time.now.to_f) <= 0.01
7676
@executor.post(&block)
@@ -82,6 +82,9 @@ def post(intended_time, &block)
8282
true
8383
end
8484

85+
# Begin an orderly shutdown. Tasks already in the queue will be executed,
86+
# but no new tasks will be accepted. Has no additional effect if the
87+
# thread pool is not running.
8588
def shutdown
8689
@mutex.synchronize do
8790
unless @shutdown.set?
@@ -94,6 +97,29 @@ def shutdown
9497
end
9598
alias_method :kill, :shutdown
9699

100+
# Calculate an Epoch time with milliseconds at which to execute a
101+
# task. If the given time is a `Time` object it will be converted
102+
# accordingly. If the time is an integer value greate than zero
103+
# it will be understood as a number of seconds in the future and
104+
# will be added to the current time to calculate Epoch.
105+
#
106+
# @param [Object] intended_time the time (as a `Time` object or an integer)
107+
# to schedule the task for execution
108+
# @param [Time] now (Time.now) the time from which to calculate an interval
109+
#
110+
# @return [Fixnum] the intended time as seconds/millis from Epoch
111+
#
112+
# @raise [ArgumentError] if the intended execution time is not in the future
113+
def self.calculate_schedule_time(intended_time, now = Time.now)
114+
if intended_time.is_a?(Time)
115+
raise ArgumentError.new('schedule time must be in the future') if intended_time <= now
116+
intended_time
117+
else
118+
raise ArgumentError.new('seconds must be greater than zero') if intended_time.to_f < 0.0
119+
now + intended_time
120+
end
121+
end
122+
97123
private
98124

99125
# A struct for encapsulating a task and its intended execution time.
@@ -109,25 +135,6 @@ def <=>(other)
109135
end
110136
end
111137

112-
# Calculate an Epoch time with milliseconds at which to execute a
113-
# task. If the given time is a `Time` object it will be converted
114-
# accordingly. If the time is an integer value greate than zero
115-
# it will be understood as a number of seconds in the future and
116-
# will be added to the current time to calculate Epoch.
117-
#
118-
# @raise [ArgumentError] if the intended execution time is not in the future
119-
#
120-
# @!visibility private
121-
def calculate_schedule_time(intended_time, now = Time.now)
122-
if intended_time.is_a?(Time)
123-
raise ArgumentError.new('schedule time must be in the future') if intended_time <= now
124-
intended_time.to_f
125-
else
126-
raise ArgumentError.new('seconds must be greater than zero') if intended_time.to_f < 0.0
127-
now.to_f + intended_time.to_f
128-
end
129-
end
130-
131138
# Check the status of the processing thread. This thread is responsible
132139
# for monitoring the internal task queue and sending tasks to the
133140
# executor when it is time for them to be processed. If there is no

lib/concurrent/scheduled_task.rb

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,31 @@ module Concurrent
66

77
class ScheduledTask < IVar
88

9-
SchedulingError = Class.new(ArgumentError)
10-
119
attr_reader :schedule_time
1210

13-
def initialize(schedule_time, opts = {}, &block)
14-
raise SchedulingError.new('no block given') unless block_given?
15-
calculate_schedule_time!(schedule_time) # raise exception if in past
11+
def initialize(intended_time, opts = {}, &block)
12+
raise ArgumentError.new('no block given') unless block_given?
13+
TimerSet.calculate_schedule_time(intended_time) # raises exceptons
1614

1715
super(NO_VALUE, opts)
1816

17+
@intended_time = intended_time
1918
@state = :unscheduled
20-
@intended_schedule_time = schedule_time
21-
@schedule_time = nil
2219
@task = block
2320
end
2421

2522
# @since 0.5.0
2623
def execute
2724
if compare_and_set_state(:pending, :unscheduled)
28-
@schedule_time = calculate_schedule_time!(@intended_schedule_time).freeze
29-
do_next_interval
25+
@schedule_time = TimerSet.calculate_schedule_time(@intended_time)
26+
Concurrent::timer(@schedule_time.to_f - Time.now.to_f, &method(:process_task))
3027
self
3128
end
3229
end
3330

3431
# @since 0.5.0
35-
def self.execute(schedule_time, opts = {}, &block)
36-
return ScheduledTask.new(schedule_time, opts, &block).execute
32+
def self.execute(intended_time, opts = {}, &block)
33+
return ScheduledTask.new(intended_time, opts, &block).execute
3734
end
3835

3936
def cancelled?
@@ -63,7 +60,7 @@ def add_observer(*args)
6360

6461
private
6562

66-
def do_work
63+
def process_task
6764
if compare_and_set_state(:in_progress, :pending)
6865
success, val, reason = SafeTaskExecutor.new(@task).execute
6966

@@ -76,29 +73,5 @@ def do_work
7673
observers.notify_and_delete_observers{ [time, self.value, reason] }
7774
end
7875
end
79-
80-
def do_next_interval
81-
return if cancelled?
82-
83-
interval = mutex.synchronize do
84-
[60, [(@schedule_time.to_f - Time.now.to_f), 0].max].min
85-
end
86-
87-
if interval > 0
88-
Concurrent::timer(interval, &method(:do_next_interval))
89-
else
90-
do_work
91-
end
92-
end
93-
94-
def calculate_schedule_time!(schedule_time, now = Time.now)
95-
if schedule_time.is_a?(Time)
96-
raise SchedulingError.new('schedule time must be in the future') if schedule_time <= now
97-
schedule_time.dup
98-
else
99-
raise SchedulingError.new('seconds must be greater than zero') if schedule_time.to_f <= 0.0
100-
now + schedule_time.to_f
101-
end
102-
end
10376
end
10477
end

lib/concurrent/utilities/timer.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ module Concurrent
66
# Perform the given operation asynchronously after the given number of seconds.
77
#
88
# @param [Fixnum] seconds the interval in seconds to wait before executing the task
9+
#
910
# @yield the task to execute
11+
#
1012
# @return [Boolean] true
1113
def timer(seconds, &block)
1214
raise ArgumentError.new('no block given') unless block_given?

spec/concurrent/scheduled_task_spec.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,19 @@ def execute_dereferenceable(subject)
6565
it 'raises an exception when seconds is less than zero' do
6666
expect {
6767
ScheduledTask.new(-1){ nil }
68-
}.to raise_error(ScheduledTask::SchedulingError)
68+
}.to raise_error(ArgumentError)
6969
end
7070

7171
it 'raises an exception when schedule time is in the past' do
7272
expect {
7373
ScheduledTask.new(Time.now - 60){ nil }
74-
}.to raise_error(ScheduledTask::SchedulingError)
74+
}.to raise_error(ArgumentError)
7575
end
7676

7777
it 'raises an exception when no block given' do
7878
expect {
7979
ScheduledTask.new(1)
80-
}.to raise_error(ScheduledTask::SchedulingError)
80+
}.to raise_error(ArgumentError)
8181
end
8282

8383
it 'sets the initial state to :unscheduled' do
@@ -121,7 +121,7 @@ def execute_dereferenceable(subject)
121121
Timecop.travel(60)
122122
expect {
123123
task.execute
124-
}.to raise_error(ScheduledTask::SchedulingError)
124+
}.to raise_error(ArgumentError)
125125
end
126126
end
127127

0 commit comments

Comments
 (0)