From 70c706ca62a7c2e31c093791e77ba32fb6ea048e Mon Sep 17 00:00:00 2001 From: Woollim Date: Fri, 5 Sep 2025 14:55:48 +0900 Subject: [PATCH 1/3] Add async create methods for Observable and Infallible MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Observable.create with async/await support - Add Infallible.create with async/await support - Both methods support detached tasks and custom priority - Include proper Task cancellation handling 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- RxSwift/Observable+Concurrency.swift | 46 +++++++++++++++++++ .../Infallible/Infallible+Concurrency.swift | 39 ++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/RxSwift/Observable+Concurrency.swift b/RxSwift/Observable+Concurrency.swift index af3ec72bc8..d9bcacb3fd 100644 --- a/RxSwift/Observable+Concurrency.swift +++ b/RxSwift/Observable+Concurrency.swift @@ -11,6 +11,52 @@ import Foundation @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public extension ObservableConvertibleType { + + typealias ElementObserver = (Element) -> Void + + /** + Creates an `Observable` from the result of an asynchronous operation + that emits elements via a provided observer closure. + + - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html) + + - parameter work: An `async` closure that takes an `ElementObserver` (a closure used to emit elements), + and may call it multiple times to emit values. + When the closure finishes, a `.completed` event is automatically emitted. + If the closure throws, an `.error` event will be emitted instead. + + - returns: An `Observable` sequence of the element type emitted by the `work` closure. + */ + @_disfavoredOverload + static func create( + detached: Bool = false, + priority: TaskPriority? = nil, + work: @Sendable @escaping (_ observer: ElementObserver) async throws -> Void + ) -> Observable { + .create { rawObserver in + let operation: () async throws -> Void = { + do { + let observer: ElementObserver = { element in + guard !Task.isCancelled else { return } + rawObserver.onNext(element) + } + try await work(observer) + rawObserver.onCompleted() + } catch { + rawObserver.onError(error) + } + } + + let task = if detached { + Task.detached(priority: priority, operation: operation) + } else { + Task(priority: priority, operation: operation) + } + + return Disposables.create { task.cancel() } + } + } + /// Allows iterating over the values of an Observable /// asynchronously via Swift's concurrency features (`async/await`) /// diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index ff54be2e12..71e665ea70 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -10,6 +10,45 @@ // MARK: - Infallible @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public extension InfallibleType { + + /** + Creates an `Infallible` from the result of an asynchronous operation + that emits elements via a provided observer closure. + + - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html) + + - parameter work: An `async` closure that takes an `ElementObserver` (a closure used to emit elements), + and may call it multiple times to emit values. + When the closure finishes, a `.completed` event is automatically emitted. + + - returns: An `Infallible` sequence of the element type emitted by the `work` closure. + */ + @_disfavoredOverload + static func create( + detached: Bool = false, + priority: TaskPriority? = nil, + work: @Sendable @escaping (_ observer: ElementObserver) async -> Void + ) -> Infallible { + .create { rawObserver in + let operation: () async -> Void = { + let observer: ElementObserver = { element in + guard !Task.isCancelled else { return } + rawObserver(.next(element)) + } + await work(observer) + rawObserver(.completed) + } + + let task = if detached { + Task.detached(priority: priority, operation: operation) + } else { + Task(priority: priority, operation: operation) + } + + return Disposables.create { task.cancel() } + } + } + /// Allows iterating over the values of an Infallible /// asynchronously via Swift's concurrency features (`async/await`) /// From def1fca014f1d88e32c71dd76b5011e6940ef952 Mon Sep 17 00:00:00 2001 From: Woollim Date: Fri, 5 Sep 2025 14:56:41 +0900 Subject: [PATCH 2/3] Add tests for async create methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add test for Infallible.create with async support - Add test for Observable.create with async support - Tests verify proper value emission and completion 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../Infallible+ConcurrencyTests.swift | 18 ++++++++++++++++++ .../Observable+ConcurrencyTests.swift | 19 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift index 1093cf7a70..3e0f01f808 100644 --- a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift @@ -31,5 +31,23 @@ extension InfallibleConcurrencyTests { XCTAssertEqual(values, Array(1...10)) } + + func testCreateInfalliableFromAsync() async throws { + var expectedValues = [Int]() + let randomValue: () async -> Int = { + let value = Int.random(in: 100...100000) + expectedValues.append(value) + return value + } + + let infallible = Infallible.create { observer in + for _ in 1...10 { + observer(await randomValue()) + } + } + + let values = try infallible.toBlocking().toArray() + XCTAssertEqual(values, expectedValues) + } } #endif diff --git a/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift b/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift index c3c64fc6be..fe9966a35b 100644 --- a/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/Observable+ConcurrencyTests.swift @@ -19,6 +19,25 @@ class ObservableConcurrencyTests: RxTest { @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension ObservableConcurrencyTests { + + func testCreateObservableFromAsync() async throws { + var expectedValues = [Int]() + let randomValue: () async throws -> Int = { + let value = Int.random(in: 100...100000) + expectedValues.append(value) + return value + } + + let infallible = Infallible.create { observer in + for _ in 1...10 { + observer(try await randomValue()) + } + } + + let values = try infallible.toBlocking().toArray() + XCTAssertEqual(values, expectedValues) + } + func testAwaitsValuesAndFinishes() async { let observable = Observable .from(1...10) From f468fbed3ed6baf263d095759213d0f3aa6af157 Mon Sep 17 00:00:00 2001 From: Woollim Date: Fri, 5 Sep 2025 15:01:02 +0900 Subject: [PATCH 3/3] Update SwiftConcurrency documentation for async create methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add documentation for Observable.create with async/await - Add documentation for Infallible.create with async/await 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- Documentation/SwiftConcurrency.md | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/Documentation/SwiftConcurrency.md b/Documentation/SwiftConcurrency.md index ce400e67a8..fc5ce4c8a9 100644 --- a/Documentation/SwiftConcurrency.md +++ b/Documentation/SwiftConcurrency.md @@ -64,6 +64,39 @@ stream.asObservable() ) ``` +#### Creating an `Observable` with async/await + +Use `Observable.create` with an async closure when you need to emit multiple values from asynchronous operations: + +```swift +let observable = Observable.create { observer in + // Fetch data from multiple async sources + let firstBatch = try await fetchDataFromAPI() + observer(firstBatch) + + let secondBatch = try await fetchMoreDataFromAPI() + observer(secondBatch) + + // Observable automatically completes when the async closure finishes + // If any await throws, an error event is emitted +} +``` + +#### Creating an `Infallible` with async/await + +Use `Infallible.create` for async operations that cannot fail: + +```swift +let infallible = Infallible.create { observer in + // Generate values asynchronously without throwing + for i in 1...5 { + await Task.sleep(nanoseconds: 1_000_000_000) // 1 second delay + observer(i * 10) + } + // Infallible automatically completes when the async closure finishes +} +``` + ### Wrapping an `async` result as a `Single` If you already have an async piece of work that returns a single result you wish to await, you can bridge it back to the Rx world by using `Single.create`, a special overload which takes an `async throws` closure where you can simply await your async work: