Skip to content

Commit 2d1b35f

Browse files
committed
Concurrent::timer and global timer pool now use TimerSet.
1 parent 2d63347 commit 2d1b35f

File tree

6 files changed

+93
-12
lines changed

6 files changed

+93
-12
lines changed

lib/concurrent/configuration.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def global_operation_pool
6666
#
6767
# @see Concurrent::timer
6868
def global_timer_pool
69-
@global_timer_pool ||= Concurrent::CachedThreadPool.new
69+
@global_timer_pool ||= Concurrent::TimerSet.new
7070
end
7171

7272
# Global thread pool optimized for short *tasks*.

lib/concurrent/executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
require 'concurrent/executor/safe_task_executor'
66
require 'concurrent/executor/single_thread_executor'
77
require 'concurrent/executor/thread_pool_executor'
8+
require 'concurrent/executor/timer_set'

lib/concurrent/executor/timer_set.rb

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
require 'thread'
2+
require 'concurrent/options_parser'
3+
require 'concurrent/collection/priority_queue'
4+
5+
module Concurrent
6+
7+
class TimerSet
8+
9+
def initialize(opts = {})
10+
@mutex = Mutex.new
11+
@queue = PriorityQueue.new(order: :min)
12+
@executor = OptionsParser::get_executor_from(opts)
13+
@thread = nil
14+
end
15+
16+
def post(intended_time, &block)
17+
raise ArgumentError.new('no block given') unless block_given?
18+
time = calculate_schedule_time(intended_time)
19+
@mutex.synchronize{ @queue.push(Task.new(time, block)) }
20+
check_processing_thread
21+
end
22+
23+
private
24+
25+
Task = Struct.new(:time, :op) do
26+
include Comparable
27+
def <=>(other)
28+
self.time <=> other.time
29+
end
30+
end
31+
32+
def calculate_schedule_time(intended_time, now = Time.now)
33+
if intended_time.is_a?(Time)
34+
raise SchedulingError.new('schedule time must be in the future') if intended_time <= now
35+
intended_time.to_f
36+
else
37+
raise SchedulingError.new('seconds must be greater than zero') if intended_time.to_f <= 0.0
38+
now.to_f + intended_time.to_f
39+
end
40+
end
41+
42+
def check_processing_thread
43+
if @thread && @thread.status == 'sleep'
44+
@thread.wakeup
45+
elsif @thread.nil? || ! @thread.alive?
46+
@thread = Thread.new do
47+
Thread.current.abort_on_exception = false
48+
process_tasks
49+
end
50+
end
51+
end
52+
53+
def next_task
54+
@mutex.synchronize do
55+
unless @queue.empty? || @queue.peek.time > Time.now.to_f
56+
@queue.pop
57+
else
58+
nil
59+
end
60+
end
61+
end
62+
63+
def next_sleep_interval
64+
@mutex.synchronize do
65+
if @queue.empty?
66+
nil
67+
else
68+
@queue.peek.time - Time.now.to_f
69+
end
70+
end
71+
end
72+
73+
def process_tasks
74+
loop do
75+
while task = next_task do
76+
@executor.post(&task.op)
77+
end
78+
if (interval = next_sleep_interval).nil?
79+
break
80+
else
81+
sleep([interval, 60].min)
82+
end
83+
end
84+
end
85+
end
86+
end

lib/concurrent/options_parser.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ def get_executor_from(opts = {})
2020
Concurrent.configuration.global_task_pool
2121
end
2222
end
23+
module_function :get_executor_from
2324
end
2425
end

lib/concurrent/utilities/timer.rb

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
require 'rbconfig'
1+
require 'concurrent/configuration'
22
require 'thread'
33

44
module Concurrent
@@ -8,18 +8,11 @@ module Concurrent
88
# @param [Fixnum] seconds the interval in seconds to wait before executing the task
99
# @yield the task to execute
1010
# @return [Boolean] true
11-
def timer(seconds)
11+
def timer(seconds, &block)
1212
raise ArgumentError.new('no block given') unless block_given?
1313
raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0
1414

15-
Concurrent.configuration.global_timer_pool.post do
16-
begin
17-
sleep(seconds)
18-
yield
19-
rescue Exception
20-
# suppress
21-
end
22-
end
15+
Concurrent.configuration.global_timer_pool.post(seconds, &block)
2316
true
2417
end
2518
module_function :timer

spec/concurrent/utilities/timer_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ module Concurrent
3434
end
3535

3636
it 'runs the task on the global timer pool' do
37-
Concurrent.configuration.global_timer_pool.should_receive(:post).with(no_args)
37+
Concurrent.configuration.global_timer_pool.should_receive(:post).with(0.1)
3838
Concurrent::timer(0.1){ :foo }
3939
end
4040
end

0 commit comments

Comments
 (0)