@@ -5558,7 +5558,7 @@ public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> acc
55585558 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55595559 */
55605560 public final Observable <T > repeat () {
5561- return nest (). lift ( new OperatorRepeat <T >() );
5561+ return OnSubscribeRedo . <T >repeat ( this );
55625562 }
55635563
55645564 /**
@@ -5574,7 +5574,7 @@ public final Observable<T> repeat() {
55745574 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55755575 */
55765576 public final Observable <T > repeat (Scheduler scheduler ) {
5577- return nest (). lift ( new OperatorRepeat <T >( scheduler ) );
5577+ return OnSubscribeRedo . <T >repeat ( this , scheduler );
55785578 }
55795579
55805580 /**
@@ -5596,11 +5596,8 @@ public final Observable<T> repeat(Scheduler scheduler) {
55965596 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55975597 * @since 0.17
55985598 */
5599- public final Observable <T > repeat (long count ) {
5600- if (count < 0 ) {
5601- throw new IllegalArgumentException ("count >= 0 expected" );
5602- }
5603- return nest ().lift (new OperatorRepeat <T >(count ));
5599+ public final Observable <T > repeat (final long count ) {
5600+ return OnSubscribeRedo .<T >repeat (this , count );
56045601 }
56055602
56065603 /**
@@ -5616,12 +5613,50 @@ public final Observable<T> repeat(long count) {
56165613 * the {@link Scheduler} to emit the items on
56175614 * @return an Observable that repeats the sequence of items emitted by the source Observable at most
56185615 * {@code count} times on a particular Scheduler
5619- * @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeat">RxJava wiki : repeat</a>
5616+ * @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki- repeat">RxJava Wiki : repeat() </a>
56205617 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
56215618 * @since 0.17
56225619 */
5623- public final Observable <T > repeat (long count , Scheduler scheduler ) {
5624- return nest ().lift (new OperatorRepeat <T >(count , scheduler ));
5620+ public final Observable <T > repeat (final long count , Scheduler scheduler ) {
5621+ return OnSubscribeRedo .<T >repeat (this , count , scheduler );
5622+ }
5623+
5624+ /**
5625+ * Returns an Observable that emits the same values as the source observable with the exception of an {@code onCompleted}.
5626+ * An onCompleted will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
5627+ * func. If the observable returned {@code onCompletes} or {@code onErrors} then repeat will call {@code onCompleted}
5628+ * or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
5629+ * <p>
5630+ * <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
5631+ *
5632+ * @param notificationHandler
5633+ * recieves an Observable of notifications with which a user can complete or error, aborting the repeat.
5634+ * @param scheduler
5635+ * the {@link Scheduler} to emit the items on
5636+ * @return the source Observable modified with repeat logic
5637+ * @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeatWhen()</a>
5638+ * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
5639+ */
5640+ public final Observable <T > repeatWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <? extends Notification <?>>> notificationHandler , Scheduler scheduler ) {
5641+ return OnSubscribeRedo .repeat (this , notificationHandler , scheduler );
5642+ }
5643+
5644+ /**
5645+ * Returns an Observable that emits the same values as the source observable with the exception of an {@code onCompleted}.
5646+ * An onCompleted will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
5647+ * func. If the observable returned {@code onCompletes} or {@code onErrors} then repeat will call {@code onCompleted}
5648+ * or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable.
5649+ * <p>
5650+ * <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
5651+ *
5652+ * @param notificationHandler
5653+ * recieves an Observable of notifications with which a user can complete or error, aborting the repeat.
5654+ * @return the source Observable modified with repeat logic
5655+ * @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeatWhen()</a>
5656+ * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
5657+ */
5658+ public final Observable <T > repeatWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <? extends Notification <?>>> notificationHandler ) {
5659+ return OnSubscribeRedo .repeat (this , notificationHandler );
56255660 }
56265661
56275662 /**
@@ -6177,7 +6212,7 @@ public final ConnectableObservable<T> replay(Scheduler scheduler) {
61776212 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.retry.aspx">MSDN: Observable.Retry</a>
61786213 */
61796214 public final Observable <T > retry () {
6180- return nest (). lift ( new OperatorRetry <T >() );
6215+ return OnSubscribeRedo . <T >retry ( this );
61816216 }
61826217
61836218 /**
@@ -6201,8 +6236,8 @@ public final Observable<T> retry() {
62016236 * @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retry">RxJava wiki: retry</a>
62026237 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.retry.aspx">MSDN: Observable.Retry</a>
62036238 */
6204- public final Observable <T > retry (int retryCount ) {
6205- return nest (). lift ( new OperatorRetry <T >( retryCount ) );
6239+ public final Observable <T > retry (final long count ) {
6240+ return OnSubscribeRedo . <T >retry ( this , count );
62066241 }
62076242
62086243 /**
@@ -6218,13 +6253,77 @@ public final Observable<T> retry(int retryCount) {
62186253 * and retry count
62196254 * @return the source Observable modified with retry logic
62206255 * @see #retry()
6221- * @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retry">RxJava wiki: retry</a>
6222- * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.retry.aspx">MSDN: Observable.Retry</a>
6256+ * @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retry">RxJava Wiki: retry()</a>
62236257 */
62246258 public final Observable <T > retry (Func2 <Integer , Throwable , Boolean > predicate ) {
62256259 return nest ().lift (new OperatorRetryWithPredicate <T >(predicate ));
62266260 }
6227-
6261+
6262+ /**
6263+ * Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
6264+ * An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
6265+ * func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
6266+ * or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable.
6267+ * <p>
6268+ * <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
6269+ * <p>
6270+ * {@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.
6271+ *
6272+ * Example:
6273+ *
6274+ * This retries 3 times, each time incrementing the number of seconds it waits.
6275+ *
6276+ * <pre> {@code
6277+ * Observable.create((Subscriber<? super String> s) -> {
6278+ * System.out.println("subscribing");
6279+ * s.onError(new RuntimeException("always fails"));
6280+ * }).retryWhen(attempts -> {
6281+ * return attempts.zip(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
6282+ * System.out.println("delay retry by " + i + " second(s)");
6283+ * return Observable.timer(i, TimeUnit.SECONDS);
6284+ * });
6285+ * }).toBlocking().forEach(System.out::println);
6286+ * } </pre>
6287+ *
6288+ * Output is:
6289+ *
6290+ * <pre> {@code
6291+ * subscribing
6292+ * delay retry by 1 second(s)
6293+ * subscribing
6294+ * delay retry by 2 second(s)
6295+ * subscribing
6296+ * delay retry by 3 second(s)
6297+ * subscribing
6298+ * } </pre>
6299+ *
6300+ * @param notificationHandler
6301+ * recieves an Observable of notifications with which a user can complete or error, aborting the retry.
6302+ * @return the source Observable modified with retry logic
6303+ * @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-retry">RxJava Wiki: retryWhen()</a>
6304+ */
6305+ public final Observable <T > retryWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <?>> notificationHandler ) {
6306+ return OnSubscribeRedo .<T > retry (this , notificationHandler );
6307+ }
6308+
6309+ /**
6310+ * Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
6311+ * An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
6312+ * func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
6313+ * or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
6314+ * <p>
6315+ * <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
6316+ * <p>
6317+ *
6318+ * @param notificationHandler
6319+ * recieves an Observable of notifications with which a user can complete or error, aborting the retry.
6320+ * @return the source Observable modified with retry logic
6321+ * @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-retry">RxJava Wiki: retryWhen()</a>
6322+ */
6323+ public final Observable <T > retryWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <? extends Notification <?>>> notificationHandler , Scheduler scheduler ) {
6324+ return OnSubscribeRedo .<T > retry (this , notificationHandler , scheduler );
6325+ }
6326+
62286327 /**
62296328 * Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
62306329 * within periodic time intervals.
0 commit comments