Skip to content

Commit 3e5ca38

Browse files
authored
Merge pull request swiftlang#37974 from phausler/phausler/release5.5/asyncstream_rev1_1
[SE-0314] `AsyncStream` and `AsyncThrowingStream` Updates
2 parents cd3d8b3 + d80ac6b commit 3e5ca38

File tree

8 files changed

+2167
-0
lines changed

8 files changed

+2167
-0
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
//===--- AsyncStream.cpp - Multi-resume locking interface -----------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#include "swift/Runtime/Mutex.h"
14+
15+
namespace swift {
16+
// return the size in words for the given mutex primitive
17+
extern "C"
18+
size_t _swift_async_stream_lock_size() {
19+
size_t words = sizeof(MutexHandle) / sizeof(void *);
20+
if (words < 1) { return 1; }
21+
return words;
22+
}
23+
24+
extern "C"
25+
void _swift_async_stream_lock_init(MutexHandle &lock) {
26+
MutexPlatformHelper::init(lock);
27+
}
28+
29+
extern "C"
30+
void _swift_async_stream_lock_lock(MutexHandle &lock) {
31+
MutexPlatformHelper::lock(lock);
32+
}
33+
34+
extern "C"
35+
void _swift_async_stream_lock_unlock(MutexHandle &lock) {
36+
MutexPlatformHelper::unlock(lock);
37+
}
38+
39+
};
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2020-2021 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
import Swift
14+
15+
/// An ordered, asynchronously generated sequence of elements.
16+
///
17+
/// AsyncStream is an interface type to adapt from code producing values to an
18+
/// asynchronous context iterating them. This is itended to be used to allow
19+
/// callback or delegation based APIs to participate with async/await.
20+
///
21+
/// When values are produced from a non async/await source there is a
22+
/// consideration that must be made on behavioral characteristics of how that
23+
/// production of values interacts with the iteration. AsyncStream offers a
24+
/// initialization strategy that provides a method of yielding values into
25+
/// iteration.
26+
///
27+
/// AsyncStream can be initialized with the option to buffer to a given limit.
28+
/// The default value for this limit is Int.max. The buffering is only for
29+
/// values that have yet to be consumed by iteration. Values can be yielded in
30+
/// case to the continuation passed into the build closure. That continuation
31+
/// is Sendable, in that it is intended to be used from concurrent contexts
32+
/// external to the iteration of the AsyncStream.
33+
///
34+
/// A trivial use case producing values from a detached task would work as such:
35+
///
36+
/// let digits = AsyncStream(Int.self) { continuation in
37+
/// detach {
38+
/// for digit in 0..<10 {
39+
/// continuation.yield(digit)
40+
/// }
41+
/// continuation.finish()
42+
/// }
43+
/// }
44+
///
45+
/// for await digit in digits {
46+
/// print(digit)
47+
/// }
48+
///
49+
@available(SwiftStdlib 5.5, *)
50+
public struct AsyncStream<Element> {
51+
public struct Continuation: Sendable {
52+
/// Indication of the type of termination informed to
53+
/// `onTermination`.
54+
public enum Termination {
55+
56+
/// The stream was finished via the `finish` method
57+
case finished
58+
59+
/// The stream was cancelled
60+
case cancelled
61+
}
62+
63+
/// A result of yielding values.
64+
public enum YieldResult {
65+
66+
/// When a value is successfully enqueued, either buffered
67+
/// or immediately consumed to resume a pending call to next
68+
/// and a count of remaining slots available in the buffer at
69+
/// the point in time of yielding. Note: transacting upon the
70+
/// remaining count is only valid when then calls to yield are
71+
/// mutually exclusive.
72+
case enqueued(remaining: Int)
73+
74+
/// Yielding resulted in not buffering an element because the
75+
/// buffer was full. The element is the dropped value.
76+
case dropped(Element)
77+
78+
/// Indication that the continuation was yielded when the
79+
/// stream was already in a terminal state: either by cancel or
80+
/// by finishing.
81+
case terminated
82+
}
83+
84+
/// A strategy that handles exhaustion of a buffer’s capacity.
85+
public enum BufferingPolicy {
86+
case unbounded
87+
88+
/// When the buffer is full, discard the newly received element.
89+
/// This enforces keeping the specified amount of oldest values.
90+
case bufferingOldest(Int)
91+
92+
/// When the buffer is full, discard the oldest element in the buffer.
93+
/// This enforces keeping the specified amount of newest values.
94+
case bufferingNewest(Int)
95+
}
96+
97+
let storage: _Storage
98+
99+
/// Resume the task awaiting the next iteration point by having it return
100+
/// nomally from its suspension point or buffer the value if no awaiting
101+
/// next iteration is active.
102+
///
103+
/// - Parameter value: The value to yield from the continuation.
104+
///
105+
/// This can be called more than once and returns to the caller immediately
106+
/// without blocking for any awaiting consuption from the iteration.
107+
@discardableResult
108+
public func yield(_ value: __owned Element) -> YieldResult {
109+
storage.yield(value)
110+
}
111+
112+
/// Resume the task awaiting the next iteration point by having it return
113+
/// nil which signifies the end of the iteration.
114+
///
115+
/// Calling this function more than once is idempotent; i.e. finishing more
116+
/// than once does not alter the state beyond the requirements of
117+
/// AsyncSequence; which claims that all values past a terminal state are
118+
/// nil.
119+
public func finish() {
120+
storage.finish()
121+
}
122+
123+
/// A callback to invoke when iteration of a AsyncStream is cancelled.
124+
///
125+
/// If an `onTermination` callback is set, when iteration of a AsyncStream is
126+
/// cancelled via task cancellation that callback is invoked. The callback
127+
/// is disposed of after any terminal state is reached.
128+
///
129+
/// Cancelling an active iteration will first invoke the onTermination
130+
/// callback and then resume yeilding nil. This means that any cleanup state
131+
/// can be emitted accordingly in the cancellation handler.
132+
public var onTermination: (@Sendable (Termination) -> Void)? {
133+
get {
134+
return storage.getOnTermination()
135+
}
136+
nonmutating set {
137+
storage.setOnTermination(newValue)
138+
}
139+
}
140+
}
141+
142+
let produce: () async -> Element?
143+
144+
/// Construct a AsyncStream buffering given an Element type.
145+
///
146+
/// - Parameter elementType: The type the AsyncStream will produce.
147+
/// - Parameter maxBufferedElements: The maximum number of elements to
148+
/// hold in the buffer past any checks for continuations being resumed.
149+
/// - Parameter build: The work associated with yielding values to the
150+
/// AsyncStream.
151+
///
152+
/// The maximum number of pending elements limited by dropping the oldest
153+
/// value when a new value comes in if the buffer would excede the limit
154+
/// placed upon it. By default this limit is unlimited.
155+
///
156+
/// The build closure passes in a Continuation which can be used in
157+
/// concurrent contexts. It is thread safe to send and finish; all calls are
158+
/// to the continuation are serialized, however calling this from multiple
159+
/// concurrent contexts could result in out of order delivery.
160+
public init(
161+
_ elementType: Element.Type = Element.self,
162+
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded,
163+
_ build: (Continuation) -> Void
164+
) {
165+
let storage: _Storage = .create(limit: limit)
166+
self.init(unfolding: storage.next)
167+
build(Continuation(storage: storage))
168+
}
169+
170+
171+
public init(
172+
unfolding produce: @escaping () async -> Element?,
173+
onCancel: (@Sendable () -> Void)? = nil
174+
) {
175+
let storage: _AsyncStreamCriticalStorage<Optional<() async -> Element?>>
176+
= .create(produce)
177+
self.produce = {
178+
return await Task.withCancellationHandler {
179+
storage.value = nil
180+
onCancel?()
181+
} operation: {
182+
guard let result = await storage.value?() else {
183+
storage.value = nil
184+
return nil
185+
}
186+
return result
187+
}
188+
}
189+
}
190+
}
191+
192+
@available(SwiftStdlib 5.5, *)
193+
extension AsyncStream: AsyncSequence {
194+
/// The asynchronous iterator for iterating a AsyncStream.
195+
///
196+
/// This type is specificially not Sendable. It is not intended to be used
197+
/// from multiple concurrent contexts. Any such case that next is invoked
198+
/// concurrently and contends with another call to next is a programmer error
199+
/// and will fatalError.
200+
public struct Iterator: AsyncIteratorProtocol {
201+
let produce: () async -> Element?
202+
203+
/// The next value from the AsyncStream.
204+
///
205+
/// When next returns nil this signifies the end of the AsyncStream. Any
206+
/// such case that next is invoked concurrently and contends with another
207+
/// call to next is a programmer error and will fatalError.
208+
///
209+
/// If the task this iterator is running in is canceled while next is
210+
/// awaiting a value, this will terminate the AsyncStream and next may
211+
/// return nil immediately (or will return nil on subseuqent calls)
212+
public mutating func next() async -> Element? {
213+
await produce()
214+
}
215+
}
216+
217+
/// Construct an iterator.
218+
public func makeAsyncIterator() -> Iterator {
219+
return Iterator(produce: produce)
220+
}
221+
}
222+
223+
@available(SwiftStdlib 5.5, *)
224+
extension AsyncStream.Continuation {
225+
/// Resume the task awaiting the next iteration point by having it return
226+
/// normally from its suspension point or buffer the value if no awaiting
227+
/// next iteration is active.
228+
///
229+
/// - Parameter result: A result to yield from the continuation.
230+
///
231+
/// This can be called more than once and returns to the caller immediately
232+
/// without blocking for any awaiting consuption from the iteration.
233+
@discardableResult
234+
public func yield(
235+
with result: Result<Element, Never>
236+
) -> YieldResult {
237+
switch result {
238+
case .success(let val):
239+
return storage.yield(val)
240+
}
241+
}
242+
243+
/// Resume the task awaiting the next iteration point by having it return
244+
/// normally from its suspension point or buffer the value if no awaiting
245+
/// next iteration is active where the `Element` is `Void`.
246+
///
247+
/// This can be called more than once and returns to the caller immediately
248+
/// without blocking for any awaiting consuption from the iteration.
249+
@discardableResult
250+
public func yield() -> YieldResult where Element == Void {
251+
return storage.yield(())
252+
}
253+
}

0 commit comments

Comments
 (0)