Skip to content

Commit 7ce524f

Browse files
committed
Complete specs and YARD doc for TimerSet and Concurrent::timer
1 parent 2d1b35f commit 7ce524f

File tree

3 files changed

+310
-14
lines changed

3 files changed

+310
-14
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 130 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,159 @@
11
require 'thread'
22
require 'concurrent/options_parser'
3+
require 'concurrent/atomic/event'
34
require 'concurrent/collection/priority_queue'
45

56
module Concurrent
67

8+
# Executes a collection of tasks at the specified times. A master thread
9+
# monitors the set and schedules each task for execution at the appropriate
10+
# time. Tasks are run on the global task pool or on the supplied executor.
711
class TimerSet
812

13+
# Create a new set of timed tasks.
14+
#
15+
# @param [Hash] opts the options controlling how the future will be processed
16+
# @option opts [Boolean] :operation (false) when `true` will execute the future on the global
17+
# operation pool (for long-running operations), when `false` will execute the future on the
18+
# global task pool (for short-running tasks)
19+
# @option opts [object] :executor when provided will run all operations on
20+
# this executor rather than the global thread pool (overrides :operation)
921
def initialize(opts = {})
1022
@mutex = Mutex.new
23+
@shutdown = Event.new
1124
@queue = PriorityQueue.new(order: :min)
1225
@executor = OptionsParser::get_executor_from(opts)
1326
@thread = nil
1427
end
1528

29+
# Am I running?
30+
#
31+
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
32+
def running?
33+
! @shutdown.set?
34+
end
35+
36+
# Am I shutdown?
37+
#
38+
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
39+
def shutdown?
40+
@shutdown.set?
41+
end
42+
43+
# Block until shutdown is complete or until `timeout` seconds have passed.
44+
#
45+
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
46+
# must be called before this method (or on another thread).
47+
#
48+
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
49+
#
50+
# @return [Boolean] `true` if shutdown complete or false on `timeout`
51+
def wait_for_termination(timeout)
52+
@shutdown.wait(timeout.to_f)
53+
end
54+
55+
# Post a task to be execute at the specified time. The given time may be either
56+
# a `Time` object or the number of seconds to wait. If the intended execution
57+
# time is within 1/100th of a second of the current time the task will be
58+
# immediately post to the executor.
59+
#
60+
# @param [Object] intended_time the time to schedule the task for execution
61+
#
62+
# @yield the task to be performed
63+
#
64+
# @return [Boolean] true if the message is post, false after shutdown
65+
#
66+
# @raise [ArgumentError] if the intended execution time is not in the future
67+
# @raise [ArgumentError] if no block is given
1668
def post(intended_time, &block)
17-
raise ArgumentError.new('no block given') unless block_given?
18-
time = calculate_schedule_time(intended_time)
19-
@mutex.synchronize{ @queue.push(Task.new(time, block)) }
20-
check_processing_thread
69+
@mutex.synchronize do
70+
return false if shutdown?
71+
raise ArgumentError.new('no block given') unless block_given?
72+
time = calculate_schedule_time(intended_time)
73+
74+
if (time - Time.now.to_f) <= 0.01
75+
@executor.post(&block)
76+
else
77+
@queue.push(Task.new(time, block))
78+
end
79+
end
80+
check_processing_thread!
81+
true
82+
end
83+
84+
def shutdown
85+
@mutex.synchronize do
86+
unless @shutdown.set?
87+
@queue.clear
88+
@thread.kill if @thread
89+
@shutdown.set
90+
end
91+
end
92+
true
2193
end
94+
alias_method :kill, :shutdown
2295

2396
private
2497

98+
# A struct for encapsulating a task and its intended execution time.
99+
# It facilitates proper prioritization by overriding the comparison
100+
# (spaceship) operator as a comparison of the intended execution
101+
# times.
102+
#
103+
# @!visibility private
25104
Task = Struct.new(:time, :op) do
26105
include Comparable
27106
def <=>(other)
28107
self.time <=> other.time
29108
end
30109
end
31110

111+
# Calculate an Epoch time with milliseconds at which to execute a
112+
# task. If the given time is a `Time` object it will be converted
113+
# accordingly. If the time is an integer value greate than zero
114+
# it will be understood as a number of seconds in the future and
115+
# will be added to the current time to calculate Epoch.
116+
#
117+
# @raise [ArgumentError] if the intended execution time is not in the future
118+
#
119+
# @!visibility private
32120
def calculate_schedule_time(intended_time, now = Time.now)
33121
if intended_time.is_a?(Time)
34-
raise SchedulingError.new('schedule time must be in the future') if intended_time <= now
122+
raise ArgumentError.new('schedule time must be in the future') if intended_time <= now
35123
intended_time.to_f
36124
else
37-
raise SchedulingError.new('seconds must be greater than zero') if intended_time.to_f <= 0.0
125+
raise ArgumentError.new('seconds must be greater than zero') if intended_time.to_f < 0.0
38126
now.to_f + intended_time.to_f
39127
end
40128
end
41129

42-
def check_processing_thread
43-
if @thread && @thread.status == 'sleep'
44-
@thread.wakeup
45-
elsif @thread.nil? || ! @thread.alive?
46-
@thread = Thread.new do
47-
Thread.current.abort_on_exception = false
48-
process_tasks
130+
# Check the status of the processing thread. This thread is responsible
131+
# for monitoring the internal task queue and sending tasks to the
132+
# executor when it is time for them to be processed. If there is no
133+
# processing thread one will be created. If the processing thread is
134+
# sleeping it will be worken up. If the processing thread has died it
135+
# will be garbage collected and a new one will be created.
136+
#
137+
# @!visibility private
138+
def check_processing_thread!
139+
@mutex.synchronize do
140+
return if shutdown? || @queue.empty?
141+
if @thread && @thread.status == 'sleep'
142+
@thread.wakeup
143+
elsif @thread.nil? || ! @thread.alive?
144+
@thread = Thread.new do
145+
Thread.current.abort_on_exception = false
146+
process_tasks
147+
end
49148
end
50149
end
51150
end
52151

152+
# Check the head of the internal task queue for a ready task.
153+
#
154+
# @return [Task] the next task to be executed or nil if none are ready
155+
#
156+
# @!visibility private
53157
def next_task
54158
@mutex.synchronize do
55159
unless @queue.empty? || @queue.peek.time > Time.now.to_f
@@ -60,6 +164,13 @@ def next_task
60164
end
61165
end
62166

167+
# Calculate the time difference, in seconds and milliseconds, between
168+
# now and the intended execution time of the next task to be ececuted.
169+
#
170+
# @return [Integer] the number of seconds and milliseconds to sleep
171+
# or nil if the task queue is empty
172+
#
173+
# @!visibility private
63174
def next_sleep_interval
64175
@mutex.synchronize do
65176
if @queue.empty?
@@ -70,6 +181,12 @@ def next_sleep_interval
70181
end
71182
end
72183

184+
# Run a loop and execute tasks in the scheduled order and at the approximate
185+
# shceduled time. If no tasks remain the thread will exit gracefully so that
186+
# garbage collection can occur. If there are no ready tasks it will sleep
187+
# for up to 60 seconds waiting for the next scheduled task.
188+
#
189+
# @!visibility private
73190
def process_tasks
74191
loop do
75192
while task = next_task do
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe TimerSet do
6+
7+
subject{ TimerSet.new(executor: ImmediateExecutor.new) }
8+
9+
after(:each){ subject.kill }
10+
11+
it 'uses the executor given at construction' do
12+
executor = double(:executor)
13+
executor.should_receive(:post).with(no_args)
14+
subject = TimerSet.new(executor: executor)
15+
subject.post(0){ nil }
16+
sleep(0.1)
17+
end
18+
19+
it 'uses the global task pool be default' do
20+
Concurrent.configuration.global_task_pool.should_receive(:post).with(no_args)
21+
subject = TimerSet.new
22+
subject.post(0){ nil }
23+
sleep(0.1)
24+
end
25+
26+
it 'executes a given task when given a Time' do
27+
expected = false
28+
subject.post(Time.now + 0.1){ expected = true }
29+
sleep(0.2)
30+
expected.should be_true
31+
end
32+
33+
it 'executes a given task when given an interval in seconds' do
34+
expected = false
35+
subject.post(0.1){ expected = true }
36+
sleep(0.2)
37+
end
38+
39+
it 'immediately posts a task when the delay is zero' do
40+
Thread.should_not_receive(:new).with(any_args)
41+
expected = false
42+
subject.post(0){ expected = true }
43+
end
44+
45+
it 'does not execute tasks early' do
46+
expected = AtomicFixnum.new(0)
47+
subject.post(0.2){ expected.increment }
48+
sleep(0.1)
49+
expected.value.should eq 0
50+
sleep(0.1)
51+
expected.value.should eq 1
52+
end
53+
54+
it 'raises an exception when given a task with a past Time value' do
55+
expect {
56+
subject.post(Time.now - 10){ nil }
57+
}.to raise_error(ArgumentError)
58+
end
59+
60+
it 'raises an exception when given a task with a delay less than zero' do
61+
expect {
62+
subject.post(-10){ nil }
63+
}.to raise_error(ArgumentError)
64+
end
65+
66+
it 'raises an exception when no block given' do
67+
expect {
68+
subject.post(10)
69+
}.to raise_error(ArgumentError)
70+
end
71+
72+
it 'executes all tasks scheduled for the same time' do
73+
expected = AtomicFixnum.new(0)
74+
5.times{ subject.post(0.1){ expected.increment } }
75+
sleep(0.2)
76+
expected.value.should eq 5
77+
end
78+
79+
it 'executes tasks with different times in schedule order' do
80+
expected = []
81+
3.times{|i| subject.post(i/10){ expected << i } }
82+
sleep(0.3)
83+
expected.should eq [0, 1, 2]
84+
end
85+
86+
it 'cancels all pending tasks on #shutdown' do
87+
expected = AtomicFixnum.new(0)
88+
10.times{ subject.post(0.2){ expected.increment } }
89+
sleep(0.1)
90+
subject.shutdown
91+
sleep(0.2)
92+
expected.value.should eq 0
93+
end
94+
95+
it 'cancels all pending tasks on #kill' do
96+
expected = AtomicFixnum.new(0)
97+
10.times{ subject.post(0.2){ expected.increment } }
98+
sleep(0.1)
99+
subject.kill
100+
sleep(0.2)
101+
expected.value.should eq 0
102+
end
103+
104+
it 'stops the monitor thread on #shutdown' do
105+
subject.post(0.1){ nil } # start the monitor thread
106+
sleep(0.2)
107+
subject.instance_variable_get(:@thread).should_not be_nil
108+
subject.shutdown
109+
sleep(0.1)
110+
subject.instance_variable_get(:@thread).should_not be_alive
111+
end
112+
113+
it 'kills the monitor thread on #kill' do
114+
subject.post(0.1){ nil } # start the monitor thread
115+
sleep(0.2)
116+
subject.instance_variable_get(:@thread).should_not be_nil
117+
subject.kill
118+
sleep(0.1)
119+
subject.instance_variable_get(:@thread).should_not be_alive
120+
end
121+
122+
it 'rejects tasks once shutdown' do
123+
expected = AtomicFixnum.new(0)
124+
subject.shutdown
125+
sleep(0.1)
126+
subject.post(0){ expected.increment }.should be_false
127+
sleep(0.1)
128+
expected.value.should eq 0
129+
end
130+
131+
it 'rejects tasks once killed' do
132+
expected = AtomicFixnum.new(0)
133+
subject.kill
134+
sleep(0.1)
135+
subject.post(0){ expected.increment }.should be_false
136+
sleep(0.1)
137+
expected.value.should eq 0
138+
end
139+
140+
it 'is running? when first created' do
141+
subject.should be_running
142+
subject.should_not be_shutdown
143+
end
144+
145+
it 'is running? after tasks have been post' do
146+
subject.post(0.1){ nil }
147+
subject.should be_running
148+
subject.should_not be_shutdown
149+
end
150+
151+
it 'is shutdown? after shutdown completes' do
152+
subject.shutdown
153+
sleep(0.1)
154+
subject.should_not be_running
155+
subject.should be_shutdown
156+
end
157+
158+
it 'is shutdown? after being killed' do
159+
subject.kill
160+
sleep(0.1)
161+
subject.should_not be_running
162+
subject.should be_shutdown
163+
end
164+
165+
specify '#wait_for_termination returns true if shutdown completes before timeout' do
166+
subject.post(0.1){ nil }
167+
sleep(0.1)
168+
subject.shutdown
169+
subject.wait_for_termination(0.1).should be_true
170+
end
171+
172+
specify '#wait_for_termination returns false on timeout' do
173+
subject.post(0.1){ nil }
174+
sleep(0.1)
175+
# do not call shutdown -- force timeout
176+
subject.wait_for_termination(0.1).should be_false
177+
end
178+
end
179+
end

spec/spec_helper.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require File.expand_path(f) }
2222

2323
RSpec.configure do |config|
24-
config.order = 'random'
24+
#config.order = 'random'
2525

2626
config.before(:suite) do
2727
end

0 commit comments

Comments
 (0)