Skip to content

Commit 764a395

Browse files
committed
TimerSet#post now returns an IVar.
1 parent 911f357 commit 764a395

File tree

3 files changed

+329
-193
lines changed

3 files changed

+329
-193
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
require 'thread'
1+
require 'concurrent/errors'
2+
require 'concurrent/ivar'
23
require 'concurrent/atomic/event'
34
require 'concurrent/collection/priority_queue'
45
require 'concurrent/executor/executor'
@@ -15,6 +16,57 @@ module Concurrent
1516
# @!macro monotonic_clock_warning
1617
class TimerSet < RubyExecutorService
1718

19+
# @!visibility private
20+
class Job < Concurrent::IVar
21+
22+
# @!visibility private
23+
def initialize(args, task)
24+
super()
25+
@args = args
26+
@task = task
27+
ensure_ivar_visibility!
28+
end
29+
30+
# @!visibility private
31+
def cancelled?
32+
state == :cancelled
33+
end
34+
35+
# @!visibility private
36+
def cancel
37+
if compare_and_set_state(:cancelled, :pending)
38+
complete(false, nil, CancelledOperationError.new)
39+
true
40+
else
41+
false
42+
end
43+
end
44+
45+
# @!visibility private
46+
def execute
47+
if compare_and_set_state(:processing, :pending)
48+
success, val, reason = SafeTaskExecutor.new(@task, rescue_exception: true).execute(*@args)
49+
complete(success, val, reason)
50+
end
51+
end
52+
end
53+
private_constant :Job
54+
55+
# A struct for encapsulating a task and its intended execution time.
56+
# It facilitates proper prioritization by overriding the comparison
57+
# (spaceship) operator as a comparison of the intended execution
58+
# times.
59+
#
60+
# @!visibility private
61+
Task = Struct.new(:time, :job) do
62+
include Comparable
63+
64+
def <=>(other)
65+
self.time <=> other.time
66+
end
67+
end
68+
private_constant :Task
69+
1870
# Create a new set of timed tasks.
1971
#
2072
# @!macro [attach] executor_options
@@ -48,16 +100,19 @@ def post(delay, *args, &task)
48100

49101
synchronize do
50102
return false unless running?
103+
104+
job = Job.new(args, task)
51105

52106
if (delay) <= 0.01
53-
@task_executor.post(*args, &task)
107+
@task_executor.post{ job.execute }
54108
else
55-
@queue.push(Task.new(Concurrent.monotonic_time + delay, args, task))
109+
@queue.push(Task.new(Concurrent.monotonic_time + delay, job))
56110
@timer_executor.post(&method(:process_tasks))
57111
end
112+
58113
@condition.set
114+
job
59115
end
60-
true
61116
end
62117

63118
# Begin an immediate shutdown. In-progress tasks will be allowed to
@@ -94,22 +149,6 @@ def self.calculate_delay!(delay)
94149

95150
protected
96151

97-
# A struct for encapsulating a task and its intended execution time.
98-
# It facilitates proper prioritization by overriding the comparison
99-
# (spaceship) operator as a comparison of the intended execution
100-
# times.
101-
#
102-
# @!visibility private
103-
Task = Struct.new(:time, :args, :op) do
104-
include Comparable
105-
106-
def <=>(other)
107-
self.time <=> other.time
108-
end
109-
end
110-
111-
private_constant :Task
112-
113152
def ns_initialize(opts)
114153
@queue = PriorityQueue.new(order: :min)
115154
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@@ -153,7 +192,7 @@ def process_tasks
153192
# queue now must have the same pop time, or a closer one, as
154193
# when we peeked).
155194
task = synchronize { @queue.pop }
156-
@task_executor.post(*task.args, &task.op)
195+
@task_executor.post{ task.job.execute }
157196
else
158197
@condition.wait([diff, 60].min)
159198
end

lib/concurrent/obligation.rb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,23 +136,23 @@ def exception(*args)
136136
protected
137137

138138
# @!visibility private
139-
def get_arguments_from(opts = {}) # :nodoc:
139+
def get_arguments_from(opts = {})
140140
[*opts.fetch(:args, [])]
141141
end
142142

143143
# @!visibility private
144-
def init_obligation(*args) # :nodoc:
144+
def init_obligation(*args)
145145
init_mutex(*args)
146146
@event = Event.new
147147
end
148148

149149
# @!visibility private
150-
def event # :nodoc:
150+
def event
151151
@event
152152
end
153153

154154
# @!visibility private
155-
def set_state(success, value, reason) # :nodoc:
155+
def set_state(success, value, reason)
156156
if success
157157
@value = value
158158
@state = :fulfilled
@@ -163,7 +163,7 @@ def set_state(success, value, reason) # :nodoc:
163163
end
164164

165165
# @!visibility private
166-
def state=(value) # :nodoc:
166+
def state=(value)
167167
mutex.synchronize { @state = value }
168168
end
169169

@@ -176,7 +176,7 @@ def state=(value) # :nodoc:
176176
# @return [Boolean] true is state is changed, false otherwise
177177
#
178178
# @!visibility private
179-
def compare_and_set_state(next_state, expected_current) # :nodoc:
179+
def compare_and_set_state(next_state, expected_current)
180180
mutex.synchronize do
181181
if @state == expected_current
182182
@state = next_state
@@ -192,7 +192,7 @@ def compare_and_set_state(next_state, expected_current) # :nodoc:
192192
# @return block value if executed, false otherwise
193193
#
194194
# @!visibility private
195-
def if_state(*expected_states) # :nodoc:
195+
def if_state(*expected_states)
196196
mutex.synchronize do
197197
raise ArgumentError.new('no block given') unless block_given?
198198

0 commit comments

Comments
 (0)