Skip to content

Commit 1e503b4

Browse files
Unit Test and Review of #1027
1 parent 3cf2758 commit 1e503b4

File tree

2 files changed

+74
-41
lines changed

2 files changed

+74
-41
lines changed

rxjava-core/src/main/java/rx/operators/OperatorRetry.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@
3737
import rx.Observable.Operator;
3838
import rx.Scheduler.Inner;
3939
import rx.Subscriber;
40+
import rx.functions.Action0;
4041
import rx.functions.Action1;
4142
import rx.schedulers.Schedulers;
4243
import rx.subscriptions.SerialSubscription;
44+
import rx.subscriptions.Subscriptions;
4345

4446
public class OperatorRetry<T> implements Operator<T, Observable<T>> {
4547

@@ -82,6 +84,8 @@ public void call(final Inner inner) {
8284
final Action1<Inner> _self = this;
8385
attempts.incrementAndGet();
8486

87+
// new subscription each time so if it unsubscribes itself it does not prevent retries
88+
// by unsubscribing the child subscription
8589
Subscriber<T> subscriber = new Subscriber<T>() {
8690

8791
@Override

rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

Lines changed: 70 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
20+
import static org.mockito.Matchers.any;
21+
import static org.mockito.Mockito.inOrder;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.never;
24+
import static org.mockito.Mockito.times;
2125

2226
import java.util.concurrent.CountDownLatch;
2327
import java.util.concurrent.TimeUnit;
@@ -28,12 +32,13 @@
2832
import org.mockito.InOrder;
2933

3034
import rx.Observable;
31-
import rx.Observable.OnSubscribeFunc;
35+
import rx.Observable.OnSubscribe;
3236
import rx.Observer;
3337
import rx.Subscriber;
3438
import rx.Subscription;
3539
import rx.functions.Action0;
3640
import rx.functions.Action1;
41+
import rx.observers.TestSubscriber;
3742
import rx.subjects.PublishSubject;
3843
import rx.subscriptions.Subscriptions;
3944

@@ -114,7 +119,7 @@ public void testInfiniteRetry() {
114119
inOrder.verifyNoMoreInteractions();
115120
}
116121

117-
public static class FuncWithErrors implements Observable.OnSubscribeFunc<String> {
122+
public static class FuncWithErrors implements Observable.OnSubscribe<String> {
118123

119124
private final int numFailures;
120125
private final AtomicInteger count = new AtomicInteger(0);
@@ -124,15 +129,14 @@ public static class FuncWithErrors implements Observable.OnSubscribeFunc<String>
124129
}
125130

126131
@Override
127-
public Subscription onSubscribe(Observer<? super String> o) {
132+
public void call(Subscriber<? super String> o) {
128133
o.onNext("beginningEveryTime");
129134
if (count.incrementAndGet() <= numFailures) {
130135
o.onError(new RuntimeException("forced failure: " + count.get()));
131136
} else {
132137
o.onNext("onSuccessOnly");
133138
o.onCompleted();
134139
}
135-
return Subscriptions.empty();
136140
}
137141
}
138142

@@ -153,13 +157,13 @@ public void call(Integer n) {
153157
}
154158

155159
@Test
156-
public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed() throws InterruptedException {
160+
public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubscribed() throws InterruptedException {
157161
final AtomicInteger subsCount = new AtomicInteger(0);
158-
OnSubscribeFunc<String> onSubscribe = new OnSubscribeFunc<String>() {
162+
OnSubscribe<String> onSubscribe = new OnSubscribe<String>() {
159163
@Override
160-
public Subscription onSubscribe(Observer<? super String> observer) {
164+
public void call(Subscriber<? super String> s) {
161165
subsCount.incrementAndGet();
162-
return new Subscription() {
166+
s.add(new Subscription() {
163167
boolean unsubscribed = false;
164168

165169
@Override
@@ -172,7 +176,7 @@ public void unsubscribe() {
172176
public boolean isUnsubscribed() {
173177
return unsubscribed;
174178
}
175-
};
179+
});
176180
}
177181
};
178182
Observable<String> stream = Observable.create(onSubscribe);
@@ -185,40 +189,65 @@ public boolean isUnsubscribed() {
185189
assertEquals(1, subsCount.get());
186190
}
187191

192+
@Test
193+
public void testSourceObservableCallsUnsubscribe() throws InterruptedException {
194+
final AtomicInteger subsCount = new AtomicInteger(0);
195+
196+
final TestSubscriber<String> ts = new TestSubscriber<String>();
197+
198+
OnSubscribe<String> onSubscribe = new OnSubscribe<String>() {
199+
@Override
200+
public void call(Subscriber<? super String> s) {
201+
// if isUnsubscribed is true that means we have a bug such as https://github.com/Netflix/RxJava/issues/1024
202+
if (!s.isUnsubscribed()) {
203+
subsCount.incrementAndGet();
204+
s.onError(new RuntimeException("failed"));
205+
// it unsubscribes the child directly
206+
// this simulates various error/completion scenarios that could occur
207+
// or just a source that proactively triggers cleanup
208+
s.unsubscribe();
209+
}
210+
}
211+
};
212+
213+
Observable.create(onSubscribe).retry(3).subscribe(ts);
214+
assertEquals(4, subsCount.get()); // 1 + 3 retries
215+
}
216+
188217
class SlowObservable implements Observable.OnSubscribe<Long> {
189218

190-
private AtomicInteger efforts=new AtomicInteger(0);
191-
private AtomicInteger active=new AtomicInteger(0),maxActive=new AtomicInteger(0);
219+
private AtomicInteger efforts = new AtomicInteger(0);
220+
private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
192221
private AtomicInteger nextBeforeFailure;
193222

194223
private final int emitDelay;
195224

196-
public SlowObservable(int emitDelay,int countNext) {
197-
this.emitDelay=emitDelay;
198-
this.nextBeforeFailure=new AtomicInteger(countNext);
225+
public SlowObservable(int emitDelay, int countNext) {
226+
this.emitDelay = emitDelay;
227+
this.nextBeforeFailure = new AtomicInteger(countNext);
199228
}
200229

201230
public void call(final Subscriber<? super Long> subscriber) {
202-
final AtomicBoolean terminate=new AtomicBoolean(false);
231+
final AtomicBoolean terminate = new AtomicBoolean(false);
203232
efforts.getAndIncrement();
204233
active.getAndIncrement();
205-
maxActive.set(Math.max(active.get(),maxActive.get()));
234+
maxActive.set(Math.max(active.get(), maxActive.get()));
206235
final Thread thread = new Thread() {
207236
@Override
208237
public void run() {
209238
long nr = 0;
210239
try {
211240
while (!terminate.get()) {
212241
Thread.sleep(emitDelay);
213-
if (nextBeforeFailure.getAndDecrement()>0) {
242+
if (nextBeforeFailure.getAndDecrement() > 0) {
214243
subscriber.onNext(nr++);
215244
}
216245
else {
217246
subscriber.onError(new RuntimeException("expected-failed"));
218247
}
219248
}
220249
}
221-
catch(InterruptedException t) {
250+
catch (InterruptedException t) {
222251
}
223252
}
224253
};
@@ -236,13 +265,13 @@ public void call() {
236265
/** Observer for listener on seperate thread */
237266
class AsyncObserver<T> implements Observer<T> {
238267

239-
protected CountDownLatch latch=new CountDownLatch(1);
268+
protected CountDownLatch latch = new CountDownLatch(1);
240269

241270
protected Observer<T> target;
242271

243272
/** Wrap existing Observer */
244273
public AsyncObserver(Observer<T> target) {
245-
this.target=target;
274+
this.target = target;
246275
}
247276

248277
/** Wait */
@@ -274,19 +303,19 @@ public void onNext(T v) {
274303
}
275304
}
276305

277-
@Test
306+
@Test(timeout = 1000)
278307
public void testUnsubscribeAfterError() {
279308

280309
@SuppressWarnings("unchecked")
281-
Observer<Long> observer=mock(Observer.class);
310+
Observer<Long> observer = mock(Observer.class);
282311

283312
// Observable that always fails after 100ms
284-
SlowObservable so=new SlowObservable(100,0);
285-
Observable<Long> o=Observable
286-
.create(so)
287-
.retry(5);
313+
SlowObservable so = new SlowObservable(100, 0);
314+
Observable<Long> o = Observable
315+
.create(so)
316+
.retry(5);
288317

289-
AsyncObserver<Long> async=new AsyncObserver<Long>(observer);
318+
AsyncObserver<Long> async = new AsyncObserver<Long>(observer);
290319

291320
o.subscribe(async);
292321

@@ -297,24 +326,24 @@ public void testUnsubscribeAfterError() {
297326
inOrder.verify(observer, times(1)).onError(any(Throwable.class));
298327
inOrder.verify(observer, never()).onCompleted();
299328

300-
assertEquals("Start 6 threads, retry 5 then fail on 6",6,so.efforts.get());
301-
assertEquals("Only 1 active subscription",1,so.maxActive.get());
329+
assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
330+
assertEquals("Only 1 active subscription", 1, so.maxActive.get());
302331
}
303332

304-
@Test
333+
@Test(timeout = 1000)
305334
public void testTimeoutWithRetry() {
306335

307336
@SuppressWarnings("unchecked")
308-
Observer<Long> observer=mock(Observer.class);
337+
Observer<Long> observer = mock(Observer.class);
309338

310339
// Observable that sends every 100ms (timeout fails instead)
311-
SlowObservable so=new SlowObservable(100,10);
312-
Observable<Long> o=Observable
313-
.create(so)
314-
.timeout(80, TimeUnit.MILLISECONDS)
315-
.retry(5);
340+
SlowObservable so = new SlowObservable(100, 10);
341+
Observable<Long> o = Observable
342+
.create(so)
343+
.timeout(80, TimeUnit.MILLISECONDS)
344+
.retry(5);
316345

317-
AsyncObserver<Long> async=new AsyncObserver<Long>(observer);
346+
AsyncObserver<Long> async = new AsyncObserver<Long>(observer);
318347

319348
o.subscribe(async);
320349

@@ -325,6 +354,6 @@ public void testTimeoutWithRetry() {
325354
inOrder.verify(observer, times(1)).onError(any(Throwable.class));
326355
inOrder.verify(observer, never()).onCompleted();
327356

328-
assertEquals("Start 6 threads, retry 5 then fail on 6",6,so.efforts.get());
357+
assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
329358
}
330359
}

0 commit comments

Comments
 (0)