Skip to content

Commit 20a72ca

Browse files
authored
2.x: Improve JavaDoc of retryWhen() operators (#5773)
* 2.x: Improve JavaDoc of retryWhen() operators * Fix example to use emitter. * Fix factory class name * Fix example termination logic, proper blockingX()
1 parent 3096024 commit 20a72ca

File tree

5 files changed

+137
-11
lines changed

5 files changed

+137
-11
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1597,6 +1597,31 @@ public final Completable retry(Predicate<? super Throwable> predicate) {
15971597
* Returns a Completable which given a Publisher and when this Completable emits an error, delivers
15981598
* that error through a Flowable and the Publisher should signal a value indicating a retry in response
15991599
* or a terminal event indicating a termination.
1600+
* <p>
1601+
* Note that the inner {@code Publisher} returned by the handler function should signal
1602+
* either {@code onNext}, {@code onError} or {@code onComplete} in response to the received
1603+
* {@code Throwable} to indicate the operator should retry or terminate. If the upstream to
1604+
* the operator is asynchronous, signalling onNext followed by onComplete immediately may
1605+
* result in the sequence to be completed immediately. Similarly, if this inner
1606+
* {@code Publisher} signals {@code onError} or {@code onComplete} while the upstream is
1607+
* active, the sequence is terminated with the same signal immediately.
1608+
* <p>
1609+
* The following example demonstrates how to retry an asynchronous source with a delay:
1610+
* <pre><code>
1611+
* Completable.timer(1, TimeUnit.SECONDS)
1612+
* .doOnSubscribe(s -&gt; System.out.println("subscribing"))
1613+
* .doOnComplete(() -&gt; { throw new RuntimeException(); })
1614+
* .retryWhen(errors -&gt; {
1615+
* AtomicInteger counter = new AtomicInteger();
1616+
* return errors
1617+
* .takeWhile(e -&gt; counter.getAndIncrement() != 3)
1618+
* .flatMap(e -&gt; {
1619+
* System.out.println("delay retry by " + counter.get() + " second(s)");
1620+
* return Flowable.timer(counter.get(), TimeUnit.SECONDS);
1621+
* });
1622+
* })
1623+
* .blockingAwait();
1624+
* </code></pre>
16001625
* <dl>
16011626
* <dt><b>Scheduler:</b></dt>
16021627
* <dd>{@code retryWhen} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/Flowable.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11770,19 +11770,19 @@ public final Flowable<T> retryUntil(final BooleanSupplier stop) {
1177011770
* resubscribe to the source Publisher.
1177111771
* <p>
1177211772
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retryWhen.f.png" alt="">
11773-
*
11773+
* <p>
1177411774
* Example:
1177511775
*
1177611776
* This retries 3 times, each time incrementing the number of seconds it waits.
1177711777
*
1177811778
* <pre><code>
11779-
* Publisher.create((Subscriber&lt;? super String&gt; s) -&gt; {
11779+
* Flowable.create((FlowableEmitter&lt;? super String&gt; s) -&gt; {
1178011780
* System.out.println("subscribing");
1178111781
* s.onError(new RuntimeException("always fails"));
11782-
* }).retryWhen(attempts -&gt; {
11782+
* }, BackpressureStrategy.BUFFER).retryWhen(attempts -&gt; {
1178311783
* return attempts.zipWith(Flowable.range(1, 3), (n, i) -&gt; i).flatMap(i -&gt; {
1178411784
* System.out.println("delay retry by " + i + " second(s)");
11785-
* return Publisher.timer(i, TimeUnit.SECONDS);
11785+
* return Flowable.timer(i, TimeUnit.SECONDS);
1178611786
* });
1178711787
* }).blockingForEach(System.out::println);
1178811788
* </code></pre>
@@ -11798,9 +11798,35 @@ public final Flowable<T> retryUntil(final BooleanSupplier stop) {
1179811798
* delay retry by 3 second(s)
1179911799
* subscribing
1180011800
* } </pre>
11801+
* <p>
11802+
* Note that the inner {@code Publisher} returned by the handler function should signal
11803+
* either {@code onNext}, {@code onError} or {@code onComplete} in response to the received
11804+
* {@code Throwable} to indicate the operator should retry or terminate. If the upstream to
11805+
* the operator is asynchronous, signalling onNext followed by onComplete immediately may
11806+
* result in the sequence to be completed immediately. Similarly, if this inner
11807+
* {@code Publisher} signals {@code onError} or {@code onComplete} while the upstream is
11808+
* active, the sequence is terminated with the same signal immediately.
11809+
* <p>
11810+
* The following example demonstrates how to retry an asynchronous source with a delay:
11811+
* <pre><code>
11812+
* Flowable.timer(1, TimeUnit.SECONDS)
11813+
* .doOnSubscribe(s -&gt; System.out.println("subscribing"))
11814+
* .map(v -&gt; { throw new RuntimeException(); })
11815+
* .retryWhen(errors -&gt; {
11816+
* AtomicInteger counter = new AtomicInteger();
11817+
* return errors
11818+
* .takeWhile(e -&gt; counter.getAndIncrement() != 3)
11819+
* .flatMap(e -&gt; {
11820+
* System.out.println("delay retry by " + counter.get() + " second(s)");
11821+
* return Flowable.timer(counter.get(), TimeUnit.SECONDS);
11822+
* });
11823+
* })
11824+
* .blockingSubscribe(System.out::println, System.out::println);
11825+
* </code></pre>
1180111826
* <dl>
1180211827
* <dt><b>Backpressure:</b></dt>
11803-
* <dd>The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well.
11828+
* <dd>The operator honors downstream backpressure and expects both the source
11829+
* and inner {@code Publisher}s to honor backpressure as well.
1180411830
* If this expectation is violated, the operator <em>may</em> throw an {@code IllegalStateException}.</dd>
1180511831
* <dt><b>Scheduler:</b></dt>
1180611832
* <dd>{@code retryWhen} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/Maybe.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3585,19 +3585,19 @@ public final Maybe<T> retryUntil(final BooleanSupplier stop) {
35853585
* resubscribe to the source Publisher.
35863586
* <p>
35873587
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retryWhen.f.png" alt="">
3588-
*
3588+
* <p>
35893589
* Example:
35903590
*
35913591
* This retries 3 times, each time incrementing the number of seconds it waits.
35923592
*
35933593
* <pre><code>
3594-
* Flowable.create((FlowableEmitter&lt;? super String&gt; s) -&gt; {
3594+
* Maybe.create((MaybeEmitter&lt;? super String&gt; s) -&gt; {
35953595
* System.out.println("subscribing");
35963596
* s.onError(new RuntimeException("always fails"));
35973597
* }, BackpressureStrategy.BUFFER).retryWhen(attempts -&gt; {
35983598
* return attempts.zipWith(Publisher.range(1, 3), (n, i) -&gt; i).flatMap(i -&gt; {
35993599
* System.out.println("delay retry by " + i + " second(s)");
3600-
* return Publisher.timer(i, TimeUnit.SECONDS);
3600+
* return Flowable.timer(i, TimeUnit.SECONDS);
36013601
* });
36023602
* }).blockingForEach(System.out::println);
36033603
* </code></pre>
@@ -3613,6 +3613,31 @@ public final Maybe<T> retryUntil(final BooleanSupplier stop) {
36133613
* delay retry by 3 second(s)
36143614
* subscribing
36153615
* } </pre>
3616+
* <p>
3617+
* Note that the inner {@code Publisher} returned by the handler function should signal
3618+
* either {@code onNext}, {@code onError} or {@code onComplete} in response to the received
3619+
* {@code Throwable} to indicate the operator should retry or terminate. If the upstream to
3620+
* the operator is asynchronous, signalling onNext followed by onComplete immediately may
3621+
* result in the sequence to be completed immediately. Similarly, if this inner
3622+
* {@code Publisher} signals {@code onError} or {@code onComplete} while the upstream is
3623+
* active, the sequence is terminated with the same signal immediately.
3624+
* <p>
3625+
* The following example demonstrates how to retry an asynchronous source with a delay:
3626+
* <pre><code>
3627+
* Maybe.timer(1, TimeUnit.SECONDS)
3628+
* .doOnSubscribe(s -&gt; System.out.println("subscribing"))
3629+
* .map(v -&gt; { throw new RuntimeException(); })
3630+
* .retryWhen(errors -&gt; {
3631+
* AtomicInteger counter = new AtomicInteger();
3632+
* return errors
3633+
* .takeWhile(e -&gt; counter.getAndIncrement() != 3)
3634+
* .flatMap(e -&gt; {
3635+
* System.out.println("delay retry by " + counter.get() + " second(s)");
3636+
* return Flowable.timer(counter.get(), TimeUnit.SECONDS);
3637+
* });
3638+
* })
3639+
* .blockingGet();
3640+
* </code></pre>
36163641
* <dl>
36173642
* <dt><b>Scheduler:</b></dt>
36183643
* <dd>{@code retryWhen} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/Observable.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9927,19 +9927,19 @@ public final Observable<T> retryUntil(final BooleanSupplier stop) {
99279927
* resubscribe to the source ObservableSource.
99289928
* <p>
99299929
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retryWhen.f.png" alt="">
9930-
*
9930+
* <p>
99319931
* Example:
99329932
*
99339933
* This retries 3 times, each time incrementing the number of seconds it waits.
99349934
*
99359935
* <pre><code>
9936-
* ObservableSource.create((Observer&lt;? super String&gt; s) -&gt; {
9936+
* Observable.create((ObservableEmitter&lt;? super String&gt; s) -&gt; {
99379937
* System.out.println("subscribing");
99389938
* s.onError(new RuntimeException("always fails"));
99399939
* }).retryWhen(attempts -&gt; {
99409940
* return attempts.zipWith(Observable.range(1, 3), (n, i) -&gt; i).flatMap(i -&gt; {
99419941
* System.out.println("delay retry by " + i + " second(s)");
9942-
* return ObservableSource.timer(i, TimeUnit.SECONDS);
9942+
* return Observable.timer(i, TimeUnit.SECONDS);
99439943
* });
99449944
* }).blockingForEach(System.out::println);
99459945
* </code></pre>
@@ -9955,6 +9955,31 @@ public final Observable<T> retryUntil(final BooleanSupplier stop) {
99559955
* delay retry by 3 second(s)
99569956
* subscribing
99579957
* } </pre>
9958+
* <p>
9959+
* Note that the inner {@code ObservableSource} returned by the handler function should signal
9960+
* either {@code onNext}, {@code onError} or {@code onComplete} in response to the received
9961+
* {@code Throwable} to indicate the operator should retry or terminate. If the upstream to
9962+
* the operator is asynchronous, signalling onNext followed by onComplete immediately may
9963+
* result in the sequence to be completed immediately. Similarly, if this inner
9964+
* {@code ObservableSource} signals {@code onError} or {@code onComplete} while the upstream is
9965+
* active, the sequence is terminated with the same signal immediately.
9966+
* <p>
9967+
* The following example demonstrates how to retry an asynchronous source with a delay:
9968+
* <pre><code>
9969+
* Observable.timer(1, TimeUnit.SECONDS)
9970+
* .doOnSubscribe(s -&gt; System.out.println("subscribing"))
9971+
* .map(v -&gt; { throw new RuntimeException(); })
9972+
* .retryWhen(errors -&gt; {
9973+
* AtomicInteger counter = new AtomicInteger();
9974+
* return errors
9975+
* .takeWhile(e -&gt; counter.getAndIncrement() != 3)
9976+
* .flatMap(e -&gt; {
9977+
* System.out.println("delay retry by " + counter.get() + " second(s)");
9978+
* return Observable.timer(counter.get(), TimeUnit.SECONDS);
9979+
* });
9980+
* })
9981+
* .blockingSubscribe(System.out::println, System.out::println);
9982+
* </code></pre>
99589983
* <dl>
99599984
* <dt><b>Scheduler:</b></dt>
99609985
* <dd>{@code retryWhen} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/Single.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2692,6 +2692,31 @@ public final Single<T> retry(Predicate<? super Throwable> predicate) {
26922692
* function signals a value.
26932693
* <p>
26942694
* If the Publisher signals an onComplete, the resulting Single will signal a NoSuchElementException.
2695+
* <p>
2696+
* Note that the inner {@code Publisher} returned by the handler function should signal
2697+
* either {@code onNext}, {@code onError} or {@code onComplete} in response to the received
2698+
* {@code Throwable} to indicate the operator should retry or terminate. If the upstream to
2699+
* the operator is asynchronous, signalling onNext followed by onComplete immediately may
2700+
* result in the sequence to be completed immediately. Similarly, if this inner
2701+
* {@code Publisher} signals {@code onError} or {@code onComplete} while the upstream is
2702+
* active, the sequence is terminated with the same signal immediately.
2703+
* <p>
2704+
* The following example demonstrates how to retry an asynchronous source with a delay:
2705+
* <pre><code>
2706+
* Single.timer(1, TimeUnit.SECONDS)
2707+
* .doOnSubscribe(s -&gt; System.out.println("subscribing"))
2708+
* .map(v -&gt; { throw new RuntimeException(); })
2709+
* .retryWhen(errors -&gt; {
2710+
* AtomicInteger counter = new AtomicInteger();
2711+
* return errors
2712+
* .takeWhile(e -&gt; counter.getAndIncrement() != 3)
2713+
* .flatMap(e -&gt; {
2714+
* System.out.println("delay retry by " + counter.get() + " second(s)");
2715+
* return Flowable.timer(counter.get(), TimeUnit.SECONDS);
2716+
* });
2717+
* })
2718+
* .blockingGet();
2719+
* </code></pre>
26952720
* <dl>
26962721
* <dt><b>Scheduler:</b></dt>
26972722
* <dd>{@code retryWhen} does not operate by default on a particular {@link Scheduler}.</dd>

0 commit comments

Comments
 (0)