diff --git a/Documentation/SwiftConcurrency.md b/Documentation/SwiftConcurrency.md index ce400e67a..fc5ce4c8a 100644 --- a/Documentation/SwiftConcurrency.md +++ b/Documentation/SwiftConcurrency.md @@ -64,6 +64,39 @@ stream.asObservable() ) ``` +#### Creating an `Observable` with async/await + +Use `Observable.create` with an async closure when you need to emit multiple values from asynchronous operations: + +```swift +let observable = Observable.create { observer in + // Fetch data from multiple async sources + let firstBatch = try await fetchDataFromAPI() + observer(firstBatch) + + let secondBatch = try await fetchMoreDataFromAPI() + observer(secondBatch) + + // Observable automatically completes when the async closure finishes + // If any await throws, an error event is emitted +} +``` + +#### Creating an `Infallible` with async/await + +Use `Infallible.create` for async operations that cannot fail: + +```swift +let infallible = Infallible.create { observer in + // Generate values asynchronously without throwing + for i in 1...5 { + await Task.sleep(nanoseconds: 1_000_000_000) // 1 second delay + observer(i * 10) + } + // Infallible automatically completes when the async closure finishes +} +``` + ### Wrapping an `async` result as a `Single` If you already have an async piece of work that returns a single result you wish to await, you can bridge it back to the Rx world by using `Single.create`, a special overload which takes an `async throws` closure where you can simply await your async work: diff --git a/RxSwift/Observable+Concurrency.swift b/RxSwift/Observable+Concurrency.swift index af3ec72bc..d9bcacb3f 100644 --- a/RxSwift/Observable+Concurrency.swift +++ b/RxSwift/Observable+Concurrency.swift @@ -11,6 +11,52 @@ import Foundation @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public extension ObservableConvertibleType { + + typealias ElementObserver = (Element) -> Void + + /** + Creates an `Observable` from the result of an asynchronous operation + that emits elements via a provided observer closure. + + - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html) + + - parameter work: An `async` closure that takes an `ElementObserver` (a closure used to emit elements), + and may call it multiple times to emit values. + When the closure finishes, a `.completed` event is automatically emitted. + If the closure throws, an `.error` event will be emitted instead. + + - returns: An `Observable` sequence of the element type emitted by the `work` closure. + */ + @_disfavoredOverload + static func create( + detached: Bool = false, + priority: TaskPriority? = nil, + work: @Sendable @escaping (_ observer: ElementObserver) async throws -> Void + ) -> Observable { + .create { rawObserver in + let operation: () async throws -> Void = { + do { + let observer: ElementObserver = { element in + guard !Task.isCancelled else { return } + rawObserver.onNext(element) + } + try await work(observer) + rawObserver.onCompleted() + } catch { + rawObserver.onError(error) + } + } + + let task = if detached { + Task.detached(priority: priority, operation: operation) + } else { + Task(priority: priority, operation: operation) + } + + return Disposables.create { task.cancel() } + } + } + /// Allows iterating over the values of an Observable /// asynchronously via Swift's concurrency features (`async/await`) /// diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index ff54be2e1..71e665ea7 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -10,6 +10,45 @@ // MARK: - Infallible @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public extension InfallibleType { + + /** + Creates an `Infallible` from the result of an asynchronous operation + that emits elements via a provided observer closure. + + - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html) + + - parameter work: An `async` closure that takes an `ElementObserver` (a closure used to emit elements), + and may call it multiple times to emit values. + When the closure finishes, a `.completed` event is automatically emitted. + + - returns: An `Infallible` sequence of the element type emitted by the `work` closure. + */ + @_disfavoredOverload + static func create( + detached: Bool = false, + priority: TaskPriority? = nil, + work: @Sendable @escaping (_ observer: ElementObserver) async -> Void + ) -> Infallible { + .create { rawObserver in + let operation: () async -> Void = { + let observer: ElementObserver = { element in + guard !Task.isCancelled else { return } + rawObserver(.next(element)) + } + await work(observer) + rawObserver(.completed) + } + + let task = if detached { + Task.detached(priority: priority, operation: operation) + } else { + Task(priority: priority, operation: operation) + } + + return Disposables.create { task.cancel() } + } + } + /// Allows iterating over the values of an Infallible /// asynchronously via Swift's concurrency features (`async/await`) /// diff --git a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift index 1093cf7a7..3e0f01f80 100644 --- a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift @@ -31,5 +31,23 @@ extension InfallibleConcurrencyTests { XCTAssertEqual(values, Array(1...10)) } + + func testCreateInfalliableFromAsync() async throws { + var expectedValues = [Int]() + let randomValue: () async -> Int = { + let value = Int.random(in: 100...100000) + expectedValues.append(value) + return value + } + + let infallible = Infallible.create { observer in + for _ in 1...10 { + observer(await randomValue()) + } + } + + let values = try infallible.toBlocking().toArray() + XCTAssertEqual(values, expectedValues) + } } #endif diff --git a/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift b/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift index c3c64fc6b..fe9966a35 100644 --- a/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift @@ -19,6 +19,25 @@ class ObservableConcurrencyTests: RxTest { @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension ObservableConcurrencyTests { + + func testCreateObservableFromAsync() async throws { + var expectedValues = [Int]() + let randomValue: () async throws -> Int = { + let value = Int.random(in: 100...100000) + expectedValues.append(value) + return value + } + + let infallible = Infallible.create { observer in + for _ in 1...10 { + observer(try await randomValue()) + } + } + + let values = try infallible.toBlocking().toArray() + XCTAssertEqual(values, expectedValues) + } + func testAwaitsValuesAndFinishes() async { let observable = Observable .from(1...10)