Skip to content

Commit 53c44d0

Browse files
committed
[WIP] Add combineLatestMany
1 parent 6dcb3ac commit 53c44d0

File tree

4 files changed

+1026
-0
lines changed

4 files changed

+1026
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
/// Creates an asynchronous sequence that combines the latest values from many `AsyncSequence` types
13+
/// by emitting a tuple of the values. ``combineLatestMany(_:)`` only emits a value whenever any of the base `AsyncSequence`s
14+
/// emit a value (so long as each of the bases have emitted at least one value).
15+
///
16+
/// Finishes:
17+
/// ``combineLatestMany(_:)`` finishes when one of the bases finishes before emitting any value or
18+
/// when all bases finished.
19+
///
20+
/// Throws:
21+
/// ``combineLatestMany(_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed
22+
/// values will be dropped.
23+
@available(AsyncAlgorithms 1.1, *)
24+
public func combineLatestMany<Element: Sendable>(_ bases: [any CombineLatestManyBase<Element>]) -> AsyncCombineLatestManySequence<Element>
25+
{
26+
AsyncCombineLatestManySequence(bases)
27+
}
28+
29+
// TODO: Can we get rid of this typealias?
30+
@available(AsyncAlgorithms 1.1, *)
31+
public typealias CombineLatestManyBase<Element: Sendable> = AsyncSequence<Element, any Error> & Sendable
32+
33+
/// An `AsyncSequence` that combines the latest values produced from many asynchronous sequences into an asynchronous sequence of tuples.
34+
@available(AsyncAlgorithms 1.1, *)
35+
public struct AsyncCombineLatestManySequence<Element: Sendable>: AsyncSequence, Sendable {
36+
public typealias AsyncIterator = Iterator
37+
38+
typealias Base = AsyncSequence<Element, any Error> & Sendable
39+
let bases: [any Base]
40+
41+
init(_ bases: [any Base]) {
42+
self.bases = bases
43+
}
44+
45+
public func makeAsyncIterator() -> AsyncIterator {
46+
Iterator(
47+
storage: .init(self.bases)
48+
)
49+
}
50+
51+
public struct Iterator: AsyncIteratorProtocol {
52+
final class InternalClass {
53+
private let storage: CombineLatestManyStorage<Element>
54+
55+
fileprivate init(storage: CombineLatestManyStorage<Element>) {
56+
self.storage = storage
57+
}
58+
59+
deinit {
60+
self.storage.iteratorDeinitialized()
61+
}
62+
63+
func next() async throws -> [Element]? {
64+
guard let element = try await self.storage.next() else {
65+
return nil
66+
}
67+
68+
// This force unwrap is safe since there must be a third element.
69+
return element
70+
}
71+
}
72+
73+
let internalClass: InternalClass
74+
75+
fileprivate init(storage: CombineLatestManyStorage<Element>) {
76+
self.internalClass = InternalClass(storage: storage)
77+
}
78+
79+
public mutating func next() async throws -> [Element]? {
80+
try await self.internalClass.next()
81+
}
82+
}
83+
}
84+
85+
@available(*, unavailable)
86+
extension AsyncCombineLatestManySequence.Iterator: Sendable {}

0 commit comments

Comments
 (0)