Skip to content

Commit d08df0b

Browse files
OperationRetry -> OperatorRetry
Updated to use "lift" and Subscriber. Fixes #943 Observable.retry() does not unsubscribe from source
1 parent a20263e commit d08df0b

File tree

4 files changed

+135
-111
lines changed

4 files changed

+135
-111
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5985,7 +5985,7 @@ public final ConnectableObservable<T> replay(Scheduler scheduler) {
59855985
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-retry">RxJava Wiki: retry()</a>
59865986
*/
59875987
public final Observable<T> retry() {
5988-
return create(OperationRetry.retry(this));
5988+
return nest().lift(new OperatorRetry<T>());
59895989
}
59905990

59915991
/**
@@ -6009,7 +6009,7 @@ public final Observable<T> retry() {
60096009
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-retry">RxJava Wiki: retry()</a>
60106010
*/
60116011
public final Observable<T> retry(int retryCount) {
6012-
return create(OperationRetry.retry(this, retryCount));
6012+
return nest().lift(new OperatorRetry<T>(retryCount));
60136013
}
60146014

60156015
/**

rxjava-core/src/main/java/rx/operators/OperationRetry.java

Lines changed: 0 additions & 104 deletions
This file was deleted.
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
/**
19+
* Copyright 2014 Netflix, Inc.
20+
*
21+
* Licensed under the Apache License, Version 2.0 (the "License");
22+
* you may not use this file except in compliance with the License.
23+
* You may obtain a copy of the License at
24+
*
25+
* http://www.apache.org/licenses/LICENSE-2.0
26+
*
27+
* Unless required by applicable law or agreed to in writing, software
28+
* distributed under the License is distributed on an "AS IS" BASIS,
29+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30+
* See the License for the specific language governing permissions and
31+
* limitations under the License.
32+
*/
33+
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
import rx.Observable;
37+
import rx.Observable.Operator;
38+
import rx.Scheduler.Inner;
39+
import rx.Subscriber;
40+
import rx.functions.Action1;
41+
import rx.schedulers.Schedulers;
42+
43+
public class OperatorRetry<T> implements Operator<T, Observable<T>> {
44+
45+
private static final int INFINITE_RETRY = -1;
46+
47+
private final int retryCount;
48+
private final AtomicInteger attempts = new AtomicInteger(0);
49+
50+
public OperatorRetry(int retryCount) {
51+
this.retryCount = retryCount;
52+
}
53+
54+
public OperatorRetry() {
55+
this(INFINITE_RETRY);
56+
}
57+
58+
@Override
59+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> s) {
60+
return new Subscriber<Observable<T>>(s) {
61+
62+
@Override
63+
public void onCompleted() {
64+
// ignore as we expect a single nested Observable<T>
65+
}
66+
67+
@Override
68+
public void onError(Throwable e) {
69+
s.onError(e);
70+
}
71+
72+
@Override
73+
public void onNext(final Observable<T> o) {
74+
Schedulers.trampoline().schedule(new Action1<Inner>() {
75+
76+
@Override
77+
public void call(final Inner inner) {
78+
final Action1<Inner> _self = this;
79+
attempts.incrementAndGet();
80+
o.subscribe(new Subscriber<T>(s) {
81+
82+
@Override
83+
public void onCompleted() {
84+
s.onCompleted();
85+
}
86+
87+
@Override
88+
public void onError(Throwable e) {
89+
if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !inner.isUnsubscribed()) {
90+
// retry again
91+
inner.schedule(_self);
92+
} else {
93+
// give up and pass the failure
94+
s.onError(e);
95+
}
96+
}
97+
98+
@Override
99+
public void onNext(T v) {
100+
s.onNext(v);
101+
}
102+
103+
});
104+
}
105+
});
106+
}
107+
108+
};
109+
}
110+
}

rxjava-core/src/test/java/rx/operators/OperationRetryTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.*;
1819
import static org.mockito.Matchers.*;
1920
import static org.mockito.Mockito.*;
20-
import static rx.operators.OperationRetry.*;
21+
import static rx.operators.OperatorRetry.*;
2122

2223
import java.util.concurrent.atomic.AtomicInteger;
2324

@@ -27,9 +28,11 @@
2728
import rx.Observable;
2829
import rx.Observer;
2930
import rx.Subscription;
31+
import rx.functions.Action1;
32+
import rx.subjects.PublishSubject;
3033
import rx.subscriptions.Subscriptions;
3134

32-
public class OperationRetryTest {
35+
public class OperatorRetryTest {
3336

3437
@Test
3538
public void testOriginFails() {
@@ -52,7 +55,7 @@ public void testRetryFail() {
5255
@SuppressWarnings("unchecked")
5356
Observer<String> observer = mock(Observer.class);
5457
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
55-
Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer);
58+
origin.nest().lift(new OperatorRetry<String>(NUM_RETRIES)).subscribe(observer);
5659

5760
InOrder inOrder = inOrder(observer);
5861
// should show 2 attempts (first time fail, second time (1st retry) fail)
@@ -72,7 +75,7 @@ public void testRetrySuccess() {
7275
@SuppressWarnings("unchecked")
7376
Observer<String> observer = mock(Observer.class);
7477
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
75-
Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer);
78+
origin.nest().lift(new OperatorRetry<String>(NUM_RETRIES)).subscribe(observer);
7679

7780
InOrder inOrder = inOrder(observer);
7881
// should show 3 attempts
@@ -92,7 +95,7 @@ public void testInfiniteRetry() {
9295
@SuppressWarnings("unchecked")
9396
Observer<String> observer = mock(Observer.class);
9497
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
95-
Observable.create(retry(origin)).subscribe(observer);
98+
origin.nest().lift(new OperatorRetry<String>()).subscribe(observer);
9699

97100
InOrder inOrder = inOrder(observer);
98101
// should show 3 attempts
@@ -127,4 +130,19 @@ public Subscription onSubscribe(Observer<? super String> o) {
127130
return Subscriptions.empty();
128131
}
129132
}
133+
134+
@Test
135+
public void testUnsubscribeFromRetry() {
136+
PublishSubject<Integer> subject = PublishSubject.create();
137+
final AtomicInteger count = new AtomicInteger(0);
138+
Subscription sub = subject.retry().subscribe(new Action1<Integer>() {
139+
@Override
140+
public void call(Integer n) {
141+
count.incrementAndGet();
142+
}});
143+
subject.onNext(1);
144+
sub.unsubscribe();
145+
subject.onNext(2);
146+
assertEquals(1,count.get());
147+
}
130148
}

0 commit comments

Comments
 (0)