Skip to content

Commit e5af586

Browse files
Tidy up implementation.
1 parent 0461786 commit e5af586

File tree

6 files changed

+549
-66
lines changed

6 files changed

+549
-66
lines changed

lib/async/promise.rb

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ module Async
1414
class Promise
1515
# Create a new promise.
1616
def initialize
17-
# nil = pending, true = success, :error = failure:
17+
# nil = pending, :completed = success, :failed = failure, :cancelled = cancelled:
1818
@resolved = nil
1919

2020
# Stores either the result value or the exception:
@@ -29,33 +29,55 @@ def initialize
2929

3030
# @returns [Boolean] Whether the promise has been resolved or rejected.
3131
def resolved?
32-
@mutex.synchronize { !!@resolved }
32+
@mutex.synchronize {!!@resolved}
33+
end
34+
35+
# @returns [Symbol | Nil] The internal resolved state (:completed, :failed, :cancelled, or nil if pending).
36+
# @private For internal use by Task.
37+
def resolved
38+
@mutex.synchronize {@resolved}
39+
end
40+
41+
# @returns [Boolean] Whether the promise has been cancelled.
42+
def cancelled?
43+
@mutex.synchronize {@resolved == :cancelled}
44+
end
45+
46+
# @returns [Boolean] Whether the promise failed with an exception.
47+
def failed?
48+
@mutex.synchronize {@resolved == :failed}
49+
end
50+
51+
# @returns [Boolean] Whether the promise has completed successfully.
52+
def completed?
53+
@mutex.synchronize {@resolved == :completed}
3354
end
3455

3556
# @returns [Boolean] Whether any fibers are currently waiting for this promise.
3657
def waiting?
37-
@mutex.synchronize { @waiting > 0 }
58+
@mutex.synchronize {@waiting > 0}
3859
end
3960

4061
# Artificially mark that someone is waiting (useful for suppressing warnings).
4162
# @private Internal use only.
4263
def suppress_warnings!
43-
@mutex.synchronize { @waiting += 1 }
64+
@mutex.synchronize {@waiting += 1}
4465
end
4566

4667
# Non-blocking access to the current value. Returns nil if not yet resolved.
47-
# Does not raise exceptions even if the promise was rejected.
68+
# Does not raise exceptions even if the promise was rejected or cancelled.
69+
# For resolved promises, returns the raw stored value (result, exception, or cancel exception).
4870
#
49-
# @returns [Object | Nil] The resolved value, rejected exception, or nil if pending.
71+
# @returns [Object | Nil] The stored value, or nil if pending.
5072
def value
51-
@mutex.synchronize { @resolved ? @value : nil }
73+
@mutex.synchronize {@resolved ? @value : nil}
5274
end
5375

5476
# Wait for the promise to be resolved and return the value.
5577
# If already resolved, returns immediately. If rejected, raises the stored exception.
5678
#
5779
# @returns [Object] The resolved value.
58-
# @raises [Exception] The rejected exception.
80+
# @raises [Exception] The rejected or cancelled exception.
5981
def wait
6082
@mutex.synchronize do
6183
# Increment waiting count:
@@ -66,10 +88,11 @@ def wait
6688
@condition.wait(@mutex) unless @resolved
6789

6890
# Return value or raise exception based on resolution type:
69-
if @resolved == :error
70-
raise @value
71-
else
91+
if @resolved == :completed
7292
return @value
93+
else
94+
# Both :failed and :cancelled store exceptions in @value
95+
raise @value
7396
end
7497
ensure
7598
# Decrement waiting count when done:
@@ -88,11 +111,13 @@ def resolve(value)
88111
return if @resolved
89112

90113
@value = value
91-
@resolved = true
114+
@resolved = :completed
92115

93116
# Wake up all waiting fibers:
94117
@condition.broadcast
95118
end
119+
120+
return value
96121
end
97122

98123
# Reject the promise with an exception.
@@ -105,11 +130,35 @@ def reject(exception)
105130
return if @resolved
106131

107132
@value = exception
108-
@resolved = :error
133+
@resolved = :failed
109134

110135
# Wake up all waiting fibers:
111136
@condition.broadcast
112137
end
138+
139+
return nil
140+
end
141+
142+
# Exception used to indicate cancellation.
143+
class Cancel < Exception
144+
end
145+
146+
# Cancel the promise, indicating cancellation.
147+
# All current and future waiters will receive nil.
148+
# Can only be called on pending promises - no-op if already resolved.
149+
def cancel(exception = Cancel.new("Promise was cancelled!"))
150+
@mutex.synchronize do
151+
# No-op if already in any final state
152+
return if @resolved
153+
154+
@value = exception
155+
@resolved = :cancelled
156+
157+
# Wake up all waiting fibers:
158+
@condition.broadcast
159+
end
160+
161+
return nil
113162
end
114163

115164
# Resolve the promise with the result of the block.
@@ -121,12 +170,17 @@ def fulfill(&block)
121170
raise "Promise already resolved!" if @resolved
122171

123172
begin
124-
result = yield
125-
resolve(result)
126-
return result
173+
return self.resolve(yield)
174+
rescue Cancel => exception
175+
return self.cancel(exception)
127176
rescue => error
128-
reject(error)
129-
raise # Re-raise so caller knows it failed
177+
return self.reject(error)
178+
rescue Exception => exception
179+
self.reject(exception)
180+
raise
181+
ensure
182+
# Handle non-local exits (throw, etc.) that bypass normal flow:
183+
self.resolve(nil) unless @resolved
130184
end
131185
end
132186

lib/async/task.rb

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,18 @@ def initialize(parent = Task.current?, finished: nil, **options, &block)
6464

6565
# These instance variables are critical to the state of the task.
6666
# In the initialized state, the @block should be set, but the @fiber should be nil.
67-
# In the running state, the @fiber should be set.
67+
# In the running state, the @fiber should be set, and @block should be nil.
6868
# In a finished state, the @block should be nil, and the @fiber should be nil.
6969
@block = block
7070
@fiber = nil
7171

72-
@status = :initialized
73-
@finished = Promise.new
72+
@promise = Promise.new
7473

7574
# Handle finished: parameter for backward compatibility:
7675
case finished
7776
when false
7877
# `finished: false` suppresses warnings for expected task failures:
79-
@finished.suppress_warnings!
78+
@promise.suppress_warnings!
8079
when nil
8180
# `finished: nil` is the default, no special handling:
8281
else
@@ -121,7 +120,7 @@ def annotation
121120

122121
# @returns [String] A description of the task and it's current status.
123122
def to_s
124-
"\#<#{self.description} (#{@status})>"
123+
"\#<#{self.description} (#{self.status})>"
125124
end
126125

127126
# @deprecated Prefer {Kernel#sleep} except when compatibility with `stable-v1` is required.
@@ -158,22 +157,22 @@ def finished?
158157

159158
# @returns [Boolean] Whether the task is running.
160159
def running?
161-
@status == :running
160+
self.alive?
162161
end
163162

164163
# @returns [Boolean] Whether the task failed with an exception.
165164
def failed?
166-
@status == :failed
165+
@promise.failed?
167166
end
168167

169168
# @returns [Boolean] Whether the task has been stopped.
170169
def stopped?
171-
@status == :stopped
170+
@promise.cancelled?
172171
end
173172

174173
# @returns [Boolean] Whether the task has completed execution and generated a result.
175174
def completed?
176-
@status == :completed
175+
@promise.completed?
177176
end
178177

179178
# Alias for {#completed?}.
@@ -182,20 +181,32 @@ def complete?
182181
end
183182

184183
# @attribute [Symbol] The status of the execution of the task, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`.
185-
attr :status
184+
def status
185+
case @promise.resolved
186+
when :cancelled
187+
:stopped
188+
when :failed
189+
:failed
190+
when :completed
191+
:completed
192+
when nil
193+
self.running? ? :running : :initialized
194+
end
195+
end
186196

187197
# Begin the execution of the task.
188198
#
189199
# @raises [RuntimeError] If the task is already running.
190200
def run(*arguments)
191-
if @status == :initialized
192-
@status = :running
201+
# Move from initialized to running by clearing @block
202+
if block = @block
203+
@block = nil
193204

194205
schedule do
195-
@block.call(self, *arguments)
206+
block.call(self, *arguments)
196207
rescue => error
197208
# I'm not completely happy with this overhead, but the alternative is to not log anything which makes debugging extremely difficult. Maybe we can introduce a debug wrapper which adds extra logging.
198-
if !@finished.waiting?
209+
unless @promise.waiting?
199210
warn(self, "Task may have ended with unhandled exception.", exception: error)
200211
end
201212

@@ -242,13 +253,23 @@ def wait
242253
raise "Cannot wait on own fiber!" if Fiber.current.equal?(@fiber)
243254

244255
# Wait for the task to complete - Promise handles all the complexity:
245-
# It will either return the result or raise the exception automatically
246-
@finished.wait
256+
begin
257+
@promise.wait
258+
rescue Promise::Cancel
259+
# For backward compatibility, stopped tasks return nil
260+
return nil
261+
end
247262
end
248263

249264
# Access the result of the task without waiting. May be nil if the task is not completed. Does not raise exceptions.
250265
def result
251-
@finished.value
266+
value = @promise.value
267+
# For backward compatibility, return nil for stopped tasks
268+
if @promise.cancelled?
269+
nil
270+
else
271+
value
272+
end
252273
end
253274

254275
# Stop the task and all of its children.
@@ -386,26 +407,21 @@ def finish!
386407

387408
# State transition into the completed state.
388409
def completed!(result)
389-
@status = :completed
390-
391410
# Resolve the promise with the result:
392-
@finished&.resolve(result)
411+
@promise&.resolve(result)
393412
end
394413

395414
# State transition into the failed state.
396415
def failed!(exception = false)
397-
@status = :failed
398-
399416
# Reject the promise with the exception:
400-
@finished&.reject(exception)
417+
@promise&.reject(exception)
401418
end
402419

403420
def stopped!
404421
# Console.info(self, status:) {"Task #{self} was stopped with #{@children&.size.inspect} children!"}
405-
@status = :stopped
406422

407-
# Resolve the promise with nil for stopped tasks:
408-
@finished&.resolve(nil)
423+
# Cancel the promise:
424+
@promise&.cancel
409425

410426
stopped = false
411427

@@ -450,7 +466,7 @@ def schedule(&block)
450466

451467
@fiber.async_task = self
452468

453-
self.root.resume(@fiber)
469+
(Fiber.scheduler || self.reactor).resume(@fiber)
454470
end
455471
end
456472
end

0 commit comments

Comments
 (0)