Skip to content

Commit 168376d

Browse files
committed
TimerSet now uses SingleThreadExecutor instead of a pure Ruby thread.
1 parent b0b217b commit 168376d

File tree

2 files changed

+19
-45
lines changed

2 files changed

+19
-45
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'concurrent/options_parser'
44
require 'concurrent/atomic/event'
55
require 'concurrent/collection/priority_queue'
6+
require 'concurrent/executor/single_thread_executor'
67

78
module Concurrent
89

@@ -23,8 +24,8 @@ class TimerSet
2324
# this executor rather than the global thread pool (overrides :operation)
2425
def initialize(opts = {})
2526
@queue = PriorityQueue.new(order: :min)
26-
@executor = get_executor_from(opts)
27-
@thread = nil
27+
@task_executor = get_executor_from(opts)
28+
@timer_executor = SingleThreadExecutor.new
2829
@condition = Condition.new
2930
init_executor
3031
end
@@ -50,10 +51,10 @@ def post(intended_time, *args, &task)
5051
return false unless running?
5152

5253
if (time - Time.now.to_f) <= 0.01
53-
@executor.post(*args, &task)
54+
@task_executor.post(*args, &task)
5455
else
5556
@queue.push(Task.new(time, args, task))
56-
check_processing_thread!
57+
@timer_executor.post(&method(:process_tasks))
5758
end
5859

5960
true
@@ -107,28 +108,10 @@ def <=>(other)
107108
# @!visibility private
108109
def shutdown_execution
109110
@queue.clear
110-
@thread.kill if @thread
111+
@timer_executor.kill
111112
stopped_event.set
112113
end
113114

114-
# Check the status of the processing thread. This thread is responsible
115-
# for monitoring the internal task queue and sending tasks to the
116-
# executor when it is time for them to be processed. If there is no
117-
# processing thread one will be created. If the processing thread is
118-
# sleeping it will be woken up. If the processing thread has died it
119-
# will be garbage collected and a new one will be created.
120-
#
121-
# @!visibility private
122-
def check_processing_thread!
123-
return if shutdown? || @queue.empty?
124-
125-
@thread ||= Thread.new do
126-
Thread.current.abort_on_exception = true
127-
process_tasks
128-
end
129-
end
130-
131-
132115
# Run a loop and execute tasks in the scheduled order and at the approximate
133116
# scheduled time. If no tasks remain the thread will exit gracefully so that
134117
# garbage collection can occur. If there are no ready tasks it will sleep
@@ -137,19 +120,16 @@ def check_processing_thread!
137120
# @!visibility private
138121
def process_tasks
139122
loop do
140-
mutex.synchronize do
141-
if @queue.empty?
142-
@thread = nil
143-
break
144-
end
123+
break if @queue.empty?
145124

146-
task = @queue.peek
147-
interval = task.time - Time.now.to_f
125+
task = @queue.peek
126+
interval = task.time - Time.now.to_f
148127

149-
if interval <= 0
150-
@executor.post(*task.args, &task.op)
151-
@queue.pop
152-
else
128+
if interval <= 0
129+
@task_executor.post(*task.args, &task.op)
130+
@queue.pop
131+
else
132+
mutex.synchronize do
153133
@condition.wait(mutex, [interval, 60].min)
154134
end
155135
end

spec/concurrent/executor/timer_set_spec.rb

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ module Concurrent
3535
latch.wait(0.2).should be_true
3636
end
3737

38-
it 'passes all areguments to the task on execution'
38+
it 'passes all arguments to the task on execution'
3939

4040
it 'immediately posts a task when the delay is zero' do
4141
Thread.should_not_receive(:new).with(any_args)
@@ -101,23 +101,17 @@ module Concurrent
101101
end
102102

103103
it 'stops the monitor thread on #shutdown' do
104-
subject.post(0.2){ nil } # start the monitor thread
105-
sleep(0.1)
106-
thread = subject.instance_variable_get(:@thread)
107-
thread.should_not be_nil
104+
timer_executor = subject.instance_variable_get(:@timer_executor)
108105
subject.shutdown
109106
sleep(0.1)
110-
thread.should_not be_alive
107+
timer_executor.should_not be_running
111108
end
112109

113110
it 'kills the monitor thread on #kill' do
114-
subject.post(0.2){ nil } # start the monitor thread
115-
sleep(0.1)
116-
thread = subject.instance_variable_get(:@thread)
117-
thread.should_not be_nil
111+
timer_executor = subject.instance_variable_get(:@timer_executor)
118112
subject.kill
119113
sleep(0.1)
120-
thread.should_not be_alive
114+
timer_executor.should_not be_running
121115
end
122116

123117
it 'rejects tasks once shutdown' do

0 commit comments

Comments
 (0)