Skip to content

Commit b5d4432

Browse files
committed
Extracted methods into IVar from subclasses.
1 parent 764a395 commit b5d4432

File tree

4 files changed

+66
-55
lines changed

4 files changed

+66
-55
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,32 @@ module Concurrent
1616
# @!macro monotonic_clock_warning
1717
class TimerSet < RubyExecutorService
1818

19+
# A class for encapsulating a task and its intended execution time.
20+
# It facilitates proper prioritization by overriding the comparison
21+
# (spaceship) operator as a comparison of the intended execution
22+
# times.
23+
#
1924
# @!visibility private
20-
class Job < Concurrent::IVar
25+
class Task < Concurrent::IVar
26+
include Comparable
2127

2228
# @!visibility private
23-
def initialize(args, task)
29+
def initialize(time, args, task)
2430
super()
25-
@args = args
26-
@task = task
27-
ensure_ivar_visibility!
31+
synchronize do
32+
@time = time
33+
@args = args
34+
@task = task
35+
end
36+
end
37+
38+
def time
39+
synchronize { @time }
40+
end
41+
42+
# @!visibility private
43+
def <=>(other)
44+
self.time <=> other.time
2845
end
2946

3047
# @!visibility private
@@ -44,25 +61,7 @@ def cancel
4461

4562
# @!visibility private
4663
def execute
47-
if compare_and_set_state(:processing, :pending)
48-
success, val, reason = SafeTaskExecutor.new(@task, rescue_exception: true).execute(*@args)
49-
complete(success, val, reason)
50-
end
51-
end
52-
end
53-
private_constant :Job
54-
55-
# A struct for encapsulating a task and its intended execution time.
56-
# It facilitates proper prioritization by overriding the comparison
57-
# (spaceship) operator as a comparison of the intended execution
58-
# times.
59-
#
60-
# @!visibility private
61-
Task = Struct.new(:time, :job) do
62-
include Comparable
63-
64-
def <=>(other)
65-
self.time <=> other.time
64+
safe_execute(@task, @args)
6665
end
6766
end
6867
private_constant :Task
@@ -100,18 +99,19 @@ def post(delay, *args, &task)
10099

101100
synchronize do
102101
return false unless running?
103-
104-
job = Job.new(args, task)
102+
103+
time = Concurrent.monotonic_time + delay
104+
task = Task.new(time, args, task)
105105

106106
if (delay) <= 0.01
107-
@task_executor.post{ job.execute }
107+
@task_executor.post{ task.execute }
108108
else
109-
@queue.push(Task.new(Concurrent.monotonic_time + delay, job))
109+
@queue.push(task)
110110
@timer_executor.post(&method(:process_tasks))
111111
end
112112

113113
@condition.set
114-
job
114+
task
115115
end
116116
end
117117

@@ -192,7 +192,7 @@ def process_tasks
192192
# queue now must have the same pop time, or a closer one, as
193193
# when we peeked).
194194
task = synchronize { @queue.pop }
195-
@task_executor.post{ task.job.execute }
195+
@task_executor.post{ task.execute }
196196
else
197197
@condition.wait([diff, 60].min)
198198
end

lib/concurrent/future.rb

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def initialize(opts = {}, &block)
4545
# future.state #=> :pending
4646
def execute
4747
if compare_and_set_state(:pending, :unscheduled)
48-
@executor.post(@args){ work }
48+
@executor.post{ safe_execute(@task, @args) }
4949
self
5050
end
5151
end
@@ -90,7 +90,12 @@ def set(value = IVar::NO_VALUE, &block)
9090
#
9191
# @return [Boolean] was the operation successfully cancelled.
9292
def cancel
93-
compare_and_set_state(:cancelled, :pending)
93+
if compare_and_set_state(:cancelled, :pending)
94+
complete(false, nil, CancelledOperationError.new)
95+
true
96+
else
97+
false
98+
end
9499
end
95100

96101
# Has the operation been successfully cancelled?
@@ -125,17 +130,5 @@ def ns_initialize(value, opts)
125130
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
126131
@args = get_arguments_from(opts)
127132
end
128-
129-
private
130-
131-
# @!visibility private
132-
def work
133-
if compare_and_set_state(:processing, :pending)
134-
success, val, reason = SafeTaskExecutor.new(@task, rescue_exception: true).execute(*@args)
135-
complete(success, val, reason)
136-
else
137-
complete(false, nil, CancelledOperationError.new)
138-
end
139-
end
140133
end
141134
end

lib/concurrent/ivar.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ def try_set(value = NO_VALUE, &block)
148148

149149
protected
150150

151+
# @!visibility private
151152
def ns_initialize(value, opts)
152153
init_obligation(self)
153154
self.observers = CopyOnWriteObserverSet.new
@@ -160,21 +161,34 @@ def ns_initialize(value, opts)
160161
end
161162
end
162163

164+
# @!visibility private
165+
def safe_execute(task, args = [])
166+
if compare_and_set_state(:processing, :pending)
167+
success, val, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args)
168+
complete(success, val, reason)
169+
yield(success, val, reason) if block_given?
170+
end
171+
end
172+
173+
# @!visibility private
163174
def complete(success, value, reason)
164175
complete_without_notification(success, value, reason)
165176
notify_observers(self.value, reason)
166177
self
167178
end
168179

180+
# @!visibility private
169181
def complete_without_notification(success, value, reason)
170182
synchronize { ns_complete_without_notification(success, value, reason) }
171183
self
172184
end
173185

186+
# @!visibility private
174187
def notify_observers(value, reason)
175188
observers.notify_and_delete_observers{ [Time.now, value, reason] }
176189
end
177190

191+
# @!visibility private
178192
def ns_complete_without_notification(success, value, reason)
179193
raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state
180194
set_state(success, value, reason)

lib/concurrent/scheduled_task.rb

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ module Concurrent
2929
# `ScheduledTask` mixes in the [Obligation](Obligation) module thus giving it
3030
# "future" behavior. This includes the expected lifecycle states. `ScheduledTask`
3131
# has one additional state, however. While the task (block) is being executed the
32-
# state of the object will be `:in_progress`. This additional state is necessary
32+
# state of the object will be `:processing`. This additional state is necessary
3333
# because it has implications for task cancellation.
3434
#
3535
# **Cancellation**
3636
#
3737
# A `:pending` task can be cancelled using the `#cancel` method. A task in any
38-
# other state, including `:in_progress`, cannot be cancelled. The `#cancel`
38+
# other state, including `:processing`, cannot be cancelled. The `#cancel`
3939
# method returns a boolean indicating the success of the cancellation attempt.
4040
# A cancelled `ScheduledTask` cannot be restarted. It is immutable.
4141
#
@@ -220,8 +220,18 @@ def cancelled?
220220
# In the task execution in progress?
221221
#
222222
# @return [Boolean] true if the task is in the given state else false
223+
def processing?
224+
state == :processing
225+
end
226+
227+
# In the task execution in progress?
228+
#
229+
# @return [Boolean] true if the task is in the given state else false
230+
#
231+
# @deprecated
223232
def in_progress?
224-
state == :in_progress
233+
warn '[DEPRECATED] use #processing? instead'
234+
processing?
225235
end
226236

227237
# Cancel this task and prevent it from executing. A task can only be
@@ -244,14 +254,8 @@ def cancel
244254

245255
# @!visibility private
246256
def process_task
247-
if compare_and_set_state(:in_progress, :pending)
248-
success, val, reason = SafeTaskExecutor.new(@task).execute
249-
250-
mutex.synchronize do
251-
set_state(success, val, reason)
252-
event.set
253-
end
254-
257+
safe_execute(@task) do |success, val, reason|
258+
event.set
255259
time = Time.now
256260
observers.notify_and_delete_observers { [time, self.value, reason] }
257261
end

0 commit comments

Comments
 (0)