Skip to content

Commit 791b2a0

Browse files
FranzBuschktoso
andauthored
Implement convenience factory methods for Async[Throwing]Stream (#62968)
* Implement convenience factory methods for Async[Throwing]Stream This is the implementation for swiftlang/swift-evolution#1824 * Apply suggestions from code review Co-authored-by: Konrad `ktoso` Malawski <[email protected]> --------- Co-authored-by: Konrad `ktoso` Malawski <[email protected]>
1 parent 7324307 commit 791b2a0

File tree

3 files changed

+74
-14
lines changed

3 files changed

+74
-14
lines changed

stdlib/public/Concurrency/AsyncStream.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,27 @@ extension AsyncStream.Continuation {
428428
}
429429
}
430430

431+
@available(SwiftStdlib 5.1, *)
432+
extension AsyncStream {
433+
/// Initializes a new ``AsyncStream`` and an ``AsyncStream/Continuation``.
434+
///
435+
/// - Parameters:
436+
/// - elementType: The element type of the stream.
437+
/// - limit: The buffering policy that the stream should use.
438+
/// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the
439+
/// producer while the stream should be passed to the consumer.
440+
@available(SwiftStdlib 5.1, *)
441+
@backDeployed(before: SwiftStdlib 5.9)
442+
public static func makeStream(
443+
of elementType: Element.Type = Element.self,
444+
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
445+
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
446+
var continuation: AsyncStream<Element>.Continuation!
447+
let stream = AsyncStream<Element>(bufferingPolicy: limit) { continuation = $0 }
448+
return (stream: stream, continuation: continuation!)
449+
}
450+
}
451+
431452
@available(SwiftStdlib 5.1, *)
432453
extension AsyncStream: @unchecked Sendable where Element: Sendable { }
433454
#else

stdlib/public/Concurrency/AsyncThrowingStream.swift

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,29 @@ extension AsyncThrowingStream.Continuation {
473473
}
474474
}
475475

476+
@available(SwiftStdlib 5.1, *)
477+
extension AsyncThrowingStream {
478+
/// Initializes a new ``AsyncThrowingStream`` and an ``AsyncThrowingStream/Continuation``.
479+
///
480+
/// - Parameters:
481+
/// - elementType: The element type of the stream.
482+
/// - failureType: The failure type of the stream.
483+
/// - limit: The buffering policy that the stream should use.
484+
/// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the
485+
/// producer while the stream should be passed to the consumer.
486+
@available(SwiftStdlib 5.1, *)
487+
@backDeployed(before: SwiftStdlib 5.9)
488+
public static func makeStream(
489+
of elementType: Element.Type = Element.self,
490+
throwing failureType: Failure.Type = Failure.self,
491+
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
492+
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
493+
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
494+
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
495+
return (stream: stream, continuation: continuation!)
496+
}
497+
}
498+
476499
@available(SwiftStdlib 5.1, *)
477500
extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { }
478501
#else

test/Concurrency/Runtime/async_stream.swift

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,8 @@
77
// rdar://78109470
88
// UNSUPPORTED: back_deployment_runtime
99

10-
// Race condition
11-
// REQUIRES: rdar78033828
12-
1310
import _Concurrency
1411
import StdlibUnittest
15-
import Dispatch
1612

1713
struct SomeError: Error, Equatable {
1814
var value = Int.random(in: 0..<100)
@@ -27,14 +23,34 @@ var tests = TestSuite("AsyncStream")
2723
var fulfilled = false
2824
}
2925

26+
tests.test("factory method") {
27+
let (stream, continuation) = AsyncStream.makeStream(of: String.self)
28+
continuation.yield("hello")
29+
30+
var iterator = stream.makeAsyncIterator()
31+
expectEqual(await iterator.next(), "hello")
32+
}
33+
34+
tests.test("throwing factory method") {
35+
let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self, throwing: Error.self)
36+
continuation.yield("hello")
37+
38+
var iterator = stream.makeAsyncIterator()
39+
do {
40+
expectEqual(try await iterator.next(), "hello")
41+
} catch {
42+
expectUnreachable("unexpected error thrown")
43+
}
44+
}
45+
3046
tests.test("yield with no awaiting next") {
31-
let series = AsyncStream(String.self) { continuation in
47+
_ = AsyncStream(String.self) { continuation in
3248
continuation.yield("hello")
3349
}
3450
}
3551

3652
tests.test("yield with no awaiting next throwing") {
37-
let series = AsyncThrowingStream(String.self) { continuation in
53+
_ = AsyncThrowingStream(String.self) { continuation in
3854
continuation.yield("hello")
3955
}
4056
}
@@ -122,7 +138,7 @@ var tests = TestSuite("AsyncStream")
122138
do {
123139
expectEqual(try await iterator.next(), "hello")
124140
expectEqual(try await iterator.next(), "world")
125-
try await iterator.next()
141+
_ = try await iterator.next()
126142
expectUnreachable("expected thrown error")
127143
} catch {
128144
if let failure = error as? SomeError {
@@ -134,15 +150,15 @@ var tests = TestSuite("AsyncStream")
134150
}
135151

136152
tests.test("yield with no awaiting next detached") {
137-
let series = AsyncStream(String.self) { continuation in
153+
_ = AsyncStream(String.self) { continuation in
138154
detach {
139155
continuation.yield("hello")
140156
}
141157
}
142158
}
143159

144160
tests.test("yield with no awaiting next detached throwing") {
145-
let series = AsyncThrowingStream(String.self) { continuation in
161+
_ = AsyncThrowingStream(String.self) { continuation in
146162
detach {
147163
continuation.yield("hello")
148164
}
@@ -246,7 +262,7 @@ var tests = TestSuite("AsyncStream")
246262
do {
247263
expectEqual(try await iterator.next(), "hello")
248264
expectEqual(try await iterator.next(), "world")
249-
try await iterator.next()
265+
_ = try await iterator.next()
250266
expectUnreachable("expected thrown error")
251267
} catch {
252268
if let failure = error as? SomeError {
@@ -337,7 +353,7 @@ var tests = TestSuite("AsyncStream")
337353
let expectation = Expectation()
338354

339355
func scopedLifetime(_ expectation: Expectation) {
340-
let series = AsyncStream(String.self) { continuation in
356+
_ = AsyncStream(String.self) { continuation in
341357
continuation.onTermination = { @Sendable _ in expectation.fulfilled = true }
342358
}
343359
}
@@ -351,7 +367,7 @@ var tests = TestSuite("AsyncStream")
351367
let expectation = Expectation()
352368

353369
func scopedLifetime(_ expectation: Expectation) {
354-
let series = AsyncStream(String.self) { continuation in
370+
_ = AsyncStream(String.self) { continuation in
355371
continuation.onTermination = { @Sendable _ in expectation.fulfilled = true }
356372
continuation.finish()
357373
}
@@ -366,7 +382,7 @@ var tests = TestSuite("AsyncStream")
366382
let expectation = Expectation()
367383

368384
func scopedLifetime(_ expectation: Expectation) {
369-
let series = AsyncStream(String.self) { continuation in
385+
_ = AsyncStream(String.self) { continuation in
370386
continuation.onTermination = { @Sendable terminal in
371387
switch terminal {
372388
case .cancelled:
@@ -386,7 +402,7 @@ var tests = TestSuite("AsyncStream")
386402
let expectation = Expectation()
387403

388404
func scopedLifetime(_ expectation: Expectation) {
389-
let series = AsyncThrowingStream(String.self) { continuation in
405+
_ = AsyncThrowingStream(String.self) { continuation in
390406
continuation.onTermination = { @Sendable terminal in
391407
switch terminal {
392408
case .cancelled:

0 commit comments

Comments
 (0)