Skip to content

Commit bd7507b

Browse files
committed
changed timer set executor strategy
1 parent 523cc77 commit bd7507b

File tree

2 files changed

+33
-57
lines changed

2 files changed

+33
-57
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def initialize(opts = {})
2525
@queue = PriorityQueue.new(order: :min)
2626
@executor = get_executor_from(opts)
2727
@thread = nil
28+
@condition = Condition.new
2829
init_executor
2930
end
3031

@@ -120,47 +121,13 @@ def shutdown_execution
120121
# @!visibility private
121122
def check_processing_thread!
122123
return if shutdown? || @queue.empty?
123-
if @thread && @thread.status == 'sleep'
124-
@thread.wakeup
125-
elsif @thread.nil? || !@thread.alive?
126-
@thread = Thread.new do
127-
Thread.current.abort_on_exception = true
128-
process_tasks
129-
end
130-
end
131-
end
132124

133-
# Check the head of the internal task queue for a ready task.
134-
#
135-
# @return [Task] the next task to be executed or nil if none are ready
136-
#
137-
# @!visibility private
138-
def next_task
139-
mutex.synchronize do
140-
unless @queue.empty? || @queue.peek.time > Time.now.to_f
141-
@queue.pop
142-
else
143-
nil
144-
end
125+
@thread ||= Thread.new do
126+
Thread.current.abort_on_exception = true
127+
process_tasks
145128
end
146129
end
147130

148-
# Calculate the time difference, in seconds and milliseconds, between
149-
# now and the intended execution time of the next task to be ececuted.
150-
#
151-
# @return [Integer] the number of seconds and milliseconds to sleep
152-
# or nil if the task queue is empty
153-
#
154-
# @!visibility private
155-
def next_sleep_interval
156-
mutex.synchronize do
157-
if @queue.empty?
158-
nil
159-
else
160-
@queue.peek.time - Time.now.to_f
161-
end
162-
end
163-
end
164131

165132
# Run a loop and execute tasks in the scheduled order and at the approximate
166133
# scheduled time. If no tasks remain the thread will exit gracefully so that
@@ -170,13 +137,22 @@ def next_sleep_interval
170137
# @!visibility private
171138
def process_tasks
172139
loop do
173-
while task = next_task do
174-
@executor.post(&task.op)
175-
end
176-
if (interval = next_sleep_interval).nil?
177-
break
178-
else
179-
sleep([interval, 60].min)
140+
141+
mutex.synchronize do
142+
if @queue.empty?
143+
@thread = nil
144+
break
145+
end
146+
147+
task = @queue.peek
148+
interval = task.time - Time.now.to_f
149+
150+
if interval <= 0
151+
@executor.post(&task.op)
152+
@queue.pop
153+
else
154+
@condition.wait(mutex, [interval, 60].min)
155+
end
180156
end
181157
end
182158
end

spec/concurrent/executor/timer_set_spec.rb

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,11 @@ module Concurrent
4141
end
4242

4343
it 'does not execute tasks early' do
44-
pending('intermittently failing on Travis CI')
4544
expected = AtomicFixnum.new(0)
4645
subject.post(0.2){ expected.increment }
47-
sleep(0.1)
46+
sleep(0.15)
4847
expected.value.should eq 0
49-
sleep(0.1)
48+
sleep(0.10)
5049
expected.value.should eq 1
5150
end
5251

@@ -69,10 +68,9 @@ module Concurrent
6968
end
7069

7170
it 'executes all tasks scheduled for the same time' do
72-
pending('intermittently failing on Travis CI')
7371
latch = CountDownLatch.new(5)
7472
5.times{ subject.post(0.1){ latch.count_down } }
75-
latch.wait(0.2).should eq 5
73+
latch.wait(0.2).should be_true
7674
end
7775

7876
it 'executes tasks with different times in schedule order' do
@@ -101,21 +99,23 @@ module Concurrent
10199
end
102100

103101
it 'stops the monitor thread on #shutdown' do
104-
subject.post(0.1){ nil } # start the monitor thread
105-
sleep(0.2)
106-
subject.instance_variable_get(:@thread).should_not be_nil
102+
subject.post(0.2){ nil } # start the monitor thread
103+
sleep(0.1)
104+
thread = subject.instance_variable_get(:@thread)
105+
thread.should_not be_nil
107106
subject.shutdown
108107
sleep(0.1)
109-
subject.instance_variable_get(:@thread).should_not be_alive
108+
thread.should_not be_alive
110109
end
111110

112111
it 'kills the monitor thread on #kill' do
113-
subject.post(0.1){ nil } # start the monitor thread
114-
sleep(0.2)
115-
subject.instance_variable_get(:@thread).should_not be_nil
112+
subject.post(0.2){ nil } # start the monitor thread
113+
sleep(0.1)
114+
thread = subject.instance_variable_get(:@thread)
115+
thread.should_not be_nil
116116
subject.kill
117117
sleep(0.1)
118-
subject.instance_variable_get(:@thread).should_not be_alive
118+
thread.should_not be_alive
119119
end
120120

121121
it 'rejects tasks once shutdown' do

0 commit comments

Comments
 (0)