Skip to content

Commit 34242e1

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add retry(times, predicate) to Single & Completable and verify behavior across them and Maybe. (#5753)
* 2.x: Add retry(times, predicate) to Single & Completable and verify behavior across them and Maybe. * Adjust ParamValidationCheckerTest for the 2 newly added methods. * Add AtomicInteger for checking the number of subscribe calls. * Fix copy pasta mistake.
1 parent ec40a5e commit 34242e1

File tree

6 files changed

+401
-0
lines changed

6 files changed

+401
-0
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1545,6 +1545,28 @@ public final Completable retry(long times) {
15451545
return fromPublisher(toFlowable().retry(times));
15461546
}
15471547

1548+
/**
1549+
* Returns a Completable that when this Completable emits an error, retries at most times
1550+
* or until the predicate returns false, whichever happens first and emitting the last error.
1551+
* <dl>
1552+
* <dt><b>Scheduler:</b></dt>
1553+
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
1554+
* </dl>
1555+
* @param times the number of times the returned Completable should retry this Completable
1556+
* @param predicate the predicate that is called with the latest throwable and should return
1557+
* true to indicate the returned Completable should resubscribe to this Completable.
1558+
* @return the new Completable instance
1559+
* @throws NullPointerException if predicate is null
1560+
* @throws IllegalArgumentException if times is negative
1561+
* @since 2.1.8 - experimental
1562+
*/
1563+
@Experimental
1564+
@CheckReturnValue
1565+
@SchedulerSupport(SchedulerSupport.NONE)
1566+
public final Completable retry(long times, Predicate<? super Throwable> predicate) {
1567+
return fromPublisher(toFlowable().retry(times, predicate));
1568+
}
1569+
15481570
/**
15491571
* Returns a Completable that when this Completable emits an error, calls the given predicate with
15501572
* the latest exception to decide whether to resubscribe to this or not.

src/main/java/io/reactivex/Single.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2649,6 +2649,26 @@ public final Single<T> retry(BiPredicate<? super Integer, ? super Throwable> pre
26492649
return toSingle(toFlowable().retry(predicate));
26502650
}
26512651

2652+
/**
2653+
* Repeatedly re-subscribe at most times or until the predicate returns false, whichever happens first
2654+
* if it fails with an onError.
2655+
* <dl>
2656+
* <dt><b>Scheduler:</b></dt>
2657+
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
2658+
* </dl>
2659+
* @param times the number of times to resubscribe if the current Single fails
2660+
* @param predicate the predicate called with the failure Throwable
2661+
* and should return true if a resubscription should happen
2662+
* @return the new Single instance
2663+
* @since 2.1.8 - experimental
2664+
*/
2665+
@Experimental
2666+
@CheckReturnValue
2667+
@SchedulerSupport(SchedulerSupport.NONE)
2668+
public final Single<T> retry(long times, Predicate<? super Throwable> predicate) {
2669+
return toSingle(toFlowable().retry(times, predicate));
2670+
}
2671+
26522672
/**
26532673
* Re-subscribe to the current Single if the given predicate returns true when the Single fails
26542674
* with an onError.

src/test/java/io/reactivex/ParamValidationCheckerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ public void checkParallelFlowable() {
258258

259259
// zero retry is allowed
260260
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
261+
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));
261262

262263
// negative time is considered as zero time
263264
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class));
@@ -323,6 +324,7 @@ public void checkParallelFlowable() {
323324

324325
// zero retry is allowed
325326
addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
327+
addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));
326328

327329
// negative time is considered as zero time
328330
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class));
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Copyright (c) 2017-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.functions.Action;
18+
import io.reactivex.functions.Predicate;
19+
import io.reactivex.internal.functions.Functions;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import org.junit.Test;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
public class CompletableRetryTest {
26+
@Test
27+
public void retryTimesPredicateWithMatchingPredicate() {
28+
final AtomicInteger atomicInteger = new AtomicInteger(3);
29+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
30+
31+
Completable.fromAction(new Action() {
32+
@Override public void run() throws Exception {
33+
numberOfSubscribeCalls.incrementAndGet();
34+
35+
if (atomicInteger.decrementAndGet() != 0) {
36+
throw new RuntimeException();
37+
}
38+
39+
throw new IllegalArgumentException();
40+
}
41+
})
42+
.retry(Integer.MAX_VALUE, new Predicate<Throwable>() {
43+
@Override public boolean test(final Throwable throwable) throws Exception {
44+
return !(throwable instanceof IllegalArgumentException);
45+
}
46+
})
47+
.test()
48+
.assertFailure(IllegalArgumentException.class);
49+
50+
assertEquals(3, numberOfSubscribeCalls.get());
51+
}
52+
53+
@Test
54+
public void retryTimesPredicateWithMatchingRetryAmount() {
55+
final AtomicInteger atomicInteger = new AtomicInteger(3);
56+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
57+
58+
Completable.fromAction(new Action() {
59+
@Override public void run() throws Exception {
60+
numberOfSubscribeCalls.incrementAndGet();
61+
62+
if (atomicInteger.decrementAndGet() != 0) {
63+
throw new RuntimeException();
64+
}
65+
}
66+
})
67+
.retry(2, Functions.alwaysTrue())
68+
.test()
69+
.assertResult();
70+
71+
assertEquals(3, numberOfSubscribeCalls.get());
72+
}
73+
74+
@Test
75+
public void retryTimesPredicateWithNotMatchingRetryAmount() {
76+
final AtomicInteger atomicInteger = new AtomicInteger(3);
77+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
78+
79+
Completable.fromAction(new Action() {
80+
@Override public void run() throws Exception {
81+
numberOfSubscribeCalls.incrementAndGet();
82+
83+
if (atomicInteger.decrementAndGet() != 0) {
84+
throw new RuntimeException();
85+
}
86+
}
87+
})
88+
.retry(1, Functions.alwaysTrue())
89+
.test()
90+
.assertFailure(RuntimeException.class);
91+
92+
assertEquals(2, numberOfSubscribeCalls.get());
93+
}
94+
95+
@Test
96+
public void retryTimesPredicateWithZeroRetries() {
97+
final AtomicInteger atomicInteger = new AtomicInteger(2);
98+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
99+
100+
Completable.fromAction(new Action() {
101+
@Override public void run() throws Exception {
102+
numberOfSubscribeCalls.incrementAndGet();
103+
104+
if (atomicInteger.decrementAndGet() != 0) {
105+
throw new RuntimeException();
106+
}
107+
}
108+
})
109+
.retry(0, Functions.alwaysTrue())
110+
.test()
111+
.assertFailure(RuntimeException.class);
112+
113+
assertEquals(1, numberOfSubscribeCalls.get());
114+
}
115+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Copyright (c) 2017-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.maybe;
15+
16+
import io.reactivex.Maybe;
17+
import io.reactivex.functions.Predicate;
18+
import io.reactivex.internal.functions.Functions;
19+
import java.util.concurrent.Callable;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import org.junit.Test;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
public class MaybeRetryTest {
26+
@Test
27+
public void retryTimesPredicateWithMatchingPredicate() {
28+
final AtomicInteger atomicInteger = new AtomicInteger(3);
29+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
30+
31+
Maybe.fromCallable(new Callable<Boolean>() {
32+
@Override public Boolean call() throws Exception {
33+
numberOfSubscribeCalls.incrementAndGet();
34+
35+
if (atomicInteger.decrementAndGet() != 0) {
36+
throw new RuntimeException();
37+
}
38+
39+
throw new IllegalArgumentException();
40+
}
41+
})
42+
.retry(Integer.MAX_VALUE, new Predicate<Throwable>() {
43+
@Override public boolean test(final Throwable throwable) throws Exception {
44+
return !(throwable instanceof IllegalArgumentException);
45+
}
46+
})
47+
.test()
48+
.assertFailure(IllegalArgumentException.class);
49+
50+
assertEquals(3, numberOfSubscribeCalls.get());
51+
}
52+
53+
@Test
54+
public void retryTimesPredicateWithMatchingRetryAmount() {
55+
final AtomicInteger atomicInteger = new AtomicInteger(3);
56+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
57+
58+
Maybe.fromCallable(new Callable<Boolean>() {
59+
@Override public Boolean call() throws Exception {
60+
numberOfSubscribeCalls.incrementAndGet();
61+
62+
if (atomicInteger.decrementAndGet() != 0) {
63+
throw new RuntimeException();
64+
}
65+
66+
return true;
67+
}
68+
})
69+
.retry(2, Functions.alwaysTrue())
70+
.test()
71+
.assertResult(true);
72+
73+
assertEquals(3, numberOfSubscribeCalls.get());
74+
}
75+
76+
@Test
77+
public void retryTimesPredicateWithNotMatchingRetryAmount() {
78+
final AtomicInteger atomicInteger = new AtomicInteger(3);
79+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
80+
81+
Maybe.fromCallable(new Callable<Boolean>() {
82+
@Override public Boolean call() throws Exception {
83+
numberOfSubscribeCalls.incrementAndGet();
84+
85+
if (atomicInteger.decrementAndGet() != 0) {
86+
throw new RuntimeException();
87+
}
88+
89+
return true;
90+
}
91+
})
92+
.retry(1, Functions.alwaysTrue())
93+
.test()
94+
.assertFailure(RuntimeException.class);
95+
96+
assertEquals(2, numberOfSubscribeCalls.get());
97+
}
98+
99+
@Test
100+
public void retryTimesPredicateWithZeroRetries() {
101+
final AtomicInteger atomicInteger = new AtomicInteger(2);
102+
final AtomicInteger numberOfSubscribeCalls = new AtomicInteger(0);
103+
104+
Maybe.fromCallable(new Callable<Boolean>() {
105+
@Override public Boolean call() throws Exception {
106+
numberOfSubscribeCalls.incrementAndGet();
107+
108+
if (atomicInteger.decrementAndGet() != 0) {
109+
throw new RuntimeException();
110+
}
111+
112+
return true;
113+
}
114+
})
115+
.retry(0, Functions.alwaysTrue())
116+
.test()
117+
.assertFailure(RuntimeException.class);
118+
119+
assertEquals(1, numberOfSubscribeCalls.get());
120+
}
121+
}

0 commit comments

Comments
 (0)