Closed
Description
Feature Request
Like catchError, it would be nice to have a native way to catch a stream's completion and replace it with another stream. This lets you chain a new observable onto the completion of a previous observable.
Here is how I've implemented it in the past.
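import { Observable, OperatorFunction, Subscription } from 'rxjs';

function catchComplete<T, R>(fn: () => Observable<R>): OperatorFunction<T, T | R> {
  return s => new Observable(observer => {
    // Inner subscription, created only once the source completes.
    let iSub: Subscription | undefined;
    const oSub = s.subscribe({
      next: x => observer.next(x),
      error: err => observer.error(err),
      // Instead of completing, switch over to the replacement stream.
      complete: () => {
        iSub = fn().subscribe(observer);
      }
    });
    // Tear down both subscriptions when the consumer unsubscribes.
    return { unsubscribe: () => {
      oSub?.unsubscribe();
      iSub?.unsubscribe();
    }};
  });
}
or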
import { Observable, OperatorFunction, concat, defer } from 'rxjs';

function catchComplete<T, R>(fn: () => Observable<R>): OperatorFunction<T, T | R> {
  return s => concat(s, defer(fn));
}
Here is a simple example of this in use:
from([1,2,3,4,5]).pipe(
  catchComplete(() => from([5,4,3,2,1]))
).subscribe(console.log);
// Output: 1 2 3 4 5 5 4 3 2 1
Alternatives do exist. Generally:
stream1.pipe(
  catchComplete(() => stream2)
)
can be rewritten as:
concat(
  stream1,
  defer(() => stream2)
)
I would argue that catchComplete is more ergonomic and easier to understand than the equivalent use of concat.
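The equivalence claimed above can be checked with a small sketch. It assumes RxJS 7 (for the factory form of `throwError`) and uses the concat/defer variant of `catchComplete`; the `collect` helper and the `stream2` factory are hypothetical names introduced only for this illustration. It also shows that the replacement factory runs only when the source completes, not when it errors.

```typescript
import { Observable, OperatorFunction, concat, defer, from, throwError } from "rxjs";

// The concat/defer variant of catchComplete from this issue.
function catchComplete<T, R>(fn: () => Observable<R>): OperatorFunction<T, T | R> {
  return (s) => concat(s, defer(fn));
}

// Hypothetical helper: gather what a synchronous observable emits and how it ended.
function collect<T>(source: Observable<T>): { values: T[]; end: string } {
  const values: T[] = [];
  let end = "pending";
  source.subscribe({
    next: (v) => values.push(v),
    error: () => (end = "error"),
    complete: () => (end = "complete"),
  });
  return { values, end };
}

// Hypothetical replacement stream that counts how often its factory is invoked.
let factoryCalls = 0;
const stream2 = (): Observable<number> => {
  factoryCalls++;
  return from([5, 4]);
};

// Operator form and plain concat form of the same chain.
const piped = collect(from([1, 2]).pipe(catchComplete(stream2)));
const concatenated = collect(concat(from([1, 2]), defer(stream2)));
const callsAfterCompletion = factoryCalls;

// A source that errors instead of completing: the factory should not run.
const failing = concat(from([1, 2]), throwError(() => new Error("boom")));
const afterError = collect(failing.pipe(catchComplete(stream2)));
const callsAfterError = factoryCalls;
```

Because `defer` wraps the factory, nothing is created for the second stream until `concat` reaches it, which matches the behavior of the hand-written operator.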
Consider the following two snippets of code:
stream1.pipe(
  mergeMap(this.serviceCall),
  catchComplete(() => of("this.finalValue")),
  map(msg => msg + "!")
)
vs
concat(
  stream1.pipe(
    mergeMap(this.serviceCall)
  ),
  defer(() => of("this.finalValue"))
).pipe(
  map(msg => msg + "!")
)
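For anyone who wants to run the two snippets side by side, here is a minimal sketch. `serviceCall`, `finalValue`, and `run` are hypothetical stand-ins (the issue does not define `this.serviceCall`), and `catchComplete` is the concat/defer variant from above. Both forms should emit the same values.

```typescript
import { Observable, OperatorFunction, concat, defer, from, of } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

// The concat/defer variant of catchComplete from this issue.
function catchComplete<T, R>(fn: () => Observable<R>): OperatorFunction<T, T | R> {
  return (s) => concat(s, defer(fn));
}

// Hypothetical stand-ins for this.serviceCall and the final value.
const serviceCall = (n: number): Observable<string> => of(`reply ${n}`);
const finalValue = "done";

// Hypothetical helper: collect the emissions of a synchronous observable.
function run(source: Observable<string>): string[] {
  const out: string[] = [];
  source.subscribe((v) => out.push(v));
  return out;
}

// Operator form: the whole chain stays inside a single pipe.
const withOperator = run(
  from([1, 2]).pipe(
    mergeMap(serviceCall),
    catchComplete(() => of(finalValue)),
    map((msg) => msg + "!")
  )
);

// concat form: the chain is split around the concat call.
const withConcat = run(
  concat(
    from([1, 2]).pipe(mergeMap(serviceCall)),
    defer(() => of(finalValue))
  ).pipe(map((msg) => msg + "!"))
);
```

The behavior is the same either way; the difference is only in how the chain reads, which is the ergonomic point being argued here.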