Skip to content

Commit a1a62fe

Browse files
committed
TimerSet tasks can now be rescheduled.
1 parent 988ed27 commit a1a62fe

File tree

3 files changed

+179
-45
lines changed

3 files changed

+179
-45
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,26 @@ class TimerSet < RubyExecutorService
2020
class Task < Concurrent::IVar
2121
include Comparable
2222

23-
def initialize(parent, time, args, task)
23+
def initialize(parent, delay, args, task)
2424
super()
2525
synchronize do
26+
ns_set_delay_and_time!(delay)
2627
@parent = parent
27-
@time = time
2828
@args = args
2929
@task = task
3030
end
3131
end
3232

33-
# @!visibility private
34-
def time
33+
def original_delay
34+
synchronize { @delay }
35+
end
36+
37+
def schedule_time
3538
synchronize { @time }
3639
end
3740

3841
def <=>(other)
39-
self.time <=> other.time
42+
self.schedule_time <=> other.schedule_time
4043
end
4144

4245
def cancelled?
@@ -49,18 +52,37 @@ def cancel
4952
# To avoid deadlocks this call must occur outside of #synchronize
5053
# Changing the state above should prevent redundant calls
5154
@parent.send(:remove_task, self)
52-
true
5355
else
5456
false
5557
end
5658
end
5759

60+
#def reset
61+
# reschedule(synchronize{ @delay })
62+
#end
63+
64+
def reschedule(delay)
65+
synchronize do
66+
return false unless ns_check_state?(:pending)
67+
ns_set_delay_and_time!(delay)
68+
return false unless @parent.send(:remove_task, self)
69+
@parent.send(:ns_post_task, self)
70+
end
71+
end
72+
5873
# @!visibility private
5974
def execute
6075
safe_execute(@task, @args)
6176
end
6277

6378
protected :set, :try_set
79+
80+
protected
81+
82+
def ns_set_delay_and_time!(delay)
83+
@delay = TimerSet.calculate_delay!(delay)
84+
@time = Concurrent.monotonic_time + @delay
85+
end
6486
end
6587

6688
# Create a new set of timed tasks.
@@ -93,26 +115,9 @@ def initialize(opts = {})
93115
# @!macro deprecated_scheduling_by_clock_time
94116
def post(delay, *args, &task)
95117
raise ArgumentError.new('no block given') unless block_given?
96-
delay = TimerSet.calculate_delay!(delay) # raises exceptions
97-
98-
synchronize do
99-
return false unless running?
100-
101-
time = Concurrent.monotonic_time + delay
102-
task = Task.new(self, time, args, task)
103-
104-
if (delay) <= 0.01
105-
@task_executor.post{ task.execute }
106-
else
107-
@queue.push(task)
108-
# `process_tasks` method will run until queue is empty
109-
# only post the process method when the queue is empty
110-
@timer_executor.post(&method(:process_tasks)) if @queue.size == 1
111-
end
112-
113-
@condition.set
114-
task
115-
end
118+
task = Task.new(self, delay, args, task) # may raise exception
119+
ok = synchronize{ ns_post_task(task) }
120+
ok ? task : false
116121
end
117122

118123
# Begin an immediate shutdown. In-progress tasks will be allowed to
@@ -147,8 +152,33 @@ def self.calculate_delay!(delay)
147152
end
148153
end
149154

155+
private :<<
156+
150157
protected
151158

159+
# @!visibility private
160+
def ns_initialize(opts)
161+
@queue = PriorityQueue.new(order: :min)
162+
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
163+
@timer_executor = SingleThreadExecutor.new
164+
@condition = Event.new
165+
self.auto_terminate = opts.fetch(:auto_terminate, true)
166+
end
167+
168+
# @!visibility private
169+
def ns_post_task(task)
170+
return false unless ns_running?
171+
if (task.original_delay) <= 0.01
172+
@task_executor.post{ task.execute }
173+
else
174+
@queue.push(task)
175+
# only post the process method when the queue is empty
176+
@timer_executor.post(&method(:process_tasks)) if @queue.size == 1
177+
@condition.set
178+
end
179+
true
180+
end
181+
152182
# Remove the given task from the queue.
153183
#
154184
# @note This is intended as a callback method from Task only.
@@ -160,13 +190,13 @@ def remove_task(task)
160190
synchronize{ @queue.delete(task) }
161191
end
162192

193+
# @note This is intended as a callback method from Task only.
194+
# It is not intended to be used directly. Cancel a task by
195+
# using the `Task#cancel` method.
196+
#
163197
# @!visibility private
164-
def ns_initialize(opts)
165-
@queue = PriorityQueue.new(order: :min)
166-
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
167-
@timer_executor = SingleThreadExecutor.new
168-
@condition = Event.new
169-
self.auto_terminate = opts.fetch(:auto_terminate, true)
198+
def reschedule_task(task, delay)
199+
170200
end
171201

172202
# @!visibility private
@@ -188,7 +218,7 @@ def process_tasks
188218
break unless task
189219

190220
now = Concurrent.monotonic_time
191-
diff = task.time - now
221+
diff = task.schedule_time - now
192222

193223
if diff <= 0
194224
# We need to remove the task from the queue before passing
@@ -210,12 +240,5 @@ def process_tasks
210240
end
211241
end
212242
end
213-
214-
private
215-
216-
# @!visibility private
217-
def <<(task)
218-
raise NotImplementedError.new
219-
end
220243
end
221244
end

lib/concurrent/obligation.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def compare_and_set_state(next_state, expected_current)
187187
end
188188
end
189189

190-
# executes the block within mutex if current state is included in expected_states
190+
# Executes the block within mutex if current state is included in expected_states
191191
#
192192
# @return block value if executed, false otherwise
193193
#
@@ -203,5 +203,17 @@ def if_state(*expected_states)
203203
end
204204
end
205205
end
206+
207+
protected
208+
209+
# Am I in the current state?
210+
#
211+
# @param [Symbol] expected The state to check against
212+
# @return [Boolean] true if in the expected state else false
213+
#
214+
# @!visibility private
215+
def ns_check_state?(expected)
216+
@state == expected
217+
end
206218
end
207219
end

spec/concurrent/executor/timer_set_spec.rb

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ module Concurrent
44

55
describe TimerSet do
66

7-
subject{ TimerSet.new(executor: :immediate) }
7+
let(:executor){ Concurrent::SingleThreadExecutor.new }
8+
subject{ TimerSet.new(executor: executor) }
89

910
after(:each){ subject.kill }
1011

@@ -48,7 +49,8 @@ module Concurrent
4849
end
4950

5051
it 'immediately posts a task when the delay is zero' do
51-
expect(Thread).not_to receive(:new).with(any_args)
52+
timer = subject.instance_variable_get(:@timer_executor)
53+
expect(timer).not_to receive(:post).with(any_args)
5254
subject.post(0){ true }
5355
end
5456
end
@@ -192,7 +194,7 @@ module Concurrent
192194
start_latch.wait(2)
193195
success = job.cancel
194196
continue_latch.count_down
195-
197+
196198
expect(success).to be false
197199
expect(job.value).to eq 42
198200
expect(job.reason).to be_nil
@@ -203,7 +205,7 @@ module Concurrent
203205

204206
job.wait(2)
205207
success = job.cancel
206-
208+
207209
expect(success).to be false
208210
expect(job.value).to eq 42
209211
expect(job.reason).to be_nil
@@ -219,6 +221,103 @@ module Concurrent
219221
expect(job.value(0)).to be_nil
220222
expect(job.reason).to be_a CancelledOperationError
221223
end
224+
225+
it 'returns false when not running' do
226+
task = subject.post(10){ nil }
227+
subject.shutdown
228+
subject.wait_for_termination(2)
229+
expect(task.cancel).to be false
230+
end
231+
end
232+
233+
context 'task rescheduling' do
234+
235+
let(:queue) { subject.instance_variable_get(:@queue) }
236+
237+
it 'raises an exception when given an invalid time' do
238+
expect(queue).to receive(:push).once.with(any_args).and_call_original
239+
task = subject.post(10){ nil }
240+
expect{ task.reschedule(-1) }.to raise_error(ArgumentError)
241+
end
242+
243+
it 'does not change the current schedule when given an invalid time' do
244+
expect(queue).to receive(:push).once.with(any_args).and_call_original
245+
task = subject.post(10){ nil }
246+
expected = task.schedule_time
247+
begin
248+
task.reschedule(-1)
249+
rescue
250+
end
251+
expect(task.schedule_time).to eq expected
252+
end
253+
254+
it 'reschdules a pending and unpost task when given a valid time' do
255+
original_delay = 10
256+
rescheduled_delay = 20
257+
expect(queue).to receive(:push).twice.with(any_args).and_call_original
258+
task = subject.post(original_delay){ nil }
259+
original_schedule = task.schedule_time
260+
success = task.reschedule(rescheduled_delay)
261+
expect(success).to be true
262+
expect(task.original_delay).to be_within(0.01).of(rescheduled_delay)
263+
expect(task.schedule_time).to be > original_schedule
264+
end
265+
266+
it 'returns false once the task has been post to the executor' do
267+
expect(queue).to receive(:push).once.with(any_args).and_call_original
268+
start_latch = Concurrent::CountDownLatch.new
269+
continue_latch = Concurrent::CountDownLatch.new
270+
271+
task = subject.post(0.1) do
272+
start_latch.count_down
273+
continue_latch.wait(2)
274+
end
275+
start_latch.wait(2)
276+
277+
expected = task.schedule_time
278+
success = task.reschedule(10)
279+
continue_latch.count_down
280+
expect(success).to be false
281+
expect(task.schedule_time).to eq expected
282+
end
283+
284+
it 'returns false once the task is processing' do
285+
expect(queue).to receive(:push).once.with(any_args).and_call_original
286+
start_latch = Concurrent::CountDownLatch.new
287+
continue_latch = Concurrent::CountDownLatch.new
288+
task = subject.post(0.1) do
289+
start_latch.count_down
290+
continue_latch.wait(2)
291+
end
292+
start_latch.wait(2)
293+
294+
expected = task.schedule_time
295+
success = task.reschedule(10)
296+
continue_latch.count_down
297+
expect(success).to be false
298+
expect(task.schedule_time).to eq expected
299+
end
300+
301+
it 'returns false once the task has is complete' do
302+
expect(queue).to receive(:push).once.with(any_args).and_call_original
303+
task = subject.post(0.1){ nil }
304+
task.value(2)
305+
expected = task.schedule_time
306+
success = task.reschedule(10)
307+
expect(success).to be false
308+
expect(task.schedule_time).to eq expected
309+
end
310+
311+
it 'returns false when not running' do
312+
expect(queue).to receive(:push).once.with(any_args).and_call_original
313+
task = subject.post(10){ nil }
314+
subject.shutdown
315+
subject.wait_for_termination(2)
316+
expected = task.schedule_time
317+
success = task.reschedule(10)
318+
expect(success).to be false
319+
expect(task.schedule_time).to be_within(0.01).of(expected)
320+
end
222321
end
223322

224323
context 'termination' do

0 commit comments

Comments
 (0)