Skip to content

Commit 87f449b

Browse files
committed
Rewrote execute() to eliminate the direct plug on inputs that could cause out-of-order erroring
1 parent 6cb0ce5 commit 87f449b

File tree

1 file changed

+16
-19
lines changed

1 file changed

+16
-19
lines changed

Sources/Action/Action.swift

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public final class Action<Input, Element> {
7070
.flatMap { input, enabled -> Observable<Observable<Element>> in
7171
if enabled {
7272
return Observable.of(workFactory(input)
73-
.do(onError: { error in errorsSubject.onNext(.underlyingError(error)) })
73+
.do(onError: { errorsSubject.onNext(.underlyingError($0)) })
7474
.shareReplay(1))
7575
} else {
7676
errorsSubject.onNext(.notEnabled)
@@ -107,23 +107,20 @@ public final class Action<Input, Element> {
107107
inputs.onNext(value)
108108
}
109109

110-
let execution = executionObservables
111-
.take(1)
112-
.flatMap { $0 }
113-
.catchError { throw ActionError.underlyingError($0) }
114-
115-
let notEnabledError = inputs
116-
.takeUntil(executionObservables)
117-
.withLatestFrom(enabled)
118-
.flatMap { $0 ? Observable<Element>.empty() : Observable.error(ActionError.notEnabled) }
119-
120-
let subject = ReplaySubject<Element>.createUnbounded()
121-
Observable
122-
.of(execution, notEnabledError)
123-
.merge()
124-
.subscribe(subject)
125-
.addDisposableTo(disposeBag)
126-
127-
return subject.asObservable()
110+
let subject = ReplaySubject<Element>.createUnbounded()
111+
112+
let work = executionObservables
113+
.map { $0.catchError { throw ActionError.underlyingError($0) } }
114+
115+
let error = errors
116+
.map { Observable<Element>.error($0) }
117+
118+
work.amb(error)
119+
.take(1)
120+
.flatMap { $0 }
121+
.subscribe(subject)
122+
.addDisposableTo(disposeBag)
123+
124+
return subject.asObservable()
128125
}
129126
}

0 commit comments

Comments
 (0)