From 7e7a63e6773cb17cd03d60ac5d1587d500edff8d Mon Sep 17 00:00:00 2001 From: Jinwoo Kim Date: Thu, 31 Mar 2022 01:23:06 +0900 Subject: [PATCH 1/6] [Concurrency] async closure to Traits --- Rx.xcodeproj/project.pbxproj | 24 +++++ .../Traits/Driver/Driver+Concurrency.swift | 29 ++++++ .../Traits/Signal/Signal+Concurrency.swift | 29 ++++++ .../Infallible/Infallible+Concurrency.swift | 15 ++++ .../PrimitiveSequence+Concurrency.swift | 59 ++++++++++++ .../Driver+ConcurrencyTests.swift | 39 ++++++++ .../Signal+ConcurrencyTests.swift | 39 ++++++++ .../Infallible+ConcurrencyTests.swift | 15 ++++ .../PrimitiveSequence+ConcurrencyTests.swift | 90 +++++++++++++++++++ 9 files changed, 339 insertions(+) create mode 100644 RxCocoa/Traits/Driver/Driver+Concurrency.swift create mode 100644 RxCocoa/Traits/Signal/Signal+Concurrency.swift create mode 100644 Tests/RxCocoaTests/Driver+ConcurrencyTests.swift create mode 100644 Tests/RxCocoaTests/Signal+ConcurrencyTests.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 8438f08d3e..97e0359d4c 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -37,6 +37,14 @@ 4C8DE0E220D54545003E2D8A /* DisposeBagTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */; }; 4C8DE0E320D54545003E2D8A /* DisposeBagTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */; }; 4C8DE0E420D54545003E2D8A /* DisposeBagTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */; }; + 4F4124C227F4A36B00ADF55A /* Driver+Concurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C127F4A36B00ADF55A /* Driver+Concurrency.swift */; }; + 4F4124C427F4A54200ADF55A /* Signal+Concurrency.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C327F4A54200ADF55A /* Signal+Concurrency.swift */; }; + 4F4124C727F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */; }; + 4F4124C827F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */; }; + 4F4124C927F4B85600ADF55A /* Driver+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */; }; + 4F4124CC27F4BA2E00ADF55A /* Signal+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */; }; + 4F4124CD27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */; }; + 4F4124CE27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */; }; 504540C924196D960098665F /* WKWebView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504540C824196D960098665F /* WKWebView+Rx.swift */; }; 504540CB24196EB10098665F /* WKWebView+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504540CA24196EB10098665F /* WKWebView+RxTests.swift */; }; 504540CC24196EB10098665F /* WKWebView+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504540CA24196EB10098665F /* WKWebView+RxTests.swift */; }; @@ -968,6 +976,10 @@ 4C5213A9225D41E60079FC77 /* CompactMap.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactMap.swift; sourceTree = ""; }; 4C5213AB225E20350079FC77 /* Observable+CompactMapTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+CompactMapTests.swift"; sourceTree = ""; }; 4C8DE0E120D54545003E2D8A /* DisposeBagTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DisposeBagTest.swift; sourceTree = ""; }; + 4F4124C127F4A36B00ADF55A /* Driver+Concurrency.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Driver+Concurrency.swift"; sourceTree = ""; }; + 4F4124C327F4A54200ADF55A /* Signal+Concurrency.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Signal+Concurrency.swift"; sourceTree = ""; }; + 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Driver+ConcurrencyTests.swift"; sourceTree = ""; }; + 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Signal+ConcurrencyTests.swift"; sourceTree = ""; }; 504540C824196D960098665F /* WKWebView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "WKWebView+Rx.swift"; sourceTree = ""; }; 504540CA24196EB10098665F /* WKWebView+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "WKWebView+RxTests.swift"; sourceTree = ""; }; 504540CD2419701D0098665F /* RxWKNavigationDelegateProxy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxWKNavigationDelegateProxy.swift; sourceTree = ""; }; @@ -1920,9 +1932,11 @@ C8D970DF1F532FD20058F2FE /* TestImplementations */, C8561B651DFE1169005E97F1 /* ExampleTests.swift */, C8D970DC1F532FD10058F2FE /* Signal+Test.swift */, + 4F4124CA27F4BA1B00ADF55A /* Signal+ConcurrencyTests.swift */, C8D970DD1F532FD10058F2FE /* SharedSequence+Test.swift */, C8D970E11F532FD20058F2FE /* SharedSequence+Extensions.swift */, C8D970DE1F532FD20058F2FE /* Driver+Test.swift */, + 4F4124C527F4B83500ADF55A /* Driver+ConcurrencyTests.swift */, C8D970E21F532FD30058F2FE /* SharedSequence+OperatorTest.swift */, DB08833826FB07CB005805BE /* SharedSequence+ConcurrencyTests.swift */, C8091C521FAA3588001DB32A /* ObservableConvertibleType+SharedSequence.swift */, @@ -2172,6 +2186,7 @@ C8091C561FAA39C1001DB32A /* ControlEvent+Signal.swift */, C8B0F7211F53135100548EBE /* ObservableConvertibleType+Signal.swift */, C8D970CD1F5324D90058F2FE /* Signal+Subscription.swift */, + 4F4124C327F4A54200ADF55A /* Signal+Concurrency.swift */, A2897D65225D0182004EA481 /* PublishRelay+Signal.swift */, ); path = Signal; @@ -2335,6 +2350,7 @@ C89AB1AE1DAAC3350065FBE6 /* ControlEvent+Driver.swift */, C89AB1AF1DAAC3350065FBE6 /* ControlProperty+Driver.swift */, C89AB1B01DAAC3350065FBE6 /* Driver+Subscription.swift */, + 4F4124C127F4A36B00ADF55A /* Driver+Concurrency.swift */, C89AB1B11DAAC3350065FBE6 /* Driver.swift */, CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */, C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */, @@ -3023,6 +3039,7 @@ C88254171B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift in Sources */, C8C8BCD41F89459300501D4D /* BehaviorRelay+Driver.swift in Sources */, C882541E1B8A752B00B02D69 /* RxCollectionViewDataSourceProxy.swift in Sources */, + 4F4124C427F4A54200ADF55A /* Signal+Concurrency.swift in Sources */, C85E6FBE1F53025700C5681E /* SchedulerType+SharedSequence.swift in Sources */, 84C225A31C33F00B008724EC /* RxTextStorageDelegateProxy.swift in Sources */, C89AB1DA1DAAC3350065FBE6 /* Driver.swift in Sources */, @@ -3086,6 +3103,7 @@ D9080ACF1EA05AE0002B433B /* RxNavigationControllerDelegateProxy.swift in Sources */, C88254271B8A752B00B02D69 /* UIBarButtonItem+Rx.swift in Sources */, C89AB2161DAAC3350065FBE6 /* NSObject+Rx+KVORepresentable.swift in Sources */, + 4F4124C227F4A36B00ADF55A /* Driver+Concurrency.swift in Sources */, C882542B1B8A752B00B02D69 /* UIDatePicker+Rx.swift in Sources */, C88254221B8A752B00B02D69 /* RxTableViewDataSourceProxy.swift in Sources */, C882542C1B8A752B00B02D69 /* UIGestureRecognizer+Rx.swift in Sources */, @@ -3214,8 +3232,10 @@ C820A97E1EB4FA5A00D431BC /* Observable+RepeatTests.swift in Sources */, C820A94A1EB4E75E00D431BC /* Observable+AmbTests.swift in Sources */, 1AF67DA21CED420A00C310FA /* PublishSubjectTest.swift in Sources */, + 4F4124C727F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */, C820A9C61EB50A4200D431BC /* Observable+SkipWhileTests.swift in Sources */, C835093E1C38706E0027C24C /* UIView+RxTests.swift in Sources */, + 4F4124CC27F4BA2E00ADF55A /* Signal+ConcurrencyTests.swift in Sources */, 7EDBAEB41C89B1A6006CBE67 /* UITabBarItem+RxTests.swift in Sources */, C83509411C38706E0027C24C /* BackgroundThreadPrimitiveHotObservable.swift in Sources */, C8379EF41D1DD326003EF8FC /* UIButton+RxTests.swift in Sources */, @@ -3330,6 +3350,7 @@ C83509EE1C3875580027C24C /* Observable.Extensions.swift in Sources */, C83509BD1C38750D0027C24C /* ControlPropertyTests.swift in Sources */, 4C5213AF225E22500079FC77 /* Observable+CompactMapTests.swift in Sources */, + 4F4124CD27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */, C83509E11C3875500027C24C /* TestVirtualScheduler.swift in Sources */, C820A94F1EB4EC3C00D431BC /* Observable+ReduceTests.swift in Sources */, C8B2908A1C94D64700E923D0 /* RxTest+Controls.swift in Sources */, @@ -3355,6 +3376,7 @@ C820A94B1EB4E75E00D431BC /* Observable+AmbTests.swift in Sources */, C834F6C31DB394E100C29244 /* Observable+BlockingTest.swift in Sources */, C83509D41C38753C0027C24C /* RxObjCRuntimeState.swift in Sources */, + 4F4124C827F4B85500ADF55A /* Driver+ConcurrencyTests.swift in Sources */, C822BACF1DB424EC00F98810 /* Reactive+Tests.swift in Sources */, C8C4F17E1DE9DF0200003FA7 /* UILabel+RxTests.swift in Sources */, C83509C01C3875220027C24C /* DelegateProxyTest.swift in Sources */, @@ -3545,6 +3567,7 @@ C8845ADC1EDB607800B36836 /* Observable+ShareReplayScopeTests.swift in Sources */, C834F6C61DB3950600C29244 /* NSControl+RxTests.swift in Sources */, C83509D61C3875420027C24C /* SentMessageTest.swift in Sources */, + 4F4124C927F4B85600ADF55A /* Driver+ConcurrencyTests.swift in Sources */, C81A097F1E6C27A100900B3B /* Observable+ZipTests.swift in Sources */, C820AA0C1EB513C800D431BC /* Observable+WindowTests.swift in Sources */, C8350A021C38755E0027C24C /* BagTest.swift in Sources */, @@ -3553,6 +3576,7 @@ C820A9FC1EB510D500D431BC /* Observable+MaterializeTests.swift in Sources */, C83509E81C3875580027C24C /* PrimitiveMockObserver.swift in Sources */, C83509BE1C3875100027C24C /* DelegateProxyTest+Cocoa.swift in Sources */, + 4F4124CE27F4BA2F00ADF55A /* Signal+ConcurrencyTests.swift in Sources */, C820A9F41EB5109300D431BC /* Observable+DefaultIfEmpty.swift in Sources */, C820AA041EB5134000D431BC /* Observable+DelaySubscriptionTests.swift in Sources */, C8E3906A1F379386004FC993 /* Observable+EnumeratedTests.swift in Sources */, diff --git a/RxCocoa/Traits/Driver/Driver+Concurrency.swift b/RxCocoa/Traits/Driver/Driver+Concurrency.swift new file mode 100644 index 0000000000..06dd494880 --- /dev/null +++ b/RxCocoa/Traits/Driver/Driver+Concurrency.swift @@ -0,0 +1,29 @@ +// +// Driver+Concurrency.swift +// RxCocoa +// +// Created by Jinwoo Kim on 3/30/22. +// Copyright © 2022 Krunoslav Zaher. All rights reserved. +// + +import RxSwift + +#if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asDriver(_ fn: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver { + return asSingle(fn) + .asDriver(onErrorJustReturn: onErrorJustReturn) +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asDriver(_ fn: @escaping () async throws -> Element, onErrorDriveWith: Driver) -> Driver { + return asSingle(fn) + .asDriver(onErrorDriveWith: onErrorDriveWith) +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asDriver(_ fn: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver) ->Driver { + return asSingle(fn) + .asDriver(onErrorRecover: onErrorRecover) +} +#endif diff --git a/RxCocoa/Traits/Signal/Signal+Concurrency.swift b/RxCocoa/Traits/Signal/Signal+Concurrency.swift new file mode 100644 index 0000000000..35079f0bb2 --- /dev/null +++ b/RxCocoa/Traits/Signal/Signal+Concurrency.swift @@ -0,0 +1,29 @@ +// +// Signal+Concurrency.swift +// RxCocoa +// +// Created by Jinwoo Kim on 3/30/22. +// Copyright © 2022 Krunoslav Zaher. All rights reserved. +// + +import RxSwift + +#if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asSignal(_ fn: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { + return asSingle(fn) + .asSignal(onErrorJustReturn: onErrorJustReturn) +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asSignal(_ fn: @escaping () async throws -> Element, onErrorSignalWith: Signal) -> Signal { + return asSingle(fn) + .asSignal(onErrorSignalWith: onErrorSignalWith) +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asSignal(_ fn: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal) -> Signal { + return asSingle(fn) + .asSignal(onErrorRecover: onErrorRecover) +} +#endif diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index 63747d556e..c02c773e8b 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -34,4 +34,19 @@ public extension InfallibleType { } } } + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asInfailable(_ fn: @escaping () async -> Element) -> Infallible { + return .create { observer in + let task = Task { + let element = await fn() + observer(.next(element)) + observer(.completed) + } + + return Disposables.create { + task.cancel() + } + } +} #endif diff --git a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift index b5db5fc738..ac97f1e5cc 100644 --- a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift +++ b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift @@ -129,4 +129,63 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element } } } + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asSingle(_ fn: @escaping () async throws -> Element) -> Single { + return .create { observer in + let task = Task { + do { + let element = try await fn() + observer(.success(element)) + } catch { + observer(.failure(error)) + } + } + + return Disposables.create { + task.cancel() + } + } +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asMaybe(_ fn: (() async throws -> Element)?) -> Maybe { + return .create { observer in + let task = Task { + do { + guard let fn = fn else { + observer(.completed) + return + } + + let element = try await fn() + observer(.success(element)) + } catch { + observer(.error(error)) + } + } + + return Disposables.create { + task.cancel() + } + } +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public func asCompletable(_ fn: @escaping () async throws -> ()) -> Completable { + return .create { observer in + let task = Task { + do { + try await fn() + observer(.completed) + } catch { + observer(.error(error)) + } + } + + return Disposables.create { + task.cancel() + } + } +} #endif diff --git a/Tests/RxCocoaTests/Driver+ConcurrencyTests.swift b/Tests/RxCocoaTests/Driver+ConcurrencyTests.swift new file mode 100644 index 0000000000..c7f67be0e8 --- /dev/null +++ b/Tests/RxCocoaTests/Driver+ConcurrencyTests.swift @@ -0,0 +1,39 @@ +// +// Driver+ConcurrencyTests.swift +// RxCocoa +// +// Created by Jinwoo Kim on 3/31/22. +// Copyright © 2022 Krunoslav Zaher. All rights reserved. +// + +#if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) +import Dispatch +import RxSwift +import RxCocoa +import XCTest +import RxTest + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +class DriverConcurrencyTests: RxTest { + let scheduler = TestScheduler(initialClock: 0) +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension DriverConcurrencyTests { + @MainActor func testAwaitsValuesAndFinishes() async { + let driver = asDriver({ + "Hello" + }, onErrorJustReturn: nil) + + var didLoop = false + + for await value in driver.values { + XCTAssertEqual(value, "Hello") + didLoop = true + } + + XCTAssertTrue(didLoop) + } +} + +#endif diff --git a/Tests/RxCocoaTests/Signal+ConcurrencyTests.swift b/Tests/RxCocoaTests/Signal+ConcurrencyTests.swift new file mode 100644 index 0000000000..0cdb9c3525 --- /dev/null +++ b/Tests/RxCocoaTests/Signal+ConcurrencyTests.swift @@ -0,0 +1,39 @@ +// +// Signal+ConcurrencyTests.swift +// RxCocoa +// +// Created by Jinwoo Kim on 3/31/22. +// Copyright © 2022 Krunoslav Zaher. All rights reserved. +// + +#if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) +import Dispatch +import RxSwift +import RxCocoa +import XCTest +import RxTest + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +class SignalConcurrencyTests: RxTest { + let scheduler = TestScheduler(initialClock: 0) +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension SignalConcurrencyTests { + @MainActor func testAwaitsValuesAndFinishes() async { + let signal = asSignal({ + "Hello" + }, onErrorJustReturn: nil) + + var didLoop = false + + for await value in signal.values { + XCTAssertEqual(value, "Hello") + didLoop = true + } + + XCTAssertTrue(didLoop) + } +} + +#endif diff --git a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift index a0852443f7..e36e214cad 100644 --- a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift @@ -35,5 +35,20 @@ extension InfallibleConcurrencyTests { } } } + + func testAsInfailableEmitsElement() async throws { + let infailable = asInfailable { + return "Hello" + } + + var didLoop = false + + for try await value in infailable.values { + XCTAssertEqual(value, "Hello") + didLoop = true + } + + XCTAssertTrue(didLoop) + } } #endif diff --git a/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift b/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift index e7fdfafcc7..402cc1b630 100644 --- a/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift @@ -41,6 +41,32 @@ extension PrimitiveSequenceConcurrencyTests { XCTAssertTrue(true) } } + + func testAsSingleEmitsElement() async throws { + let single = asSingle { + "Hello" + } + + do { + let value = try await single.value + XCTAssertEqual(value, "Hello") + } catch { + XCTFail("Should not throw an error") + } + } + + func testAsSingleThrowsError() async throws { + let single = asSingle { + throw RxError.unknown + } + + do { + _ = try await single.value + XCTFail("Should not proceed beyond try") + } catch { + XCTAssertTrue(true) + } + } } // MARK: - Maybe @@ -79,6 +105,44 @@ extension PrimitiveSequenceConcurrencyTests { XCTAssertTrue(true) } } + + func testAsMaybeEmitsElement() async throws { + let maybe = asMaybe { + "Hello" + } + + do { + let value = try await maybe.value + XCTAssertNotNil(value) + XCTAssertEqual(value, "Hello") + } catch { + XCTFail("Should not throw an error") + } + } + + func testsAsMaybeEmitsWithoutValue() async throws { + let maybe: Maybe = asMaybe(nil) + + do { + let value = try await maybe.value + XCTAssertNil(value) + } catch { + XCTFail("Should not throw an error") + } + } + + func testAsMaybeThrowsError() async throws { + let maybe = asMaybe { + throw RxError.unknown + } + + do { + _ = try await maybe.value + XCTFail("Should not proceed beyond try") + } catch { + XCTAssertTrue(true) + } + } } // MARK: - Completable @@ -105,6 +169,32 @@ extension PrimitiveSequenceConcurrencyTests { XCTAssertTrue(true) } } + + func testAsCompletableEmitsVoidOnCompletion() async throws { + let completable = asCompletable { + + } + + do { + let value: Void = try await completable.value + XCTAssert(value == ()) + } catch { + XCTFail("Should not throw an error") + } + } + + func testAsCompletableThrowsError() async throws { + let completable = asCompletable { + throw RxError.unknown + } + + do { + _ = try await completable.value + XCTFail("Should not proceed beyond try") + } catch { + XCTAssertTrue(true) + } + } } #endif From fa53bf6b0906509130a3e9fb2892c8c5aff6034f Mon Sep 17 00:00:00 2001 From: Jinwoo Kim Date: Thu, 31 Mar 2022 01:38:05 +0900 Subject: [PATCH 2/6] [Concurrency] Added descriptions --- .../Traits/Driver/Driver+Concurrency.swift | 33 +++++++++++++++---- .../Traits/Signal/Signal+Concurrency.swift | 33 +++++++++++++++---- .../Infallible/Infallible+Concurrency.swift | 10 ++++-- .../PrimitiveSequence+Concurrency.swift | 30 +++++++++++++---- 4 files changed, 86 insertions(+), 20 deletions(-) diff --git a/RxCocoa/Traits/Driver/Driver+Concurrency.swift b/RxCocoa/Traits/Driver/Driver+Concurrency.swift index 06dd494880..21ebe34fe8 100644 --- a/RxCocoa/Traits/Driver/Driver+Concurrency.swift +++ b/RxCocoa/Traits/Driver/Driver+Concurrency.swift @@ -9,21 +9,42 @@ import RxSwift #if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) +/** + Allows converting asynchronous block to `Driver` trait. + + - parameter block: An asynchronous block + - parameter parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - returns: An Driver emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asDriver(_ fn: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver { - return asSingle(fn) +public func asDriver(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver { + return asSingle(block) .asDriver(onErrorJustReturn: onErrorJustReturn) } +/** + Allows converting asynchronous block to `Driver` trait. + + - parameter block: An asynchronous block + - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - returns: An Driver emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asDriver(_ fn: @escaping () async throws -> Element, onErrorDriveWith: Driver) -> Driver { - return asSingle(fn) +public func asDriver(_ block: @escaping () async throws -> Element, onErrorDriveWith: Driver) -> Driver { + return asSingle(block) .asDriver(onErrorDriveWith: onErrorDriveWith) } +/** + Allows converting asynchronous block to `Driver` trait. + + - parameter block: An asynchronous block + - parameter onErrorRecover: Calculates driver that continues to drive the sequence in case of error. + - returns: An Driver emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asDriver(_ fn: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver) ->Driver { - return asSingle(fn) +public func asDriver(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver) ->Driver { + return asSingle(block) .asDriver(onErrorRecover: onErrorRecover) } #endif diff --git a/RxCocoa/Traits/Signal/Signal+Concurrency.swift b/RxCocoa/Traits/Signal/Signal+Concurrency.swift index 35079f0bb2..54ce0a1278 100644 --- a/RxCocoa/Traits/Signal/Signal+Concurrency.swift +++ b/RxCocoa/Traits/Signal/Signal+Concurrency.swift @@ -9,21 +9,42 @@ import RxSwift #if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) +/** + Allows converting asynchronous block to `Signal` trait. + + - parameter block: An asynchronous block + - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - returns: An Signal emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSignal(_ fn: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { - return asSingle(fn) +public func asSignal(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { + return asSingle(block) .asSignal(onErrorJustReturn: onErrorJustReturn) } +/** + Allows converting asynchronous block to `Signal` trait. + + - parameter block: An asynchronous block + - parameter onErrorSignalWith: Signal that continues to emit the sequence in case of error. + - returns: An Signal emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSignal(_ fn: @escaping () async throws -> Element, onErrorSignalWith: Signal) -> Signal { - return asSingle(fn) +public func asSignal(_ block: @escaping () async throws -> Element, onErrorSignalWith: Signal) -> Signal { + return asSingle(block) .asSignal(onErrorSignalWith: onErrorSignalWith) } +/** + Allows converting asynchronous block to `Signal` trait. + + - parameter block: An asynchronous block + - parameter onErrorRecover: Calculates signal that continues to emit the sequence in case of error. + - returns: An Signal emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSignal(_ fn: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal) -> Signal { - return asSingle(fn) +public func asSignal(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal) -> Signal { + return asSingle(block) .asSignal(onErrorRecover: onErrorRecover) } #endif diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index c02c773e8b..8abdbd1420 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -35,11 +35,17 @@ public extension InfallibleType { } } +/** + Allows converting asynchronous block to `Infailable` trait. + + - parameter block: An asynchronous block + - returns: An Infailable emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asInfailable(_ fn: @escaping () async -> Element) -> Infallible { +public func asInfailable(_ block: @escaping () async -> Element) -> Infallible { return .create { observer in let task = Task { - let element = await fn() + let element = await block() observer(.next(element)) observer(.completed) } diff --git a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift index ac97f1e5cc..be07ddf060 100644 --- a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift +++ b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift @@ -130,12 +130,18 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element } } +/** + Allows converting asynchronous block to `Single` trait. + + - parameter block: An asynchronous block + - returns: An Single emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSingle(_ fn: @escaping () async throws -> Element) -> Single { +public func asSingle(_ block: @escaping () async throws -> Element) -> Single { return .create { observer in let task = Task { do { - let element = try await fn() + let element = try await block() observer(.success(element)) } catch { observer(.failure(error)) @@ -148,12 +154,18 @@ public func asSingle(_ fn: @escaping () async throws -> Element) -> Sin } } +/** + Allows converting asynchronous block to `Maybe` trait. + + - parameter block: An asynchronous block + - returns: An Maybe emits value from `block` parameter. + */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asMaybe(_ fn: (() async throws -> Element)?) -> Maybe { +public func asMaybe(_ block: (() async throws -> Element)?) -> Maybe { return .create { observer in let task = Task { do { - guard let fn = fn else { + guard let fn = block else { observer(.completed) return } @@ -171,12 +183,18 @@ public func asMaybe(_ fn: (() async throws -> Element)?) -> Maybe ()) -> Completable { +public func asCompletable(_ block: @escaping () async throws -> ()) -> Completable { return .create { observer in let task = Task { do { - try await fn() + try await block() observer(.completed) } catch { observer(.error(error)) From 1802c931aeae54998b4f972ddacde2b43b55184a Mon Sep 17 00:00:00 2001 From: Jinwoo Kim Date: Thu, 31 Mar 2022 10:55:55 +0900 Subject: [PATCH 3/6] [Concurrency] Changed concepts --- .../Traits/Driver/Driver+Concurrency.swift | 72 ++++----- .../Traits/Signal/Signal+Concurrency.swift | 71 ++++----- .../Infallible/Infallible+Concurrency.swift | 39 +++-- .../PrimitiveSequence+Concurrency.swift | 140 +++++++++--------- .../Driver+ConcurrencyTests.swift | 6 +- .../Signal+ConcurrencyTests.swift | 4 +- .../Infallible+ConcurrencyTests.swift | 4 +- .../PrimitiveSequence+ConcurrencyTests.swift | 28 ++-- 8 files changed, 184 insertions(+), 180 deletions(-) diff --git a/RxCocoa/Traits/Driver/Driver+Concurrency.swift b/RxCocoa/Traits/Driver/Driver+Concurrency.swift index 21ebe34fe8..84349f7362 100644 --- a/RxCocoa/Traits/Driver/Driver+Concurrency.swift +++ b/RxCocoa/Traits/Driver/Driver+Concurrency.swift @@ -9,42 +9,46 @@ import RxSwift #if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) -/** - Allows converting asynchronous block to `Driver` trait. - - - parameter block: An asynchronous block - - parameter parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - returns: An Driver emits value from `block` parameter. - */ +// MARK: - Driver @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asDriver(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver { - return asSingle(block) - .asDriver(onErrorJustReturn: onErrorJustReturn) -} +public extension Driver { + /** + Allows converting asynchronous block to `Driver` trait. + + - parameter block: An asynchronous block + - parameter parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - returns: An Driver emits value from `block` parameter. + */ + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func from(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver { + return Single.from(block) + .asDriver(onErrorJustReturn: onErrorJustReturn) + } -/** - Allows converting asynchronous block to `Driver` trait. - - - parameter block: An asynchronous block - - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - returns: An Driver emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asDriver(_ block: @escaping () async throws -> Element, onErrorDriveWith: Driver) -> Driver { - return asSingle(block) - .asDriver(onErrorDriveWith: onErrorDriveWith) -} + /** + Allows converting asynchronous block to `Driver` trait. + + - parameter block: An asynchronous block + - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - returns: An Driver emits value from `block` parameter. + */ + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func from(_ block: @escaping () async throws -> Element, onErrorDriveWith: Driver) -> Driver { + return Single.from(block) + .asDriver(onErrorDriveWith: onErrorDriveWith) + } -/** - Allows converting asynchronous block to `Driver` trait. - - - parameter block: An asynchronous block - - parameter onErrorRecover: Calculates driver that continues to drive the sequence in case of error. - - returns: An Driver emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asDriver(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver) ->Driver { - return asSingle(block) - .asDriver(onErrorRecover: onErrorRecover) + /** + Allows converting asynchronous block to `Driver` trait. + + - parameter block: An asynchronous block + - parameter onErrorRecover: Calculates driver that continues to drive the sequence in case of error. + - returns: An Driver emits value from `block` parameter. + */ + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func from(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver) ->Driver { + return Single.from(block) + .asDriver(onErrorRecover: onErrorRecover) + } } #endif diff --git a/RxCocoa/Traits/Signal/Signal+Concurrency.swift b/RxCocoa/Traits/Signal/Signal+Concurrency.swift index 54ce0a1278..b74df3f8ab 100644 --- a/RxCocoa/Traits/Signal/Signal+Concurrency.swift +++ b/RxCocoa/Traits/Signal/Signal+Concurrency.swift @@ -9,42 +9,45 @@ import RxSwift #if swift(>=5.5.2) && canImport(_Concurrency) && !os(Linux) -/** - Allows converting asynchronous block to `Signal` trait. - - - parameter block: An asynchronous block - - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - returns: An Signal emits value from `block` parameter. - */ +// MARK: - Signal @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSignal(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { - return asSingle(block) - .asSignal(onErrorJustReturn: onErrorJustReturn) -} +public extension Signal { + /** + Allows converting asynchronous block to `Signal` trait. + + - parameter block: An asynchronous block + - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - returns: An Signal emits value from `block` parameter. + */ + static func from(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { + return Single.from(block) + .asSignal(onErrorJustReturn: onErrorJustReturn) + } -/** - Allows converting asynchronous block to `Signal` trait. - - - parameter block: An asynchronous block - - parameter onErrorSignalWith: Signal that continues to emit the sequence in case of error. - - returns: An Signal emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSignal(_ block: @escaping () async throws -> Element, onErrorSignalWith: Signal) -> Signal { - return asSingle(block) - .asSignal(onErrorSignalWith: onErrorSignalWith) -} + /** + Allows converting asynchronous block to `Signal` trait. + + - parameter block: An asynchronous block + - parameter onErrorSignalWith: Signal that continues to emit the sequence in case of error. + - returns: An Signal emits value from `block` parameter. + */ + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + static func from(_ block: @escaping () async throws -> Element, onErrorSignalWith: Signal) -> Signal { + return Single.from(block) + .asSignal(onErrorSignalWith: onErrorSignalWith) + } -/** - Allows converting asynchronous block to `Signal` trait. - - - parameter block: An asynchronous block - - parameter onErrorRecover: Calculates signal that continues to emit the sequence in case of error. - - returns: An Signal emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSignal(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal) -> Signal { - return asSingle(block) - .asSignal(onErrorRecover: onErrorRecover) + /** + Allows converting asynchronous block to `Signal` trait. + + - parameter block: An asynchronous block + - parameter onErrorRecover: Calculates signal that continues to emit the sequence in case of error. + - returns: An Signal emits value from `block` parameter. + */ + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + static func from(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal) -> Signal { + return Single.from(block) + .asSignal(onErrorRecover: onErrorRecover) + } } #endif diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index 8abdbd1420..61cddc06a1 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -33,26 +33,25 @@ public extension InfallibleType { } } } -} - -/** - Allows converting asynchronous block to `Infailable` trait. - - - parameter block: An asynchronous block - - returns: An Infailable emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asInfailable(_ block: @escaping () async -> Element) -> Infallible { - return .create { observer in - let task = Task { - let element = await block() - observer(.next(element)) - observer(.completed) - } - - return Disposables.create { - task.cancel() - } + + /** + Allows converting asynchronous block to `Infailable` trait. + + - parameter block: An asynchronous block + - returns: An Infailable emits value from `block` parameter. + */ + static func from(_ block: @escaping () async -> Element) -> Infallible { + return .create { observer in + let task = Task { + let element = await block() + observer(.next(element)) + observer(.completed) + } + + return Disposables.create { + task.cancel() + } + } } } #endif diff --git a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift index be07ddf060..26a86a75b0 100644 --- a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift +++ b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift @@ -43,6 +43,29 @@ public extension PrimitiveSequenceType where Trait == SingleTrait { ) } } + + /** + Allows converting asynchronous block to `Single` trait. + + - parameter block: An asynchronous block + - returns: An Single emits value from `block` parameter. + */ + static func from(_ block: @escaping () async throws -> Element) -> Single { + return .create { observer in + let task = Task { + do { + let element = try await block() + observer(.success(element)) + } catch { + observer(.failure(error)) + } + } + + return Disposables.create { + task.cancel() + } + } + } } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -90,6 +113,35 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait { ) } } + + /** + Allows converting asynchronous block to `Maybe` trait. + + - parameter block: An asynchronous block + - returns: An Maybe emits value from `block` parameter. + */ + static func from(_ block: (() async throws -> Element)?) -> Maybe { + return .create { observer in + let task = Task { + do { + guard let fn = block else { + observer(.completed) + return + } + + let element = try await fn() + observer(.success(element)) + } catch { + observer(.error(error)) + } + } + + return Disposables.create { + task.cancel() + } + } + } + } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -128,82 +180,28 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element ) } } -} - -/** - Allows converting asynchronous block to `Single` trait. - - - parameter block: An asynchronous block - - returns: An Single emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asSingle(_ block: @escaping () async throws -> Element) -> Single { - return .create { observer in - let task = Task { - do { - let element = try await block() - observer(.success(element)) - } catch { - observer(.failure(error)) - } - } - - return Disposables.create { - task.cancel() - } - } -} - -/** - Allows converting asynchronous block to `Maybe` trait. - - - parameter block: An asynchronous block - - returns: An Maybe emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asMaybe(_ block: (() async throws -> Element)?) -> Maybe { - return .create { observer in - let task = Task { - do { - guard let fn = block else { + + /** + Allows converting asynchronous block to `Completable` trait. + + - parameter block: An asynchronous block + - returns: An Completable emits value from `block` parameter. + */ + static func from(_ block: @escaping () async throws -> ()) -> Completable { + return .create { observer in + let task = Task { + do { + try await block() observer(.completed) - return + } catch { + observer(.error(error)) } - - let element = try await fn() - observer(.success(element)) - } catch { - observer(.error(error)) } - } - - return Disposables.create { - task.cancel() - } - } -} - -/** - Allows converting asynchronous block to `Completable` trait. - - - parameter block: An asynchronous block - - returns: An Completable emits value from `block` parameter. - */ -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public func asCompletable(_ block: @escaping () async throws -> ()) -> Completable { - return .create { observer in - let task = Task { - do { - try await block() - observer(.completed) - } catch { - observer(.error(error)) + + return Disposables.create { + task.cancel() } } - - return Disposables.create { - task.cancel() - } } } #endif diff --git a/Tests/RxCocoaTests/Driver+ConcurrencyTests.swift b/Tests/RxCocoaTests/Driver+ConcurrencyTests.swift index c7f67be0e8..eb9b983111 100644 --- a/Tests/RxCocoaTests/Driver+ConcurrencyTests.swift +++ b/Tests/RxCocoaTests/Driver+ConcurrencyTests.swift @@ -20,9 +20,9 @@ class DriverConcurrencyTests: RxTest { @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension DriverConcurrencyTests { - @MainActor func testAwaitsValuesAndFinishes() async { - let driver = asDriver({ - "Hello" + @MainActor func testDriverEmitsElementFromAwait() async { + let driver = Driver.from({ + return "Hello" }, onErrorJustReturn: nil) var didLoop = false diff --git a/Tests/RxCocoaTests/Signal+ConcurrencyTests.swift b/Tests/RxCocoaTests/Signal+ConcurrencyTests.swift index 0cdb9c3525..896d8040df 100644 --- a/Tests/RxCocoaTests/Signal+ConcurrencyTests.swift +++ b/Tests/RxCocoaTests/Signal+ConcurrencyTests.swift @@ -20,8 +20,8 @@ class SignalConcurrencyTests: RxTest { @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension SignalConcurrencyTests { - @MainActor func testAwaitsValuesAndFinishes() async { - let signal = asSignal({ + @MainActor func testSignalEmitsElementFromAwait() async { + let signal = Signal.from({ "Hello" }, onErrorJustReturn: nil) diff --git a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift index e36e214cad..326daf71e2 100644 --- a/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift @@ -36,8 +36,8 @@ extension InfallibleConcurrencyTests { } } - func testAsInfailableEmitsElement() async throws { - let infailable = asInfailable { + func testInfailablelEmitsElementFromAwait() async throws { + let infailable = Infallible.from { return "Hello" } diff --git a/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift b/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift index 402cc1b630..98303ea327 100644 --- a/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift +++ b/Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift @@ -42,9 +42,9 @@ extension PrimitiveSequenceConcurrencyTests { } } - func testAsSingleEmitsElement() async throws { - let single = asSingle { - "Hello" + func testSingleEmitsElementFromAwait() async throws { + let single = Single.from { + return "Hello" } do { @@ -55,8 +55,8 @@ extension PrimitiveSequenceConcurrencyTests { } } - func testAsSingleThrowsError() async throws { - let single = asSingle { + func testSingleThrowsErrorFromAwait() async throws { + let single = Single.from { throw RxError.unknown } @@ -106,8 +106,8 @@ extension PrimitiveSequenceConcurrencyTests { } } - func testAsMaybeEmitsElement() async throws { - let maybe = asMaybe { + func testMaybeEmitsElementFromAwait() async throws { + let maybe = Maybe.from { "Hello" } @@ -121,7 +121,7 @@ extension PrimitiveSequenceConcurrencyTests { } func testsAsMaybeEmitsWithoutValue() async throws { - let maybe: Maybe = asMaybe(nil) + let maybe = Maybe.from(nil) do { let value = try await maybe.value @@ -131,8 +131,8 @@ extension PrimitiveSequenceConcurrencyTests { } } - func testAsMaybeThrowsError() async throws { - let maybe = asMaybe { + func testMaybeThrowsErrorFromAwait() async throws { + let maybe = Maybe.from { throw RxError.unknown } @@ -170,8 +170,8 @@ extension PrimitiveSequenceConcurrencyTests { } } - func testAsCompletableEmitsVoidOnCompletion() async throws { - let completable = asCompletable { + func testCompletableEmitsElementFromAwait() async throws { + let completable = Completable.from { } @@ -183,8 +183,8 @@ extension PrimitiveSequenceConcurrencyTests { } } - func testAsCompletableThrowsError() async throws { - let completable = asCompletable { + func testCompletableThrowsErrorFromAwait() async throws { + let completable = Completable.from { throw RxError.unknown } From 4b4605d897a67cdd57b590193c35a232f626c55e Mon Sep 17 00:00:00 2001 From: Jinwoo Kim Date: Thu, 31 Mar 2022 11:14:42 +0900 Subject: [PATCH 4/6] [Concurrency] Added priority and detached parameters. --- .../Traits/Driver/Driver+Concurrency.swift | 39 ++++++++----- .../Traits/Signal/Signal+Concurrency.swift | 39 ++++++++----- .../Infallible/Infallible+Concurrency.swift | 38 ++++++++----- .../PrimitiveSequence+Concurrency.swift | 56 ++++++++++++++----- 4 files changed, 115 insertions(+), 57 deletions(-) diff --git a/RxCocoa/Traits/Driver/Driver+Concurrency.swift b/RxCocoa/Traits/Driver/Driver+Concurrency.swift index 84349f7362..27baae22a4 100644 --- a/RxCocoa/Traits/Driver/Driver+Concurrency.swift +++ b/RxCocoa/Traits/Driver/Driver+Concurrency.swift @@ -15,39 +15,48 @@ public extension Driver { /** Allows converting asynchronous block to `Driver` trait. - - parameter block: An asynchronous block - - parameter parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - returns: An Driver emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - block: An asynchronous block + - Returns: An Driver emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - func from(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver { - return Single.from(block) + func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Driver { + return Single.from(priority: priority, detached: detached, block) .asDriver(onErrorJustReturn: onErrorJustReturn) } /** Allows converting asynchronous block to `Driver` trait. - - parameter block: An asynchronous block - - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - returns: An Driver emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - onErrorDriveWith: Driver that continues to drive the sequence in case of error. + - block: An asynchronous block + - Returns: An Driver emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - func from(_ block: @escaping () async throws -> Element, onErrorDriveWith: Driver) -> Driver { - return Single.from(block) + func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorDriveWith: Driver) -> Driver { + return Single.from(priority: priority, detached: detached, block) .asDriver(onErrorDriveWith: onErrorDriveWith) } /** Allows converting asynchronous block to `Driver` trait. - - parameter block: An asynchronous block - - parameter onErrorRecover: Calculates driver that continues to drive the sequence in case of error. - - returns: An Driver emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - onErrorRecover: Calculates driver that continues to drive the sequence in case of error. + - block: An asynchronous block + - Returns: An Driver emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - func from(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver) ->Driver { - return Single.from(block) + func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorRecover: @escaping (Error) -> Driver) ->Driver { + return Single.from(priority: priority, detached: detached, block) .asDriver(onErrorRecover: onErrorRecover) } } diff --git a/RxCocoa/Traits/Signal/Signal+Concurrency.swift b/RxCocoa/Traits/Signal/Signal+Concurrency.swift index b74df3f8ab..30dfaa7920 100644 --- a/RxCocoa/Traits/Signal/Signal+Concurrency.swift +++ b/RxCocoa/Traits/Signal/Signal+Concurrency.swift @@ -15,38 +15,47 @@ public extension Signal { /** Allows converting asynchronous block to `Signal` trait. - - parameter block: An asynchronous block - - parameter onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - returns: An Signal emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - onErrorJustReturn: Element to return in case of error and after that complete the sequence. + - block: An asynchronous block + - Returns: An Signal emits value from `block` parameter. */ - static func from(_ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { - return Single.from(block) + static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { + return Single.from(priority: priority, detached: detached, block) .asSignal(onErrorJustReturn: onErrorJustReturn) } /** Allows converting asynchronous block to `Signal` trait. - - parameter block: An asynchronous block - - parameter onErrorSignalWith: Signal that continues to emit the sequence in case of error. - - returns: An Signal emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - onErrorSignalWith: Signal that continues to emit the sequence in case of error. + - block: An asynchronous block + - Returns: An Signal emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - static func from(_ block: @escaping () async throws -> Element, onErrorSignalWith: Signal) -> Signal { - return Single.from(block) + static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorSignalWith: Signal) -> Signal { + return Single.from(priority: priority, detached: detached, block) .asSignal(onErrorSignalWith: onErrorSignalWith) } /** Allows converting asynchronous block to `Signal` trait. - - parameter block: An asynchronous block - - parameter onErrorRecover: Calculates signal that continues to emit the sequence in case of error. - - returns: An Signal emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - onErrorRecover: Calculates signal that continues to emit the sequence in case of error. + - block: An asynchronous block + - Returns: An Signal emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - static func from(_ block: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal) -> Signal { - return Single.from(block) + static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorRecover: @escaping (_ error: Swift.Error) -> Signal) -> Signal { + return Single.from(priority: priority, detached: detached, block) .asSignal(onErrorRecover: onErrorRecover) } } diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index 61cddc06a1..625366e923 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -27,7 +27,7 @@ public extension InfallibleType { onCompleted: { continuation.finish() }, onDisposed: { continuation.onTermination?(.cancelled) } ) - + continuation.onTermination = { @Sendable _ in disposable.dispose() } @@ -37,21 +37,31 @@ public extension InfallibleType { /** Allows converting asynchronous block to `Infailable` trait. - - parameter block: An asynchronous block - - returns: An Infailable emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - block: An asynchronous block + - Returns: An Infailable emits value from `block` parameter. */ - static func from(_ block: @escaping () async -> Element) -> Infallible { + static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async -> Element) -> Infallible { return .create { observer in - let task = Task { - let element = await block() - observer(.next(element)) - observer(.completed) - } - - return Disposables.create { - task.cancel() - } - } + let operation: @Sendable () async throws -> Void = { + let element = await block() + observer(.next(element)) + observer(.completed) + } + let task: Task + + if detached { + task = Task.detached(priority: priority, operation: operation) + } else { + task = Task(priority: priority, operation: operation) + } + + return Disposables.create { + task.cancel() + } + } } } #endif diff --git a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift index 26a86a75b0..f8d938fc5d 100644 --- a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift +++ b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift @@ -47,12 +47,15 @@ public extension PrimitiveSequenceType where Trait == SingleTrait { /** Allows converting asynchronous block to `Single` trait. - - parameter block: An asynchronous block - - returns: An Single emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - block: An asynchronous block + - Returns: An Single emits value from `block` parameter. */ - static func from(_ block: @escaping () async throws -> Element) -> Single { + static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element) -> Single { return .create { observer in - let task = Task { + let operation: @Sendable () async throws -> Void = { do { let element = try await block() observer(.success(element)) @@ -60,6 +63,13 @@ public extension PrimitiveSequenceType where Trait == SingleTrait { observer(.failure(error)) } } + let task: Task + + if detached { + task = Task.detached(priority: priority, operation: operation) + } else { + task = Task(priority: priority, operation: operation) + } return Disposables.create { task.cancel() @@ -117,12 +127,15 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait { /** Allows converting asynchronous block to `Maybe` trait. - - parameter block: An asynchronous block - - returns: An Maybe emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - block: An asynchronous block + - Returns: An Maybe emits value from `block` parameter. */ - static func from(_ block: (() async throws -> Element)?) -> Maybe { + static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: (() async throws -> Element)?) -> Maybe { return .create { observer in - let task = Task { + let operation: @Sendable () async throws -> Void = { do { guard let fn = block else { observer(.completed) @@ -135,13 +148,20 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait { observer(.error(error)) } } + let task: Task + + if detached { + task = Task.detached(priority: priority, operation: operation) + } else { + task = Task(priority: priority, operation: operation) + } return Disposables.create { task.cancel() } } } - + } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -184,12 +204,15 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element /** Allows converting asynchronous block to `Completable` trait. - - parameter block: An asynchronous block - - returns: An Completable emits value from `block` parameter. + - Parameters: + - priority: The priority of the task. + - detached: Detach when creating the task. + - block: An asynchronous block + - Returns: An Completable emits value from `block` parameter. */ - static func from(_ block: @escaping () async throws -> ()) -> Completable { + static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> ()) -> Completable { return .create { observer in - let task = Task { + let operation: @Sendable () async throws -> Void = { do { try await block() observer(.completed) @@ -197,6 +220,13 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element observer(.error(error)) } } + let task: Task + + if detached { + task = Task.detached(priority: priority, operation: operation) + } else { + task = Task(priority: priority, operation: operation) + } return Disposables.create { task.cancel() From 791539e7aae20ecf8be47956e0b86f02417f4286 Mon Sep 17 00:00:00 2001 From: Jinwoo Kim Date: Thu, 31 Mar 2022 11:19:45 +0900 Subject: [PATCH 5/6] Fixed typo. --- RxCocoa/Traits/Driver/Driver+Concurrency.swift | 6 +++--- RxCocoa/Traits/Signal/Signal+Concurrency.swift | 6 +++--- RxSwift/Traits/Infallible/Infallible+Concurrency.swift | 2 +- .../PrimitiveSequence/PrimitiveSequence+Concurrency.swift | 6 +++--- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/RxCocoa/Traits/Driver/Driver+Concurrency.swift b/RxCocoa/Traits/Driver/Driver+Concurrency.swift index 27baae22a4..afed81fc14 100644 --- a/RxCocoa/Traits/Driver/Driver+Concurrency.swift +++ b/RxCocoa/Traits/Driver/Driver+Concurrency.swift @@ -19,7 +19,7 @@ public extension Driver { - priority: The priority of the task. - detached: Detach when creating the task. - onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Driver emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -35,7 +35,7 @@ public extension Driver { - priority: The priority of the task. - detached: Detach when creating the task. - onErrorDriveWith: Driver that continues to drive the sequence in case of error. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Driver emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -51,7 +51,7 @@ public extension Driver { - priority: The priority of the task. - detached: Detach when creating the task. - onErrorRecover: Calculates driver that continues to drive the sequence in case of error. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Driver emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) diff --git a/RxCocoa/Traits/Signal/Signal+Concurrency.swift b/RxCocoa/Traits/Signal/Signal+Concurrency.swift index 30dfaa7920..f2d1ecb9df 100644 --- a/RxCocoa/Traits/Signal/Signal+Concurrency.swift +++ b/RxCocoa/Traits/Signal/Signal+Concurrency.swift @@ -19,7 +19,7 @@ public extension Signal { - priority: The priority of the task. - detached: Detach when creating the task. - onErrorJustReturn: Element to return in case of error and after that complete the sequence. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Signal emits value from `block` parameter. */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element, onErrorJustReturn: Element) -> Signal { @@ -34,7 +34,7 @@ public extension Signal { - priority: The priority of the task. - detached: Detach when creating the task. - onErrorSignalWith: Signal that continues to emit the sequence in case of error. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Signal emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -50,7 +50,7 @@ public extension Signal { - priority: The priority of the task. - detached: Detach when creating the task. - onErrorRecover: Calculates signal that continues to emit the sequence in case of error. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Signal emits value from `block` parameter. */ @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index 625366e923..15f511df76 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -40,7 +40,7 @@ public extension InfallibleType { - Parameters: - priority: The priority of the task. - detached: Detach when creating the task. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Infailable emits value from `block` parameter. */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async -> Element) -> Infallible { diff --git a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift index f8d938fc5d..fd3b8db723 100644 --- a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift +++ b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift @@ -50,7 +50,7 @@ public extension PrimitiveSequenceType where Trait == SingleTrait { - Parameters: - priority: The priority of the task. - detached: Detach when creating the task. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Single emits value from `block` parameter. */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element) -> Single { @@ -130,7 +130,7 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait { - Parameters: - priority: The priority of the task. - detached: Detach when creating the task. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Maybe emits value from `block` parameter. */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: (() async throws -> Element)?) -> Maybe { @@ -207,7 +207,7 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element - Parameters: - priority: The priority of the task. - detached: Detach when creating the task. - - block: An asynchronous block + - block: An asynchronous block. - Returns: An Completable emits value from `block` parameter. */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> ()) -> Completable { From e74cc89fccdb98c7ec2f827c45a580ad0909b623 Mon Sep 17 00:00:00 2001 From: Jinwoo Kim Date: Thu, 31 Mar 2022 11:21:53 +0900 Subject: [PATCH 6/6] [Concurrency] Removed throws mark on block definition. --- RxSwift/Traits/Infallible/Infallible+Concurrency.swift | 2 +- .../PrimitiveSequence/PrimitiveSequence+Concurrency.swift | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift index 15f511df76..12b9cef502 100644 --- a/RxSwift/Traits/Infallible/Infallible+Concurrency.swift +++ b/RxSwift/Traits/Infallible/Infallible+Concurrency.swift @@ -45,7 +45,7 @@ public extension InfallibleType { */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async -> Element) -> Infallible { return .create { observer in - let operation: @Sendable () async throws -> Void = { + let operation: @Sendable () async -> Void = { let element = await block() observer(.next(element)) observer(.completed) diff --git a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift index fd3b8db723..b06396f561 100644 --- a/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift +++ b/RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift @@ -55,7 +55,7 @@ public extension PrimitiveSequenceType where Trait == SingleTrait { */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> Element) -> Single { return .create { observer in - let operation: @Sendable () async throws -> Void = { + let operation: @Sendable () async -> Void = { do { let element = try await block() observer(.success(element)) @@ -135,7 +135,7 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait { */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: (() async throws -> Element)?) -> Maybe { return .create { observer in - let operation: @Sendable () async throws -> Void = { + let operation: @Sendable () async -> Void = { do { guard let fn = block else { observer(.completed) @@ -212,7 +212,7 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element */ static func from(priority: TaskPriority? = nil, detached: Bool = false, _ block: @escaping () async throws -> ()) -> Completable { return .create { observer in - let operation: @Sendable () async throws -> Void = { + let operation: @Sendable () async -> Void = { do { try await block() observer(.completed)