Skip to content

Commit d9fef71

Browse files
committed
isolate subscriber used for retries, cleanup tests
1 parent 3c6fbe0 commit d9fef71

File tree

2 files changed

+146
-88
lines changed

2 files changed

+146
-88
lines changed

rxjava-core/src/main/java/rx/operators/OperatorRetry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void call(final Inner inner) {
8282
final Action1<Inner> _self = this;
8383
attempts.incrementAndGet();
8484

85-
Subscriber<T> subscriber = new Subscriber<T>(child) {
85+
Subscriber<T> subscriber = new Subscriber<T>() {
8686

8787
@Override
8888
public void onCompleted() {

rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

Lines changed: 145 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425
import java.util.concurrent.atomic.AtomicInteger;
2526

2627
import org.junit.Test;
@@ -31,6 +32,7 @@
3132
import rx.Observer;
3233
import rx.Subscriber;
3334
import rx.Subscription;
35+
import rx.functions.Action0;
3436
import rx.functions.Action1;
3537
import rx.subjects.PublishSubject;
3638
import rx.subscriptions.Subscriptions;
@@ -150,93 +152,6 @@ public void call(Integer n) {
150152
assertEquals(1, count.get());
151153
}
152154

153-
public static class SlowFuncAlwaysFails implements Observable.OnSubscribe<String> {
154-
155-
final AtomicInteger nextSeq=new AtomicInteger();
156-
final AtomicInteger activeSubs=new AtomicInteger();
157-
final AtomicInteger concurrentSubs=new AtomicInteger();
158-
159-
public void call(final Subscriber<? super String> s)
160-
{
161-
final int seq=nextSeq.incrementAndGet();
162-
163-
int cur=activeSubs.incrementAndGet();
164-
// Track concurrent subscriptions
165-
concurrentSubs.set(Math.max(cur,concurrentSubs.get()));
166-
167-
// Use async error
168-
new Thread(new Runnable() {
169-
@Override
170-
public void run() {
171-
try {
172-
Thread.sleep(100);
173-
} catch (InterruptedException e) {
174-
// ignore
175-
}
176-
s.onError(new RuntimeException("Subscriber #"+seq+" fails"));
177-
}
178-
}).start();
179-
180-
// Track unsubscribes
181-
s.add(new Subscription()
182-
{
183-
private boolean active=true;
184-
185-
public void unsubscribe()
186-
{
187-
if (active) {
188-
activeSubs.decrementAndGet();
189-
active=false;
190-
}
191-
}
192-
193-
public boolean isUnsubscribed()
194-
{
195-
return !active;
196-
}
197-
});
198-
}
199-
}
200-
201-
@Test
202-
public void testUnsubscribeAfterError() {
203-
204-
final CountDownLatch check=new CountDownLatch(1);
205-
final SlowFuncAlwaysFails sf=new SlowFuncAlwaysFails();
206-
207-
Observable
208-
.create(sf)
209-
.retry(4)
210-
.subscribe(
211-
new Action1<String>()
212-
{
213-
@Override
214-
public void call(String v)
215-
{
216-
fail("Should never happen");
217-
}
218-
},
219-
new Action1<Throwable>()
220-
{
221-
public void call(Throwable throwable)
222-
{
223-
check.countDown();
224-
}
225-
}
226-
);
227-
228-
try
229-
{
230-
check.await(1, TimeUnit.SECONDS);
231-
} catch (InterruptedException e)
232-
{
233-
fail("interrupted");
234-
}
235-
236-
assertEquals("5 Subscribers created", 5, sf.nextSeq.get());
237-
assertEquals("1 Active Subscriber", 1, sf.concurrentSubs.get());
238-
}
239-
240155
@Test
241156
public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed() throws InterruptedException {
242157
final AtomicInteger subsCount = new AtomicInteger(0);
@@ -269,4 +184,147 @@ public boolean isUnsubscribed() {
269184
streamWithRetry.subscribe();
270185
assertEquals(1, subsCount.get());
271186
}
187+
188+
class SlowObservable implements Observable.OnSubscribe<Long> {
189+
190+
private AtomicInteger efforts=new AtomicInteger(0);
191+
private AtomicInteger active=new AtomicInteger(0),maxActive=new AtomicInteger(0);
192+
private AtomicInteger nextBeforeFailure;
193+
194+
private final int emitDelay;
195+
196+
public SlowObservable(int emitDelay,int countNext) {
197+
this.emitDelay=emitDelay;
198+
this.nextBeforeFailure=new AtomicInteger(countNext);
199+
}
200+
201+
public void call(final Subscriber<? super Long> subscriber) {
202+
final AtomicBoolean terminate=new AtomicBoolean(false);
203+
efforts.getAndIncrement();
204+
active.getAndIncrement();
205+
maxActive.set(Math.max(active.get(),maxActive.get()));
206+
final Thread thread = new Thread() {
207+
@Override
208+
public void run() {
209+
long nr = 0;
210+
try {
211+
while (!terminate.get()) {
212+
Thread.sleep(emitDelay);
213+
if (nextBeforeFailure.getAndDecrement()>0) {
214+
subscriber.onNext(nr++);
215+
}
216+
else {
217+
subscriber.onError(new RuntimeException("expected-failed"));
218+
}
219+
}
220+
}
221+
catch(InterruptedException t) {
222+
}
223+
}
224+
};
225+
thread.start();
226+
subscriber.add(Subscriptions.create(new Action0() {
227+
@Override
228+
public void call() {
229+
terminate.set(true);
230+
active.decrementAndGet();
231+
}
232+
}));
233+
}
234+
}
235+
236+
/** Observer for listener on seperate thread */
237+
class AsyncObserver<T> implements Observer<T> {
238+
239+
protected CountDownLatch latch=new CountDownLatch(1);
240+
241+
protected Observer<T> target;
242+
243+
/** Wrap existing Observer */
244+
public AsyncObserver(Observer<T> target) {
245+
this.target=target;
246+
}
247+
248+
/** Wait */
249+
public void await() {
250+
try {
251+
latch.await();
252+
} catch (InterruptedException e) {
253+
fail("Test interrupted");
254+
}
255+
}
256+
257+
// Observer implementation
258+
259+
@Override
260+
public void onCompleted() {
261+
target.onCompleted();
262+
latch.countDown();
263+
}
264+
265+
@Override
266+
public void onError(Throwable t) {
267+
target.onError(t);
268+
latch.countDown();
269+
}
270+
271+
@Override
272+
public void onNext(T v) {
273+
target.onNext(v);
274+
}
275+
}
276+
277+
@Test
278+
public void testUnsubscribeAfterError() {
279+
280+
@SuppressWarnings("unchecked")
281+
Observer<Long> observer=mock(Observer.class);
282+
283+
// Observable that always fails after 100ms
284+
SlowObservable so=new SlowObservable(100,0);
285+
Observable<Long> o=Observable
286+
.create(so)
287+
.retry(5);
288+
289+
AsyncObserver<Long> async=new AsyncObserver<Long>(observer);
290+
291+
o.subscribe(async);
292+
293+
async.await();
294+
295+
InOrder inOrder = inOrder(observer);
296+
// Should fail once
297+
inOrder.verify(observer, times(1)).onError(any(Throwable.class));
298+
inOrder.verify(observer, never()).onCompleted();
299+
300+
assertEquals("Start 6 threads, retry 5 then fail on 6",6,so.efforts.get());
301+
assertEquals("Only 1 active subscription",1,so.maxActive.get());
302+
}
303+
304+
@Test
305+
public void testTimeoutWithRetry() {
306+
307+
@SuppressWarnings("unchecked")
308+
Observer<Long> observer=mock(Observer.class);
309+
310+
// Observable that sends every 100ms (timeout fails instead)
311+
SlowObservable so=new SlowObservable(100,10);
312+
Observable<Long> o=Observable
313+
.create(so)
314+
.timeout(80, TimeUnit.MILLISECONDS)
315+
.retry(5);
316+
317+
AsyncObserver<Long> async=new AsyncObserver<Long>(observer);
318+
319+
o.subscribe(async);
320+
321+
async.await();
322+
323+
InOrder inOrder = inOrder(observer);
324+
// Should fail once
325+
inOrder.verify(observer, times(1)).onError(any(Throwable.class));
326+
inOrder.verify(observer, never()).onCompleted();
327+
328+
assertEquals("Start 6 threads, retry 5 then fail on 6",6,so.efforts.get());
329+
}
272330
}

0 commit comments

Comments
 (0)