Skip to content

Commit 914775a

Browse files
committed
More precise modelling of state transitions.
1 parent 2e1115a commit 914775a

File tree

5 files changed

+177
-62
lines changed

5 files changed

+177
-62
lines changed

lib/async/task.rb

Lines changed: 98 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,25 @@ def initialize(message = "execution expired")
3838
end
3939

4040
# Encapsulates the state of a running task and it's result.
41+
#
42+
# ```mermaid
43+
# stateDiagram-v2
44+
# [*] --> Initialized
45+
# Initialized --> Running : Run
46+
#
47+
# Running --> Completed : Return Value
48+
# Running --> Failed : Exception
49+
#
50+
# Completed --> [*]
51+
# Failed --> [*]
52+
#
53+
# Running --> Stopped : Stop
54+
# Stopped --> [*]
55+
# Completed --> Stopped : Stop
56+
# Failed --> Stopped : Stop
57+
# Initialized --> Stopped : Stop
58+
# ```
59+
#
4160
# @public Since `stable-v1`.
4261
class Task < Node
4362
# @deprecated With no replacement.
@@ -51,12 +70,16 @@ def self.yield
5170
def initialize(parent = Task.current?, finished: nil, **options, &block)
5271
super(parent, **options)
5372

73+
# These instance variables are critical to the state of the task.
74+
# In the initialized state, the @block should be set, but the @fiber should be nil.
75+
# In the running state, the @fiber should be set.
76+
# In a finished state, the @block should be nil, and the @fiber should be nil.
77+
@block = block
78+
@fiber = nil
79+
5480
@status = :initialized
5581
@result = nil
5682
@finished = finished
57-
58-
@block = block
59-
@fiber = nil
6083
end
6184

6285
def reactor
@@ -89,10 +112,40 @@ def yield
89112
# @attr fiber [Fiber] The fiber which is being used for the execution of this task.
90113
attr :fiber
91114

115+
# Whether the internal fiber is alive, i.e. it
92116
def alive?
93117
@fiber&.alive?
94118
end
95119

120+
# Whether we can remove this node from the reactor graph.
121+
# @returns [Boolean]
122+
def finished?
123+
# If the block is nil and the fiber is nil, it means the task has finished execution. This becomes true after `finish!` is called.
124+
super && @block.nil? && @fiber.nil?
125+
end
126+
127+
# Whether the task is running.
128+
# @returns [Boolean]
129+
def running?
130+
@status == :running
131+
end
132+
133+
def failed?
134+
@status == :failed
135+
end
136+
137+
# The task has been stopped
138+
def stopped?
139+
@status == :stopped
140+
end
141+
142+
# The task has completed execution and generated a result.
143+
def completed?
144+
@status == :completed
145+
end
146+
147+
alias complete? completed?
148+
96149
# @attr status [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`.
97150
attr :status
98151

@@ -129,7 +182,8 @@ def async(*arguments, **options, &block)
129182
def wait
130183
raise "Cannot wait on own fiber!" if Fiber.current.equal?(@fiber)
131184

132-
if running?
185+
# `finish!` will set both of these to nil before signaling the condition:
186+
if @block || @fiber
133187
@finished ||= Condition.new
134188
@finished.wait
135189
end
@@ -151,17 +205,22 @@ def stop(later = false)
151205
return
152206
end
153207

154-
if self.running?
208+
# If the fiber is alive, we need to stop it:
209+
if @fiber&.alive?
155210
if self.current?
156211
if later
212+
# If the fiber is the current fiber and we want to stop it later, schedule it:
157213
Fiber.scheduler.push(Stop::Later.new(self))
158214
else
215+
# Otherwise, raise the exception directly:
159216
raise Stop, "Stopping current task!"
160217
end
161-
elsif @fiber&.alive?
218+
else
219+
# If the fiber is not curent, we can raise the exception directly:
162220
begin
163221
Fiber.scheduler.raise(@fiber, Stop)
164222
rescue FiberError
223+
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
165224
Fiber.scheduler.push(Stop::Later.new(self))
166225
end
167226
end
@@ -188,36 +247,34 @@ def current?
188247
self.equal?(Thread.current[:async_task])
189248
end
190249

191-
# Check if the task is running.
192-
# @returns [Boolean]
193-
def running?
194-
@status == :running
195-
end
196-
197-
# Whether we can remove this node from the reactor graph.
198-
# @returns [Boolean]
199-
def finished?
200-
super && @fiber.nil?
201-
end
202-
203-
def failed?
204-
@status == :failed
205-
end
250+
private
206251

207-
def stopped?
208-
@status == :stopped
252+
# Finish the current task, moving any children to the parent.
253+
def finish!
254+
# Don't hold references to the fiber or block after the task has finished:
255+
@fiber = nil
256+
@block = nil # If some how we went directly from initialized to finished.
257+
258+
# Attempt to remove this node from the task tree.
259+
consume
260+
261+
# If this task was being used as a future, signal completion here:
262+
if @finished
263+
@finished.signal(self)
264+
@finished = nil
265+
end
209266
end
210267

211-
def complete?
212-
@status == :complete
268+
# State transition into the completed state.
269+
def completed!(result)
270+
@result = result
271+
@status = :completed
213272
end
214273

215-
private
216-
217274
# This is a very tricky aspect of tasks to get right. I've modelled it after `Thread` but it's slightly different in that the exception can propagate back up through the reactor. If the user writes code which raises an exception, that exception should always be visible, i.e. cause a failure. If it's not visible, such code fails silently and can be very difficult to debug.
218-
def fail!(exception = false, propagate = true)
219-
@status = :failed
275+
def failed!(exception = false, propagate = true)
220276
@result = exception
277+
@status = :failed
221278

222279
if exception
223280
if propagate
@@ -231,27 +288,33 @@ def fail!(exception = false, propagate = true)
231288
end
232289
end
233290

234-
def stop!
291+
def stopped!
235292
# Console.logger.info(self, self.annotation) {"Task was stopped with #{@children&.size.inspect} children!"}
236293
@status = :stopped
237294

295+
# We are not running, but children might be so we should stop them:
238296
stop_children(true)
239297
end
240298

299+
def stop!
300+
stopped!
301+
302+
finish!
303+
end
304+
241305
def schedule(&block)
242306
@fiber = Fiber.new do
243307
set!
244308

245309
begin
246-
@result = yield
247-
@status = :complete
310+
completed!(yield)
248311
# Console.logger.debug(self) {"Task was completed with #{@children.size} children!"}
249312
rescue Stop
250-
stop!
313+
stopped!
251314
rescue StandardError => error
252-
fail!(error, false)
315+
failed!(error, false)
253316
rescue Exception => exception
254-
fail!(exception, true)
317+
failed!(exception, true)
255318
ensure
256319
# Console.logger.info(self) {"Task ensure $! = #{$!} with #{@children&.size.inspect} children!"}
257320
finish!
@@ -261,20 +324,6 @@ def schedule(&block)
261324
self.root.resume(@fiber)
262325
end
263326

264-
# Finish the current task, moving any children to the parent.
265-
def finish!
266-
# Allow the fiber to be recycled.
267-
@fiber = nil
268-
269-
# Attempt to remove this node from the task tree.
270-
consume
271-
272-
# If this task was being used as a future, signal completion here:
273-
if @finished
274-
@finished.signal(self)
275-
end
276-
end
277-
278327
# Set the current fiber's `:async_task` to this task.
279328
def set!
280329
# This is actually fiber-local:

test/async/condition.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
condition.wait
2020
end
2121

22-
expect(task.status).to be == :running
22+
expect(task).to be(:running?)
2323

2424
# This will cause the task to exit:
2525
condition.signal
2626

27-
expect(task.status).to be == :complete
27+
expect(task).to be(:completed?)
2828
end
2929

3030
it 'can stop nested task' do
@@ -49,7 +49,7 @@
4949
producer.wait
5050

5151
expect(producer.status).to be == :stopped
52-
expect(consumer.status).to be == :complete
52+
expect(consumer.status).to be == :completed
5353
end
5454

5555
it_behaves_like ACondition

test/async/notification.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
reactor.yield
3636
sequence << :finished
3737

38-
expect(task.status).to be == :complete
38+
expect(task.status).to be == :completed
3939

4040
expect(sequence).to be == [
4141
:waiting,

test/async/reactor.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,17 @@
4343
reactor.run_once
4444
reactor.close
4545
end
46+
47+
it "terminates nested tasks" do
48+
top = reactor.async do |parent|
49+
parent.async do |child|
50+
child.sleep(1)
51+
end
52+
end
53+
54+
reactor.run_once
55+
reactor.close
56+
end
4657
end
4758

4859
with '#run' do

test/async/task.rb

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
describe Async::Task do
1313
let(:reactor) {Async::Reactor.new}
1414

15+
def after
16+
reactor.close
17+
super
18+
end
19+
1520
with '.yield' do
1621
it "can yield back to scheduler" do
1722
state = nil
@@ -173,6 +178,53 @@
173178
end
174179

175180
with '#stop' do
181+
it "can't stop finished tasks" do
182+
task = reactor.async{}
183+
184+
expect(task).to be(:finished?)
185+
expect(task).to be(:completed?)
186+
187+
task.stop
188+
189+
expect(task).to be(:finished?)
190+
expect(task.status).to be == :stopped
191+
end
192+
193+
it "can stop a task in the initialized state" do
194+
task = Async::Task.new(reactor) do |task|
195+
sleep
196+
end
197+
198+
expect(task.status).to be == :initialized
199+
expect(reactor.children).not.to be(:empty?)
200+
expect(task).not.to be(:finished?)
201+
202+
task.stop
203+
204+
expect(task.status).to be == :stopped
205+
expect(reactor.children).to be(:empty?)
206+
end
207+
208+
it "can stop a task in the initialized state with children" do
209+
parent = Async::Task.new(reactor) do |task|
210+
sleep
211+
end
212+
213+
child = parent.async do |task|
214+
sleep
215+
end
216+
217+
expect(parent.status).to be == :initialized
218+
# expect(child.status).to be == :running
219+
220+
parent.stop
221+
222+
expect(parent.status).to be == :stopped
223+
expect(child.status).to be == :stopped
224+
225+
expect(reactor.children).to be(:empty?)
226+
end
227+
176228
it "can be stopped" do
177229
state = nil
178230

@@ -625,15 +677,17 @@ def sleep_forever
625677
end
626678

627679
it 'does not wait for task completion' do
628-
reactor.async do
629-
task = reactor.async do |task|
630-
task.sleep(1)
631-
end
632-
633-
expect(task.result).to be_nil
634-
635-
task.stop
680+
task = reactor.async do |task|
681+
task.sleep(1)
636682
end
683+
684+
expect(task.result).to be_nil
685+
686+
Console.logger.debug(self) {"Stopping task..."}
687+
task.stop
688+
689+
expect(task.result).to be_nil
690+
expect(task).to be(:stopped?)
637691
end
638692
end
639693

@@ -671,8 +725,9 @@ def sleep_forever
671725
end
672726

673727
child.stop
728+
674729
expect(child).to be(:stopped?)
675-
end
730+
end.wait
676731
end
677732
end
678733
end

0 commit comments

Comments
 (0)