Skip to content

Commit 17e8102

Browse files
committed
feat: Add detached parameter to AsyncSequence.asObservable()
1 parent b0e5e21 commit 17e8102

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

RxSwift/Observable+Concurrency.swift

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,25 @@ public extension AsyncSequence {
5858
/// values of the asynchronous sequence's type
5959
///
6060
/// - returns: An `Observable` of the async sequence's type
61-
func asObservable() -> Observable<Element> {
61+
func asObservable(detached: Bool = false) -> Observable<Element> {
6262
Observable.create { observer in
63-
let task = Task.detached {
63+
let taskBlock = {
6464
do {
6565
for try await value in self {
6666
observer.onNext(value)
6767
}
68-
6968
observer.onCompleted()
7069
} catch is CancellationError {
7170
observer.onCompleted()
7271
} catch {
7372
observer.onError(error)
7473
}
7574
}
76-
75+
76+
let task: Task<Void, Never> = detached
77+
? Task.detached(operation: taskBlock)
78+
: Task(operation: taskBlock)
79+
7780
return Disposables.create { task.cancel() }
7881
}
7982
}

Tests/RxSwiftTests/Observable+ConcurrencyTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ extension ObservableConcurrencyTests {
102102
let expectation = XCTestExpectation(description: "Observable completes")
103103

104104
DispatchQueue.main.async {
105-
let observable = asyncSequence.asObservable()
105+
let observable = asyncSequence.asObservable(detached: true)
106106

107107
var threadIsNotMain = false
108108
var values = [Int]()
@@ -141,7 +141,7 @@ extension ObservableConcurrencyTests {
141141
var values = [Int]()
142142
var executionThreads = Set<String>()
143143

144-
_ = asyncSequence.asObservable().subscribe(
144+
_ = asyncSequence.asObservable(detached: true).subscribe(
145145
onNext: { value in
146146
values.append(value)
147147
let threadName = Thread.current.description
@@ -178,7 +178,7 @@ extension ObservableConcurrencyTests {
178178
var threadIsNotMain = false
179179

180180
DispatchQueue.main.async {
181-
_ = asyncSequence.asObservable().subscribe(
181+
_ = asyncSequence.asObservable(detached: true).subscribe(
182182
onNext: { value in
183183
values.append(value)
184184
threadIsNotMain = !Thread.isMainThread

0 commit comments

Comments
 (0)