Skip to content

Commit 543f8d5

Browse files
Aaron Tullbenjchristensen
authored andcommitted
retryWhen/repeatWhen
1 parent cb6333e commit 543f8d5

File tree

6 files changed

+621
-297
lines changed

6 files changed

+621
-297
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5558,7 +5558,7 @@ public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> acc
55585558
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55595559
*/
55605560
public final Observable<T> repeat() {
5561-
return nest().lift(new OperatorRepeat<T>());
5561+
return OnSubscribeRedo.<T>repeat(this);
55625562
}
55635563

55645564
/**
@@ -5574,7 +5574,7 @@ public final Observable<T> repeat() {
55745574
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55755575
*/
55765576
public final Observable<T> repeat(Scheduler scheduler) {
5577-
return nest().lift(new OperatorRepeat<T>(scheduler));
5577+
return OnSubscribeRedo.<T>repeat(this, scheduler);
55785578
}
55795579

55805580
/**
@@ -5596,11 +5596,8 @@ public final Observable<T> repeat(Scheduler scheduler) {
55965596
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55975597
* @since 0.17
55985598
*/
5599-
public final Observable<T> repeat(long count) {
5600-
if (count < 0) {
5601-
throw new IllegalArgumentException("count >= 0 expected");
5602-
}
5603-
return nest().lift(new OperatorRepeat<T>(count));
5599+
public final Observable<T> repeat(final long count) {
5600+
return OnSubscribeRedo.<T>repeat(this, count);
56045601
}
56055602

56065603
/**
@@ -5616,12 +5613,50 @@ public final Observable<T> repeat(long count) {
56165613
* the {@link Scheduler} to emit the items on
56175614
* @return an Observable that repeats the sequence of items emitted by the source Observable at most
56185615
* {@code count} times on a particular Scheduler
5619-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeat">RxJava wiki: repeat</a>
5616+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeat()</a>
56205617
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
56215618
* @since 0.17
56225619
*/
5623-
public final Observable<T> repeat(long count, Scheduler scheduler) {
5624-
return nest().lift(new OperatorRepeat<T>(count, scheduler));
5620+
public final Observable<T> repeat(final long count, Scheduler scheduler) {
5621+
return OnSubscribeRedo.<T>repeat(this, count, scheduler);
5622+
}
5623+
5624+
/**
5625+
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onCompleted}.
5626+
* An onCompleted will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
5627+
* func. If the observable returned {@code onCompletes} or {@code onErrors} then repeat will call {@code onCompleted}
5628+
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
5629+
* <p>
5630+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
5631+
*
5632+
* @param notificationHandler
5633+
* recieves an Observable of notifications with which a user can complete or error, aborting the repeat.
5634+
* @param scheduler
5635+
* the {@link Scheduler} to emit the items on
5636+
* @return the source Observable modified with repeat logic
5637+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeatWhen()</a>
5638+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
5639+
*/
5640+
public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<? extends Notification<?>>> notificationHandler, Scheduler scheduler) {
5641+
return OnSubscribeRedo.repeat(this, notificationHandler, scheduler);
5642+
}
5643+
5644+
/**
5645+
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onCompleted}.
5646+
* An onCompleted will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
5647+
* func. If the observable returned {@code onCompletes} or {@code onErrors} then repeat will call {@code onCompleted}
5648+
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable.
5649+
* <p>
5650+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
5651+
*
5652+
* @param notificationHandler
5653+
* recieves an Observable of notifications with which a user can complete or error, aborting the repeat.
5654+
* @return the source Observable modified with repeat logic
5655+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeatWhen()</a>
5656+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
5657+
*/
5658+
public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<? extends Notification<?>>> notificationHandler) {
5659+
return OnSubscribeRedo.repeat(this, notificationHandler);
56255660
}
56265661

56275662
/**
@@ -6177,7 +6212,7 @@ public final ConnectableObservable<T> replay(Scheduler scheduler) {
61776212
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.retry.aspx">MSDN: Observable.Retry</a>
61786213
*/
61796214
public final Observable<T> retry() {
6180-
return nest().lift(new OperatorRetry<T>());
6215+
return OnSubscribeRedo.<T>retry(this);
61816216
}
61826217

61836218
/**
@@ -6201,8 +6236,8 @@ public final Observable<T> retry() {
62016236
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retry">RxJava wiki: retry</a>
62026237
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.retry.aspx">MSDN: Observable.Retry</a>
62036238
*/
6204-
public final Observable<T> retry(int retryCount) {
6205-
return nest().lift(new OperatorRetry<T>(retryCount));
6239+
public final Observable<T> retry(final long count) {
6240+
return OnSubscribeRedo.<T>retry(this, count);
62066241
}
62076242

62086243
/**
@@ -6218,13 +6253,49 @@ public final Observable<T> retry(int retryCount) {
62186253
* and retry count
62196254
* @return the source Observable modified with retry logic
62206255
* @see #retry()
6221-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retry">RxJava wiki: retry</a>
6222-
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.retry.aspx">MSDN: Observable.Retry</a>
6256+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retry">RxJava Wiki: retry()</a>
62236257
*/
62246258
public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
62256259
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
62266260
}
6227-
6261+
6262+
/**
6263+
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
6264+
* An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
6265+
* func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
6266+
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable.
6267+
* <p>
6268+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
6269+
* <p>
6270+
* {@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.
6271+
*
6272+
* @param notificationHandler
6273+
* recieves an Observable of notifications with which a user can complete or error, aborting the retry.
6274+
* @return the source Observable modified with retry logic
6275+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-retry">RxJava Wiki: retryWhen()</a>
6276+
*/
6277+
public final Observable<T> retryWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
6278+
return OnSubscribeRedo.<T> retry(this, notificationHandler);
6279+
}
6280+
6281+
/**
6282+
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
6283+
* An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
6284+
* func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
6285+
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
6286+
* <p>
6287+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
6288+
* <p>
6289+
*
6290+
* @param notificationHandler
6291+
* recieves an Observable of notifications with which a user can complete or error, aborting the retry.
6292+
* @return the source Observable modified with retry logic
6293+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-retry">RxJava Wiki: retryWhen()</a>
6294+
*/
6295+
public final Observable<T> retryWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<? extends Notification<?>>> notificationHandler, Scheduler scheduler) {
6296+
return OnSubscribeRedo.<T> retry(this, notificationHandler, scheduler);
6297+
}
6298+
62286299
/**
62296300
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
62306301
* within periodic time intervals.

0 commit comments

Comments
 (0)