Skip to content

Commit fb742a5

Browse files
committed
Refactored ScheduledTask with IVar and Concurrent::timer.
1 parent 6ba70be commit fb742a5

File tree

2 files changed

+21
-26
lines changed

2 files changed

+21
-26
lines changed

lib/concurrent/scheduled_task.rb

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
require 'concurrent/obligation'
2-
require 'concurrent/observable'
1+
require 'concurrent/ivar'
32
require 'concurrent/safe_task_executor'
3+
require 'concurrent/utilities'
44

55
module Concurrent
66

7-
class ScheduledTask
8-
include Obligation
9-
include Concurrent::Observable
7+
class ScheduledTask < IVar
108

119
SchedulingError = Class.new(ArgumentError)
1210

@@ -16,20 +14,19 @@ def initialize(schedule_time, opts = {}, &block)
1614
raise SchedulingError.new('no block given') unless block_given?
1715
calculate_schedule_time!(schedule_time) # raise exception if in past
1816

19-
init_obligation
20-
self.observers = CopyOnWriteObserverSet.new
17+
super(NO_VALUE, opts)
18+
2119
@state = :unscheduled
2220
@intended_schedule_time = schedule_time
2321
@schedule_time = nil
2422
@task = block
25-
set_deref_options(opts)
2623
end
2724

2825
# @since 0.5.0
2926
def execute
3027
if compare_and_set_state(:pending, :unscheduled)
3128
@schedule_time = calculate_schedule_time!(@intended_schedule_time).freeze
32-
Thread.new { work }
29+
do_next_interval
3330
self
3431
end
3532
end
@@ -54,20 +51,19 @@ def cancel
5451
true
5552
end
5653
end
57-
5854
alias_method :stop, :cancel
5955

60-
def add_observer(observer, func = :update)
56+
def add_observer(*args)
6157
if_state(:unscheduled, :pending, :in_progress) do
62-
observers.add_observer(observer, func)
58+
observers.add_observer(*args)
6359
end
6460
end
6561

66-
protected
62+
protected :set, :fail, :complete
6763

68-
def work
69-
sleep_until_scheduled_time
64+
private
7065

66+
def do_work
7167
if compare_and_set_state(:in_progress, :pending)
7268
success, val, reason = SafeTaskExecutor.new(@task).execute
7369

@@ -79,14 +75,19 @@ def work
7975
time = Time.now
8076
observers.notify_and_delete_observers{ [time, self.value, reason] }
8177
end
82-
8378
end
8479

85-
private
80+
def do_next_interval
81+
return if cancelled?
82+
83+
interval = mutex.synchronize do
84+
[60, [(@schedule_time.to_f - Time.now.to_f), 0].max].min
85+
end
8686

87-
def sleep_until_scheduled_time
88-
while (diff = @schedule_time.to_f - Time.now.to_f) > 0
89-
sleep(diff > 60 ? 60 : diff)
87+
if interval > 0
88+
Concurrent::timer(interval, &method(:do_next_interval))
89+
else
90+
do_work
9091
end
9192
end
9293

spec/concurrent/scheduled_task_spec.rb

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,6 @@ def execute_dereferenceable(subject)
125125
end
126126
end
127127

128-
it 'spawns a new thread when a block was given on construction' do
129-
Thread.should_receive(:new).with(any_args)
130-
task = ScheduledTask.new(1){ nil }
131-
task.execute
132-
end
133-
134128
it 'sets the sate to :pending' do
135129
task = ScheduledTask.new(1){ nil }
136130
task.execute

0 commit comments

Comments
 (0)