Skip to content

Commit e681727

Browse files
authored
Merge pull request swiftlang#38582 from DougGregor/async-stream-catch-up
2 parents 54594aa + ea72a9e commit e681727

File tree

3 files changed

+371
-61
lines changed

3 files changed

+371
-61
lines changed

stdlib/public/Concurrency/AsyncStream.swift

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,50 @@ import Swift
4949
@available(SwiftStdlib 5.5, *)
5050
public struct AsyncStream<Element> {
5151
public struct Continuation: Sendable {
52+
/// Indication of the type of termination informed to
53+
/// `onTermination`.
5254
public enum Termination {
55+
56+
/// The stream was finished via the `finish` method
5357
case finished
58+
59+
/// The stream was cancelled
5460
case cancelled
5561
}
62+
63+
/// A result of yielding values.
64+
public enum YieldResult {
65+
66+
/// When a value is successfully enqueued, either buffered
67+
/// or immediately consumed to resume a pending call to next
68+
/// and a count of remaining slots available in the buffer at
69+
/// the point in time of yielding. Note: transacting upon the
70+
/// remaining count is only valid when then calls to yield are
71+
/// mutually exclusive.
72+
case enqueued(remaining: Int)
73+
74+
/// Yielding resulted in not buffering an element because the
75+
/// buffer was full. The element is the dropped value.
76+
case dropped(Element)
77+
78+
/// Indication that the continuation was yielded when the
79+
/// stream was already in a terminal state: either by cancel or
80+
/// by finishing.
81+
case terminated
82+
}
83+
84+
/// A strategy that handles exhaustion of a buffer’s capacity.
85+
public enum BufferingPolicy {
86+
case unbounded
87+
88+
/// When the buffer is full, discard the newly received element.
89+
/// This enforces keeping the specified amount of oldest values.
90+
case bufferingOldest(Int)
91+
92+
/// When the buffer is full, discard the oldest element in the buffer.
93+
/// This enforces keeping the specified amount of newest values.
94+
case bufferingNewest(Int)
95+
}
5696

5797
let storage: _Storage
5898

@@ -64,7 +104,8 @@ public struct AsyncStream<Element> {
64104
///
65105
/// This can be called more than once and returns to the caller immediately
66106
/// without blocking for any awaiting consumption from the iteration.
67-
public func yield(_ value: __owned Element) {
107+
@discardableResult
108+
public func yield(_ value: __owned Element) -> YieldResult {
68109
storage.yield(value)
69110
}
70111

@@ -105,7 +146,8 @@ public struct AsyncStream<Element> {
105146
/// - Parameter elementType: The type the AsyncStream will produce.
106147
/// - Parameter maxBufferedElements: The maximum number of elements to
107148
/// hold in the buffer past any checks for continuations being resumed.
108-
/// - Parameter build: The work associated with yielding values to the AsyncStream.
149+
/// - Parameter build: The work associated with yielding values to the
150+
/// AsyncStream.
109151
///
110152
/// The maximum number of pending elements limited by dropping the oldest
111153
/// value when a new value comes in if the buffer would exceed the limit
@@ -117,13 +159,34 @@ public struct AsyncStream<Element> {
117159
/// concurrent contexts could result in out of order delivery.
118160
public init(
119161
_ elementType: Element.Type = Element.self,
120-
maxBufferedElements limit: Int = .max,
162+
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded,
121163
_ build: (Continuation) -> Void
122164
) {
123165
let storage: _Storage = .create(limit: limit)
124-
produce = storage.next
166+
self.init(unfolding: storage.next)
125167
build(Continuation(storage: storage))
126168
}
169+
170+
171+
public init(
172+
unfolding produce: @escaping () async -> Element?,
173+
onCancel: (@Sendable () -> Void)? = nil
174+
) {
175+
let storage: _AsyncStreamCriticalStorage<Optional<() async -> Element?>>
176+
= .create(produce)
177+
self.produce = {
178+
return await Task.withCancellationHandler {
179+
storage.value = nil
180+
onCancel?()
181+
} operation: {
182+
guard let result = await storage.value?() else {
183+
storage.value = nil
184+
return nil
185+
}
186+
return result
187+
}
188+
}
189+
}
127190
}
128191

129192
@available(SwiftStdlib 5.5, *)
@@ -139,9 +202,9 @@ extension AsyncStream: AsyncSequence {
139202

140203
/// The next value from the AsyncStream.
141204
///
142-
/// When next returns nil this signifies the end of the AsyncStream. Any such
143-
/// case that next is invoked concurrently and contends with another call to
144-
/// next is a programmer error and will fatalError.
205+
/// When next returns nil this signifies the end of the AsyncStream. Any
206+
/// such case that next is invoked concurrently and contends with another
207+
/// call to next is a programmer error and will fatalError.
145208
///
146209
/// If the task this iterator is running in is canceled while next is
147210
/// awaiting a value, this will terminate the AsyncStream and next may return nil
@@ -167,12 +230,13 @@ extension AsyncStream.Continuation {
167230
///
168231
/// This can be called more than once and returns to the caller immediately
169232
/// without blocking for any awaiting consumption from the iteration.
233+
@discardableResult
170234
public func yield(
171235
with result: Result<Element, Never>
172-
) {
236+
) -> YieldResult {
173237
switch result {
174238
case .success(let val):
175-
storage.yield(val)
239+
return storage.yield(val)
176240
}
177241
}
178242

@@ -182,7 +246,9 @@ extension AsyncStream.Continuation {
182246
///
183247
/// This can be called more than once and returns to the caller immediately
184248
/// without blocking for any awaiting consumption from the iteration.
185-
public func yield() where Element == Void {
186-
storage.yield(())
249+
/// without blocking for any awaiting consuption from the iteration.
250+
@discardableResult
251+
public func yield() -> YieldResult where Element == Void {
252+
return storage.yield(())
187253
}
188254
}

0 commit comments

Comments
 (0)