Skip to content

Commit 24bb86a

Browse files
authored
Merge pull request #73 from fpillet/rewrite-errors-observable
Rewrite errors observable
2 parents 4daa9d3 + 87f449b commit 24bb86a

File tree

1 file changed

+22
-40
lines changed

1 file changed

+22
-40
lines changed

Sources/Action/Action.swift

Lines changed: 22 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,19 @@ public final class Action<Input, Element> {
6161

6262
let enabledSubject = BehaviorSubject<Bool>(value: false)
6363
enabled = enabledSubject.asObservable()
64+
65+
let errorsSubject = PublishSubject<ActionError>()
66+
errors = errorsSubject.asObservable()
6467

6568
executionObservables = inputs
6669
.withLatestFrom(enabled) { $0 }
6770
.flatMap { input, enabled -> Observable<Observable<Element>> in
6871
if enabled {
69-
return Observable.of(workFactory(input).shareReplay(1))
72+
return Observable.of(workFactory(input)
73+
.do(onError: { errorsSubject.onNext(.underlyingError($0)) })
74+
.shareReplay(1))
7075
} else {
76+
errorsSubject.onNext(.notEnabled)
7177
return Observable.empty()
7278
}
7379
}
@@ -76,27 +82,6 @@ public final class Action<Input, Element> {
7682
elements = executionObservables
7783
.flatMap { $0.catchError { _ in Observable.empty() } }
7884

79-
let notEnabledError = inputs
80-
.withLatestFrom(enabled)
81-
.flatMap { $0 ? Observable.empty() : Observable.of(ActionError.notEnabled) }
82-
83-
let underlyingError = executionObservables
84-
.flatMap { elements in
85-
return elements
86-
.flatMap { _ in Observable<ActionError>.never() }
87-
.catchError { error in
88-
if let actionError = error as? ActionError {
89-
return Observable.of(actionError)
90-
} else {
91-
return Observable.of(.underlyingError(error))
92-
}
93-
}
94-
}
95-
96-
errors = Observable
97-
.of(notEnabledError, underlyingError)
98-
.merge()
99-
10085
executing = executionObservables.flatMap {
10186
execution -> Observable<Bool> in
10287
let execution = execution
@@ -122,23 +107,20 @@ public final class Action<Input, Element> {
122107
inputs.onNext(value)
123108
}
124109

125-
let execution = executionObservables
126-
.take(1)
127-
.flatMap { $0 }
128-
.catchError { throw ActionError.underlyingError($0) }
129-
130-
let notEnabledError = inputs
131-
.takeUntil(executionObservables)
132-
.withLatestFrom(enabled)
133-
.flatMap { $0 ? Observable<Element>.empty() : Observable.error(ActionError.notEnabled) }
134-
135-
let subject = ReplaySubject<Element>.createUnbounded()
136-
Observable
137-
.of(execution, notEnabledError)
138-
.merge()
139-
.subscribe(subject)
140-
.addDisposableTo(disposeBag)
141-
142-
return subject.asObservable()
110+
let subject = ReplaySubject<Element>.createUnbounded()
111+
112+
let work = executionObservables
113+
.map { $0.catchError { throw ActionError.underlyingError($0) } }
114+
115+
let error = errors
116+
.map { Observable<Element>.error($0) }
117+
118+
work.amb(error)
119+
.take(1)
120+
.flatMap { $0 }
121+
.subscribe(subject)
122+
.addDisposableTo(disposeBag)
123+
124+
return subject.asObservable()
143125
}
144126
}

0 commit comments

Comments
 (0)