diff --git a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift new file mode 100644 index 00000000..1850e7c6 --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatestManySequence.swift @@ -0,0 +1,87 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.2) + +/// Creates an asynchronous sequence that combines the latest values from many `AsyncSequence` types +/// by emitting a tuple of the values. ``combineLatestMany(_:)`` only emits a value whenever any of the base `AsyncSequence`s +/// emit a value (so long as each of the bases have emitted at least one value). +/// +/// Finishes: +/// ``combineLatestMany(_:)`` finishes when one of the bases finishes before emitting any value or +/// when all bases finished. +/// +/// Throws: +/// ``combineLatestMany(_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed +/// values will be dropped. +@available(AsyncAlgorithms 1.1, *) +public func combineLatestMany( + _ bases: [any (AsyncSequence & Sendable)] +) -> some AsyncSequence<[Element], Failure> & Sendable { + AsyncCombineLatestManySequence(bases) +} + +/// An `AsyncSequence` that combines the latest values produced from many asynchronous sequences into an asynchronous sequence of tuples. +@available(AsyncAlgorithms 1.1, *) +public struct AsyncCombineLatestManySequence: AsyncSequence, Sendable { + public typealias AsyncIterator = Iterator + + typealias Base = AsyncSequence & Sendable + let bases: [any Base] + + init(_ bases: [any Base]) { + self.bases = bases + } + + public func makeAsyncIterator() -> AsyncIterator { + Iterator( + storage: .init(self.bases) + ) + } + + public struct Iterator: AsyncIteratorProtocol { + final class InternalClass { + private let storage: CombineLatestManyStorage + + fileprivate init(storage: CombineLatestManyStorage) { + self.storage = storage + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async throws(Failure) -> [Element]? { + guard let element = try await self.storage.next() else { + return nil + } + + // This force unwrap is safe since there must be a third element. + return element + } + } + + let internalClass: InternalClass + + fileprivate init(storage: CombineLatestManyStorage) { + self.internalClass = InternalClass(storage: storage) + } + + public mutating func next() async throws(Failure) -> [Element]? { + try await self.internalClass.next() + } + } +} + +@available(*, unavailable) +extension AsyncCombineLatestManySequence.Iterator: Sendable {} + +#endif diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift new file mode 100644 index 00000000..ef9bfd04 --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestManyStateMachine.swift @@ -0,0 +1,601 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.2) + +import DequeModule + +/// State machine for combine latest +@available(AsyncAlgorithms 1.1, *) +struct CombineLatestManyStateMachine: Sendable { + typealias DownstreamContinuation = UnsafeContinuation< + Result<[Element]?, Failure>, Never + > + typealias Base = AsyncSequence & Sendable + + private enum State: Sendable { + /// Small wrapper for the state of an upstream sequence. + struct Upstream: Sendable { + /// The upstream continuation. + var continuation: UnsafeContinuation? + /// The produced upstream element. + var element: Element? + /// Indicates wether the upstream finished/threw already + var isFinished: Bool + } + + /// The initial state before a call to `next` happened. + case initial([any Base]) + + /// The state while we are waiting for downstream demand. + case waitingForDemand( + task: Task, + upstreams: ([Upstream]), + buffer: Deque<[Element]> + ) + + /// The state while we are consuming the upstream and waiting until we get a result from all upstreams. + case combining( + task: Task, + upstreams: ([Upstream]), + downstreamContinuation: DownstreamContinuation, + buffer: Deque<[Element]> + ) + + case upstreamsFinished( + buffer: Deque<[Element]> + ) + + case upstreamThrew( + error: Failure + ) + + /// The state once the downstream consumer stopped, i.e. by dropping all references + /// or by getting their `Task` cancelled. + case finished + + /// Internal state to avoid CoW. + case modifying + } + + private var state: State + + private let numberOfUpstreamSequences: Int + + /// Initializes a new `StateMachine`. + init(bases: [any Base]) { + self.state = .initial(bases) + self.numberOfUpstreamSequences = bases.count + } + + /// Actions returned by `iteratorDeinitialized()`. + enum IteratorDeinitializedAction { + /// Indicates that the `Task` needs to be cancelled and + /// the upstream continuations need to be resumed with a `CancellationFailure`. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { + switch self.state { + case .initial: + // Nothing to do here. No demand was signalled until now + return .none + + case .combining: + // An iterator was deinitialized while we have a suspended continuation. + preconditionFailure( + "Internal inconsistency current state \(self.state) and received iteratorDeinitialized()" + ) + + case .waitingForDemand(let task, let upstreams, _): + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now and need to clean everything up. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map { $0.continuation } + .compactMap { $0 } + ) + + case .upstreamThrew, .upstreamsFinished: + // The iterator was dropped so we can transition to finished now. + self.state = .finished + + return .none + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + mutating func taskIsStarted( + task: Task, + downstreamContinuation: DownstreamContinuation + ) { + switch self.state { + case .initial: + // The user called `next` and we are starting the `Task` + // to consume the upstream sequences + self.state = .combining( + task: task, + upstreams: Array(repeating: .init(isFinished: false), count: self.numberOfUpstreamSequences), + downstreamContinuation: downstreamContinuation, + buffer: .init() + ) + + case .combining, .waitingForDemand, .upstreamThrew, .upstreamsFinished, .finished: + // We only allow a single task to be created so this must never happen. + preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()") + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `childTaskSuspended()`. + enum ChildTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. + case resumeContinuation( + upstreamContinuation: UnsafeContinuation + ) + } + + mutating func childTaskSuspended( + baseIndex: Int, + continuation: UnsafeContinuation + ) -> ChildTaskSuspendedAction? { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, let buffer): + self.state = .modifying + upstreams[baseIndex].continuation = continuation + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining: + // We are currently combining and need to resume any upstream until we transition to waitingForDemand + + return .resumeContinuation(upstreamContinuation: continuation) + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuation( + upstreamContinuation: continuation + ) + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `elementProduced()`. + enum ElementProducedAction { + /// Indicates that the downstream continuation should be resumed with the element. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<[Element]?, Failure> + ) + } + + mutating func elementProduced(value: Element, atBaseIndex baseIndex: Int) -> ElementProducedAction? { + switch self.state { + case .initial: + // Child tasks that are producing elements are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got an element in late. This can happen since we race the upstreams. + // We have to store the new tuple in our buffer and remember the upstream states. + + var upstreamValues = upstreams.compactMap { $0.element } + guard upstreamValues.count == self.numberOfUpstreamSequences else { + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + } + + self.state = .modifying + + upstreamValues[baseIndex] = value + buffer.append(upstreamValues) + upstreams[baseIndex].element = value + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + precondition( + buffer.isEmpty, + "Internal inconsistency current state \(self.state) and the buffer is not empty" + ) + self.state = .modifying + upstreams[baseIndex].element = value + + let nonNilElements = upstreams.compactMap(\.element) + if nonNilElements.count == self.numberOfUpstreamSequences { + // We got an element from each upstream so we can resume the downstream now + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .resumeContinuation( + downstreamContinuation: downstreamContinuation, + result: .success(nonNilElements) + ) + } else { + // We are still waiting for one of the upstreams to produce an element + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + + return .none + } + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks + // are still producing elements after we finished. + // We are just going to drop them since there is nothing we can do + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamFinished()`. + enum UpstreamFinishedAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamFinished(baseIndex: Int) -> UpstreamFinishedAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(let task, var upstreams, let buffer): + // One of the upstreams finished. + + self.state = .modifying + upstreams[0].isFinished = true + + if upstreams.allSatisfy(\.isFinished) { + // All upstreams finished we can transition to either finished or upstreamsFinished now + if buffer.isEmpty { + self.state = .finished + } else { + self.state = .upstreamsFinished(buffer: buffer) + } + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + } else { + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + return .none + } + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + // One of the upstreams finished. + + self.state = .modifying + + // We need to track if an empty upstream finished. + // If that happens we can transition to finish right away. + let emptyUpstreamFinished = upstreams[baseIndex].element == nil + upstreams[baseIndex].isFinished = true + + // Implementing this for the two arities without variadic generics is a bit awkward sadly. + if emptyUpstreamFinished { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + } else if upstreams.allSatisfy(\.isFinished) { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + } else { + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + return .none + } + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamThrew()`. + enum UpstreamThrewAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the `error` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + error: Failure, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamThrew(_ error: Failure) -> UpstreamThrewAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .upstreamsFinished: + // We need to tolerate multiple upstreams failing + return .none + + case .waitingForDemand(let task, let upstreams, _): + // An upstream threw. We can cancel everything now and transition to finished. + // We just need to store the error for the next downstream demand + self.state = .upstreamThrew( + error: error + ) + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // One of our upstreams threw. We need to transition to finished ourselves now + // and resume the downstream continuation with the error. Furthermore, we need to cancel all of + // the upstream work. + self.state = .finished + + return .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `cancelled()`. + enum CancelledAction { + /// Indicates that the downstream continuation needs to be resumed and + /// task and the upstream continuations should be cancelled. + case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func cancelled() -> CancelledAction? { + switch self.state { + case .initial: + state = .finished + + return .none + + case .waitingForDemand(let task, let upstreams, _): + // The downstream task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // The downstream Task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: upstreams.map(\.continuation).compactMap { $0 } + ) + + case .upstreamsFinished: + // We can transition to finished now + self.state = .finished + + return .none + + case .upstreamThrew, .finished: + // We are already finished so nothing to do here: + + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `next()`. + enum NextAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask([any Base]) + /// Indicates that all upstream continuations should be resumed. + case resumeUpstreamContinuations( + upstreamContinuation: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the result. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<[Element]?, Failure> + ) + /// Indicates that the downstream continuation should be resumed with `nil`. + case resumeDownstreamContinuationWithNil(DownstreamContinuation) + } + + mutating func next(for continuation: DownstreamContinuation) -> NextAction { + switch self.state { + case .initial(let bases): + // This is the first time we get demand singalled so we have to start the task + // The transition to the next state is done in the taskStarted method + return .startTask(bases) + + case .combining: + // We already got demand signalled and have suspended the downstream task + // Getting a second next calls means the iterator was transferred across Tasks which is not allowed + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got demand signalled now we have to check if there is anything buffered. + // If not we have to transition to combining and need to resume all upstream continuations now + self.state = .modifying + + guard let element = buffer.popFirst() else { + let upstreamContinuations = upstreams.map(\.continuation).compactMap { $0 } + for index in 0..=6.2) + +@available(AsyncAlgorithms 1.1, *) +final class CombineLatestManyStorage: Sendable { + typealias StateMachine = CombineLatestManyStateMachine + + private let stateMachine: ManagedCriticalState + + init(_ bases: [any StateMachine.Base]) { + self.stateMachine = .init(.init(bases: bases)) + } + + func iteratorDeinitialized() { + let action = self.stateMachine.withCriticalRegion { $0.iteratorDeinitialized() } + + switch action { + case .cancelTaskAndUpstreamContinuations( + let task, + let upstreamContinuation + ): + task.cancel() + upstreamContinuation.forEach { $0.resume() } + + case .none: + break + } + } + + func next() async throws(Failure) -> [Element]? { + let result = await withTaskCancellationHandler { + await withUnsafeContinuation { continuation in + let action: StateMachine.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.next(for: continuation) + switch action { + case .startTask(let bases): + // first iteration, we start one child task per base to iterate over them + self.startTask( + stateMachine: &stateMachine, + bases: bases, + downstreamContinuation: continuation + ) + return nil + + case .resumeContinuation: + return action + + case .resumeUpstreamContinuations: + return action + + case .resumeDownstreamContinuationWithNil: + return action + } + } + + switch action { + case .startTask: + // We are handling the startTask in the lock already because we want to avoid + // other inputs interleaving while starting the task + fatalError("Internal inconsistency") + + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .resumeUpstreamContinuations(let upstreamContinuations): + // bases can be iterated over for 1 iteration so their next value can be retrieved + upstreamContinuations.forEach { $0.resume() } + + case .resumeDownstreamContinuationWithNil(let continuation): + // the async sequence is already finished, immediately resuming + continuation.resume(returning: .success(nil)) + + case .none: + break + } + } + } onCancel: { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.cancelled() + } + + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + task.cancel() + upstreamContinuations.forEach { $0.resume() } + + downstreamContinuation.resume(returning: .success(nil)) + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + task.cancel() + upstreamContinuations.forEach { $0.resume() } + + case .none: + break + } + } + return try result.get() + } + + private func startTask( + stateMachine: inout StateMachine, + bases: [any (AsyncSequence & Sendable)], + downstreamContinuation: StateMachine.DownstreamContinuation + ) { + // This creates a new `Task` that is iterating the upstream + // sequences. We must store it to cancel it at the right times. + let task = Task { + await withTaskGroup(of: Result.self) { group in + // For each upstream sequence we are adding a child task that + // is consuming the upstream sequence + for (baseIndex, base) in bases.enumerated() { + group.addTask { + var baseIterator = base.makeAsyncIterator() + + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand. + await withUnsafeContinuation { continuation in + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.childTaskSuspended(baseIndex: baseIndex, continuation: continuation) + } + + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() + + case .none: + break + } + } + + let element: Element? + do { + element = try await baseIterator.next(isolation: nil) + } catch { + return .failure(error as! Failure) // Looks like a compiler bug + } + + if let element = element { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.elementProduced(value: element, atBaseIndex: baseIndex) + } + + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .none: + break + } + } else { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamFinished(baseIndex: baseIndex) + } + + switch action { + case .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + + task.cancel() + upstreamContinuations.forEach { $0.resume() } + + downstreamContinuation.resume(returning: .success(nil)) + break loop + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + task.cancel() + upstreamContinuations.forEach { $0.resume() } + + break loop + + case .none: + break loop + } + } + } + return .success(()) + } + } + + while !group.isEmpty { + let result = await group.next() + + switch result { + case .success, .none: + break + case .failure(let error): + // One of the upstream sequences threw an error + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamThrew(error) + } + + switch action { + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + task.cancel() + upstreamContinuations.forEach { $0.resume() } + case .resumeContinuationWithFailureAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let error, + let task, + let upstreamContinuations + ): + task.cancel() + upstreamContinuations.forEach { $0.resume() } + downstreamContinuation.resume(returning: .failure(error)) + case .none: + break + } + + group.cancelAll() + } + } + } + } + + stateMachine.taskIsStarted(task: task, downstreamContinuation: downstreamContinuation) + } +} + +#endif diff --git a/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift new file mode 100644 index 00000000..09c7e5fc --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestCombineLatestMany.swift @@ -0,0 +1,89 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.2) + +import XCTest +import AsyncAlgorithms + +@available(AsyncAlgorithms 1.1, *) +final class TestCombineLatestMany: XCTestCase { + func test_combineLatest() async throws { + let a = [1, 2, 3].async + let b = [4, 5, 6].async + let c = [7, 8, 9].async + let sequence = combineLatestMany([a, b, c]) + let actual = await Array(sequence) + XCTAssertGreaterThanOrEqual(actual.count, 3) + } + + func test_ordering1() async { + var a = GatedSequence([1, 2, 3]) + var b = GatedSequence([4, 5, 6]) + var c = GatedSequence([7, 8, 9]) + let finished = expectation(description: "finished") + let sequence = combineLatestMany([a, b, c]) + let validator = Validator<[Int]>() + validator.test(sequence) { iterator in + let pastEnd = await iterator.next(isolation: nil) + XCTAssertNil(pastEnd) + finished.fulfill() + } + var value = await validator.validate() + XCTAssertEqual(value, []) + a.advance() + value = validator.current + XCTAssertEqual(value, []) + b.advance() + value = validator.current + XCTAssertEqual(value, []) + c.advance() + + value = await validator.validate() + XCTAssertEqual(value, [[1, 4, 7]]) + a.advance() + + value = await validator.validate() + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7]]) + b.advance() + + value = await validator.validate() + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7]]) + c.advance() + + value = await validator.validate() + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8]]) + a.advance() + + value = await validator.validate() + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8]]) + b.advance() + + value = await validator.validate() + XCTAssertEqual(value, [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8]]) + c.advance() + + value = await validator.validate() + XCTAssertEqual( + value, + [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8], [3, 6, 9]] + ) + + await fulfillment(of: [finished], timeout: 1.0) + value = validator.current + XCTAssertEqual( + value, + [[1, 4, 7], [2, 4, 7], [2, 5, 7], [2, 5, 8], [3, 5, 8], [3, 6, 8], [3, 6, 9]] + ) + } +} + +#endif