Skip to content

Commit e91b3b1

Browse files
committed
Merge pull request #54 from jdantonio/safe_timer_set
Safe timer set
2 parents 024edfb + bd7507b commit e91b3b1

File tree

2 files changed

+58
-82
lines changed

2 files changed

+58
-82
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 36 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def initialize(opts = {})
2525
@queue = PriorityQueue.new(order: :min)
2626
@executor = get_executor_from(opts)
2727
@thread = nil
28+
@condition = Condition.new
2829
init_executor
2930
end
3031

@@ -43,10 +44,21 @@ def initialize(opts = {})
4344
# @raise [ArgumentError] if no block is given
4445
def post(intended_time, &task)
4546
time = TimerSet.calculate_schedule_time(intended_time).to_f
46-
if super(time, &task)
47-
check_processing_thread!
47+
raise ArgumentError.new('no block given') unless block_given?
48+
49+
mutex.synchronize do
50+
return false unless running?
51+
52+
if (time - Time.now.to_f) <= 0.01
53+
@executor.post(&task)
54+
else
55+
@queue.push(Task.new(time, task))
56+
check_processing_thread!
57+
end
58+
4859
true
4960
end
61+
5062
end
5163

5264
alias_method :kill, :shutdown
@@ -84,19 +96,13 @@ def self.calculate_schedule_time(intended_time, now = Time.now)
8496
# @!visibility private
8597
Task = Struct.new(:time, :op) do
8698
include Comparable
99+
87100
def <=>(other)
88101
self.time <=> other.time
89102
end
90103
end
91104

92-
# @!visibility private
93-
def execute(time, &task)
94-
if (time - Time.now.to_f) <= 0.01
95-
@executor.post(&task)
96-
else
97-
@queue.push(Task.new(time, task))
98-
end
99-
end
105+
private_constant :Task
100106

101107
# @!visibility private
102108
def shutdown_execution
@@ -114,50 +120,14 @@ def shutdown_execution
114120
#
115121
# @!visibility private
116122
def check_processing_thread!
117-
mutex.synchronize do
118-
return if shutdown? || @queue.empty?
119-
if @thread && @thread.status == 'sleep'
120-
@thread.wakeup
121-
elsif @thread.nil? || ! @thread.alive?
122-
@thread = Thread.new do
123-
Thread.current.abort_on_exception = true
124-
process_tasks
125-
end
126-
end
127-
end
128-
end
123+
return if shutdown? || @queue.empty?
129124

130-
# Check the head of the internal task queue for a ready task.
131-
#
132-
# @return [Task] the next task to be executed or nil if none are ready
133-
#
134-
# @!visibility private
135-
def next_task
136-
mutex.synchronize do
137-
unless @queue.empty? || @queue.peek.time > Time.now.to_f
138-
@queue.pop
139-
else
140-
nil
141-
end
125+
@thread ||= Thread.new do
126+
Thread.current.abort_on_exception = true
127+
process_tasks
142128
end
143129
end
144130

145-
# Calculate the time difference, in seconds and milliseconds, between
146-
# now and the intended execution time of the next task to be ececuted.
147-
#
148-
# @return [Integer] the number of seconds and milliseconds to sleep
149-
# or nil if the task queue is empty
150-
#
151-
# @!visibility private
152-
def next_sleep_interval
153-
mutex.synchronize do
154-
if @queue.empty?
155-
nil
156-
else
157-
@queue.peek.time - Time.now.to_f
158-
end
159-
end
160-
end
161131

162132
# Run a loop and execute tasks in the scheduled order and at the approximate
163133
# scheduled time. If no tasks remain the thread will exit gracefully so that
@@ -167,13 +137,22 @@ def next_sleep_interval
167137
# @!visibility private
168138
def process_tasks
169139
loop do
170-
while task = next_task do
171-
@executor.post(&task.op)
172-
end
173-
if (interval = next_sleep_interval).nil?
174-
break
175-
else
176-
sleep([interval, 60].min)
140+
141+
mutex.synchronize do
142+
if @queue.empty?
143+
@thread = nil
144+
break
145+
end
146+
147+
task = @queue.peek
148+
interval = task.time - Time.now.to_f
149+
150+
if interval <= 0
151+
@executor.post(&task.op)
152+
@queue.pop
153+
else
154+
@condition.wait(mutex, [interval, 60].min)
155+
end
177156
end
178157
end
179158
end

spec/concurrent/executor/timer_set_spec.rb

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,28 @@ module Concurrent
2424
end
2525

2626
it 'executes a given task when given a Time' do
27-
expected = false
28-
subject.post(Time.now + 0.1){ expected = true }
29-
sleep(0.2)
30-
expected.should be_true
27+
latch = CountDownLatch.new(1)
28+
subject.post(Time.now + 0.1){ latch.count_down }
29+
latch.wait(0.2).should be_true
3130
end
3231

3332
it 'executes a given task when given an interval in seconds' do
34-
expected = false
35-
subject.post(0.1){ expected = true }
36-
sleep(0.2)
33+
latch = CountDownLatch.new(1)
34+
subject.post(0.1){ latch.count_down }
35+
latch.wait(0.2).should be_true
3736
end
3837

3938
it 'immediately posts a task when the delay is zero' do
4039
Thread.should_not_receive(:new).with(any_args)
41-
expected = false
42-
subject.post(0){ expected = true }
40+
subject.post(0){ true }
4341
end
4442

4543
it 'does not execute tasks early' do
46-
pending('intermittently failing on Travis CI')
4744
expected = AtomicFixnum.new(0)
4845
subject.post(0.2){ expected.increment }
49-
sleep(0.1)
46+
sleep(0.15)
5047
expected.value.should eq 0
51-
sleep(0.1)
48+
sleep(0.10)
5249
expected.value.should eq 1
5350
end
5451

@@ -71,11 +68,9 @@ module Concurrent
7168
end
7269

7370
it 'executes all tasks scheduled for the same time' do
74-
pending('intermittently failing on Travis CI')
75-
expected = AtomicFixnum.new(0)
76-
5.times{ subject.post(0.1){ expected.increment } }
77-
sleep(0.2)
78-
expected.value.should eq 5
71+
latch = CountDownLatch.new(5)
72+
5.times{ subject.post(0.1){ latch.count_down } }
73+
latch.wait(0.2).should be_true
7974
end
8075

8176
it 'executes tasks with different times in schedule order' do
@@ -104,21 +99,23 @@ module Concurrent
10499
end
105100

106101
it 'stops the monitor thread on #shutdown' do
107-
subject.post(0.1){ nil } # start the monitor thread
108-
sleep(0.2)
109-
subject.instance_variable_get(:@thread).should_not be_nil
102+
subject.post(0.2){ nil } # start the monitor thread
103+
sleep(0.1)
104+
thread = subject.instance_variable_get(:@thread)
105+
thread.should_not be_nil
110106
subject.shutdown
111107
sleep(0.1)
112-
subject.instance_variable_get(:@thread).should_not be_alive
108+
thread.should_not be_alive
113109
end
114110

115111
it 'kills the monitor thread on #kill' do
116-
subject.post(0.1){ nil } # start the monitor thread
117-
sleep(0.2)
118-
subject.instance_variable_get(:@thread).should_not be_nil
112+
subject.post(0.2){ nil } # start the monitor thread
113+
sleep(0.1)
114+
thread = subject.instance_variable_get(:@thread)
115+
thread.should_not be_nil
119116
subject.kill
120117
sleep(0.1)
121-
subject.instance_variable_get(:@thread).should_not be_alive
118+
thread.should_not be_alive
122119
end
123120

124121
it 'rejects tasks once shutdown' do

0 commit comments

Comments
 (0)