Skip to content

Commit 5a4946b

Browse files
committed
Fun
1 parent 67a2795 commit 5a4946b

File tree

5 files changed

+75
-75
lines changed

5 files changed

+75
-75
lines changed

Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,18 @@
2121
/// ``combineLatestMany(_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed
2222
/// values will be dropped.
2323
@available(AsyncAlgorithms 1.1, *)
24-
public func combineLatestMany<Element: Sendable>(_ bases: [any CombineLatestManyBase<Element>]) -> AsyncCombineLatestManySequence<Element>
25-
{
26-
AsyncCombineLatestManySequence(bases)
24+
public func combineLatestMany<Element: Sendable, Failure: Error>(
25+
_ bases: [any (AsyncSequence<Element, Failure> & Sendable)]
26+
) -> some AsyncSequence<[Element], Failure> & Sendable {
27+
AsyncCombineLatestManySequence<Element, Failure>(bases)
2728
}
2829

29-
// TODO: Can we get rid of this typealias?
30-
@available(AsyncAlgorithms 1.1, *)
31-
public typealias CombineLatestManyBase<Element: Sendable> = AsyncSequence<Element, any Error> & Sendable
32-
3330
/// An `AsyncSequence` that combines the latest values produced from many asynchronous sequences into an asynchronous sequence of tuples.
3431
@available(AsyncAlgorithms 1.1, *)
35-
public struct AsyncCombineLatestManySequence<Element: Sendable>: AsyncSequence, Sendable {
32+
public struct AsyncCombineLatestManySequence<Element: Sendable, Failure: Error>: AsyncSequence, Sendable {
3633
public typealias AsyncIterator = Iterator
3734

38-
typealias Base = AsyncSequence<Element, any Error> & Sendable
35+
typealias Base = AsyncSequence<Element, Failure> & Sendable
3936
let bases: [any Base]
4037

4138
init(_ bases: [any Base]) {
@@ -50,33 +47,34 @@ public struct AsyncCombineLatestManySequence<Element: Sendable>: AsyncSequence,
5047

5148
public struct Iterator: AsyncIteratorProtocol {
5249
final class InternalClass {
53-
private let storage: CombineLatestManyStorage<Element>
50+
private let storage: CombineLatestManyStorage<Element, Failure>
5451

55-
fileprivate init(storage: CombineLatestManyStorage<Element>) {
52+
fileprivate init(storage: CombineLatestManyStorage<Element, Failure>) {
5653
self.storage = storage
5754
}
5855

5956
deinit {
6057
self.storage.iteratorDeinitialized()
6158
}
6259

63-
func next() async throws -> [Element]? {
64-
guard let element = try await self.storage.next() else {
65-
return nil
66-
}
67-
68-
// This force unwrap is safe since there must be a third element.
69-
return element
60+
func next() async throws(Failure) -> [Element]? {
61+
fatalError()
62+
// guard let element = try await self.storage.next() else {
63+
// return nil
64+
// }
65+
//
66+
// // This force unwrap is safe since there must be a third element.
67+
// return element
7068
}
7169
}
7270

7371
let internalClass: InternalClass
7472

75-
fileprivate init(storage: CombineLatestManyStorage<Element>) {
73+
fileprivate init(storage: CombineLatestManyStorage<Element, Failure>) {
7674
self.internalClass = InternalClass(storage: storage)
7775
}
7876

79-
public mutating func next() async throws -> [Element]? {
77+
public mutating func next() async throws(Failure) -> [Element]? {
8078
try await self.internalClass.next()
8179
}
8280
}

Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@ import DequeModule
1313

1414
/// State machine for combine latest
1515
@available(AsyncAlgorithms 1.1, *)
16-
struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
16+
struct CombineLatestManyStateMachine<Element: Sendable, Failure: Error>: Sendable {
1717
typealias DownstreamContinuation = UnsafeContinuation<
18-
Result<[Element]?, Error>, Never
18+
Result<[Element]?, Failure>, Never
1919
>
20-
typealias Base = AsyncSequence<Element, any Error> & Sendable
20+
typealias Base = AsyncSequence<Element, Failure> & Sendable
2121

2222
private enum State: Sendable {
2323
/// Small wrapper for the state of an upstream sequence.
2424
struct Upstream: Sendable {
2525
/// The upstream continuation.
26-
var continuation: UnsafeContinuation<Void, Error>?
26+
var continuation: UnsafeContinuation<Void, Never>?
2727
/// The produced upstream element.
2828
var element: Element?
2929
/// Indicates wether the upstream finished/threw already
@@ -53,7 +53,7 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
5353
)
5454

5555
case upstreamThrew(
56-
error: Error
56+
error: Failure
5757
)
5858

5959
/// The state once the downstream consumer stopped, i.e. by dropping all references
@@ -77,10 +77,10 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
7777
/// Actions returned by `iteratorDeinitialized()`.
7878
enum IteratorDeinitializedAction {
7979
/// Indicates that the `Task` needs to be cancelled and
80-
/// the upstream continuations need to be resumed with a `CancellationError`.
80+
/// the upstream continuations need to be resumed with a `CancellationFailure`.
8181
case cancelTaskAndUpstreamContinuations(
8282
task: Task<Void, Never>,
83-
upstreamContinuations: [UnsafeContinuation<Void, Error>]
83+
upstreamContinuations: [UnsafeContinuation<Void, Never>]
8484
)
8585
}
8686

@@ -151,18 +151,13 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
151151
enum ChildTaskSuspendedAction {
152152
/// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream.
153153
case resumeContinuation(
154-
upstreamContinuation: UnsafeContinuation<Void, Error>
155-
)
156-
/// Indicates that the continuation should be resumed with an Error because another upstream sequence threw.
157-
case resumeContinuationWithError(
158-
upstreamContinuation: UnsafeContinuation<Void, Error>,
159-
error: Error
154+
upstreamContinuation: UnsafeContinuation<Void, Never>
160155
)
161156
}
162157

163158
mutating func childTaskSuspended(
164159
baseIndex: Int,
165-
continuation: UnsafeContinuation<Void, Error>
160+
continuation: UnsafeContinuation<Void, Never>
166161
) -> ChildTaskSuspendedAction? {
167162
switch self.state {
168163
case .initial:
@@ -193,9 +188,8 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
193188
// Since cancellation is cooperative it might be that child tasks are still getting
194189
// suspended even though we already cancelled them. We must tolerate this and just resume
195190
// the continuation with an error.
196-
return .resumeContinuationWithError(
197-
upstreamContinuation: continuation,
198-
error: CancellationError()
191+
return .resumeContinuation(
192+
upstreamContinuation: continuation
199193
)
200194

201195
case .modifying:
@@ -208,7 +202,7 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
208202
/// Indicates that the downstream continuation should be resumed with the element.
209203
case resumeContinuation(
210204
downstreamContinuation: DownstreamContinuation,
211-
result: Result<[Element]?, Error>
205+
result: Result<[Element]?, Failure>
212206
)
213207
}
214208

@@ -293,14 +287,14 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
293287
/// Indicates the task and the upstream continuations should be cancelled.
294288
case cancelTaskAndUpstreamContinuations(
295289
task: Task<Void, Never>,
296-
upstreamContinuations: [UnsafeContinuation<Void, Error>]
290+
upstreamContinuations: [UnsafeContinuation<Void, Never>]
297291
)
298292
/// Indicates that the downstream continuation should be resumed with `nil` and
299293
/// the task and the upstream continuations should be cancelled.
300294
case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
301295
downstreamContinuation: DownstreamContinuation,
302296
task: Task<Void, Never>,
303-
upstreamContinuations: [UnsafeContinuation<Void, Error>]
297+
upstreamContinuations: [UnsafeContinuation<Void, Never>]
304298
)
305299
}
306300

@@ -394,19 +388,19 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
394388
/// Indicates the task and the upstream continuations should be cancelled.
395389
case cancelTaskAndUpstreamContinuations(
396390
task: Task<Void, Never>,
397-
upstreamContinuations: [UnsafeContinuation<Void, Error>]
391+
upstreamContinuations: [UnsafeContinuation<Void, Never>]
398392
)
399393
/// Indicates that the downstream continuation should be resumed with the `error` and
400394
/// the task and the upstream continuations should be cancelled.
401-
case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
395+
case resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations(
402396
downstreamContinuation: DownstreamContinuation,
403-
error: Error,
397+
error: Failure,
404398
task: Task<Void, Never>,
405-
upstreamContinuations: [UnsafeContinuation<Void, Error>]
399+
upstreamContinuations: [UnsafeContinuation<Void, Never>]
406400
)
407401
}
408402

409-
mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction? {
403+
mutating func upstreamThrew(_ error: Failure) -> UpstreamThrewAction? {
410404
switch self.state {
411405
case .initial:
412406
preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()")
@@ -433,7 +427,7 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
433427
// the upstream work.
434428
self.state = .finished
435429

436-
return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
430+
return .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations(
437431
downstreamContinuation: downstreamContinuation,
438432
error: error,
439433
task: task,
@@ -456,12 +450,12 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
456450
case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations(
457451
downstreamContinuation: DownstreamContinuation,
458452
task: Task<Void, Never>,
459-
upstreamContinuations: [UnsafeContinuation<Void, Error>]
453+
upstreamContinuations: [UnsafeContinuation<Void, Never>]
460454
)
461455
/// Indicates that the task and the upstream continuations should be cancelled.
462456
case cancelTaskAndUpstreamContinuations(
463457
task: Task<Void, Never>,
464-
upstreamContinuations: [UnsafeContinuation<Void, Error>]
458+
upstreamContinuations: [UnsafeContinuation<Void, Never>]
465459
)
466460
}
467461

@@ -515,12 +509,12 @@ struct CombineLatestManyStateMachine<Element: Sendable>: Sendable {
515509
case startTask([any Base])
516510
/// Indicates that all upstream continuations should be resumed.
517511
case resumeUpstreamContinuations(
518-
upstreamContinuation: [UnsafeContinuation<Void, Error>]
512+
upstreamContinuation: [UnsafeContinuation<Void, Never>]
519513
)
520514
/// Indicates that the downstream continuation should be resumed with the result.
521515
case resumeContinuation(
522516
downstreamContinuation: DownstreamContinuation,
523-
result: Result<[Element]?, Error>
517+
result: Result<[Element]?, Failure>
524518
)
525519
/// Indicates that the downstream continuation should be resumed with `nil`.
526520
case resumeDownstreamContinuationWithNil(DownstreamContinuation)

Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStorage.swift

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
//===----------------------------------------------------------------------===//
1111

1212
@available(AsyncAlgorithms 1.1, *)
13-
final class CombineLatestManyStorage<Element: Sendable>: Sendable {
14-
typealias StateMachine = CombineLatestManyStateMachine<Element>
13+
final class CombineLatestManyStorage<Element: Sendable, Failure: Error>: Sendable {
14+
typealias StateMachine = CombineLatestManyStateMachine<Element, Failure>
1515

1616
private let stateMachine: ManagedCriticalState<StateMachine>
1717

@@ -27,9 +27,8 @@ final class CombineLatestManyStorage<Element: Sendable>: Sendable {
2727
let task,
2828
let upstreamContinuation
2929
):
30-
upstreamContinuation.forEach { $0.resume(throwing: CancellationError()) }
31-
3230
task.cancel()
31+
upstreamContinuation.forEach { $0.resume() }
3332

3433
case .none:
3534
break
@@ -97,14 +96,14 @@ final class CombineLatestManyStorage<Element: Sendable>: Sendable {
9796
let task,
9897
let upstreamContinuations
9998
):
100-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
101-
task.cancel()
99+
task.cancel()
100+
upstreamContinuations.forEach { $0.resume() }
102101

103102
downstreamContinuation.resume(returning: .success(nil))
104103

105104
case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations):
106-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
107-
task.cancel()
105+
task.cancel()
106+
upstreamContinuations.forEach { $0.resume() }
108107

109108
case .none:
110109
break
@@ -114,13 +113,13 @@ final class CombineLatestManyStorage<Element: Sendable>: Sendable {
114113

115114
private func startTask(
116115
stateMachine: inout StateMachine,
117-
bases: [any CombineLatestManyStateMachine<Element>.Base],
116+
bases: [any (AsyncSequence<Element, Failure> & Sendable)],
118117
downstreamContinuation: StateMachine.DownstreamContinuation
119118
) {
120119
// This creates a new `Task` that is iterating the upstream
121120
// sequences. We must store it to cancel it at the right times.
122121
let task = Task {
123-
await withThrowingTaskGroup(of: Void.self) { group in
122+
await withTaskGroup(of: Result<Void, Failure>.self) { group in
124123
// For each upstream sequence we are adding a child task that
125124
// is consuming the upstream sequence
126125
for (baseIndex, base) in bases.enumerated() {
@@ -131,7 +130,7 @@ final class CombineLatestManyStorage<Element: Sendable>: Sendable {
131130
// We are creating a continuation before requesting the next
132131
// element from upstream. This continuation is only resumed
133132
// if the downstream consumer called `next` to signal his demand.
134-
try await withUnsafeThrowingContinuation { continuation in
133+
await withUnsafeContinuation { continuation in
135134
let action = self.stateMachine.withCriticalRegion { stateMachine in
136135
stateMachine.childTaskSuspended(baseIndex: baseIndex, continuation: continuation)
137136
}
@@ -140,15 +139,19 @@ final class CombineLatestManyStorage<Element: Sendable>: Sendable {
140139
case .resumeContinuation(let upstreamContinuation):
141140
upstreamContinuation.resume()
142141

143-
case .resumeContinuationWithError(let upstreamContinuation, let error):
144-
upstreamContinuation.resume(throwing: error)
145-
146142
case .none:
147143
break
148144
}
149145
}
146+
147+
let element: Element?
148+
do {
149+
element = try await baseIterator.next(isolation: nil)
150+
} catch {
151+
return .failure(error as! Failure) // Looks like a compiler bug
152+
}
150153

151-
if let element = try await baseIterator.next() {
154+
if let element = element {
152155
let action = self.stateMachine.withCriticalRegion { stateMachine in
153156
stateMachine.elementProduced(value: element, atBaseIndex: baseIndex)
154157
}
@@ -172,15 +175,15 @@ final class CombineLatestManyStorage<Element: Sendable>: Sendable {
172175
let upstreamContinuations
173176
):
174177

175-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
176178
task.cancel()
179+
upstreamContinuations.forEach { $0.resume() }
177180

178181
downstreamContinuation.resume(returning: .success(nil))
179182
break loop
180183

181184
case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations):
182-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
183185
task.cancel()
186+
upstreamContinuations.forEach { $0.resume() }
184187

185188
break loop
186189

@@ -189,30 +192,34 @@ final class CombineLatestManyStorage<Element: Sendable>: Sendable {
189192
}
190193
}
191194
}
195+
return .success(())
192196
}
193197
}
194198

195199
while !group.isEmpty {
196-
do {
197-
try await group.next()
198-
} catch {
200+
let result = await group.next()
201+
202+
switch result {
203+
case .success, .none:
204+
break
205+
case .failure(let error):
199206
// One of the upstream sequences threw an error
200207
let action = self.stateMachine.withCriticalRegion { stateMachine in
201208
stateMachine.upstreamThrew(error)
202209
}
203210

204211
switch action {
205212
case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations):
206-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
207213
task.cancel()
208-
case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
214+
upstreamContinuations.forEach { $0.resume() }
215+
case .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations(
209216
let downstreamContinuation,
210217
let error,
211218
let task,
212219
let upstreamContinuations
213220
):
214-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
215221
task.cancel()
222+
upstreamContinuations.forEach { $0.resume() }
216223
downstreamContinuation.resume(returning: .failure(error))
217224
case .none:
218225
break

Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
//===----------------------------------------------------------------------===//
1111

1212
public struct GatedSequence<Element> {
13+
typealias Failure = Never
1314
let elements: [Element]
1415
let gates: [Gate]
1516
var index = 0

0 commit comments

Comments
 (0)