Skip to content

Commit 0244223

Browse files
committed
RefCount: disconnect all if upstream terminates
1 parent 6a55738 commit 0244223

File tree

2 files changed

+124
-42
lines changed

2 files changed

+124
-42
lines changed

src/main/java/rx/internal/operators/OnSubscribeRefCount.java

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,8 @@ public void call(final Subscriber<? super T> subscriber) {
8080
}
8181
} else {
8282
try {
83-
// handle unsubscribing from the base subscription
84-
subscriber.add(disconnect());
85-
8683
// ready to subscribe to source so do it
87-
source.unsafeSubscribe(subscriber);
84+
doSubscribe(subscriber, baseSubscription);
8885
} finally {
8986
// release the read lock
9087
lock.unlock();
@@ -101,12 +98,8 @@ public void call(Subscription subscription) {
10198

10299
try {
103100
baseSubscription.add(subscription);
104-
105-
// handle unsubscribing from the base subscription
106-
subscriber.add(disconnect());
107-
108101
// ready to subscribe to source so do it
109-
source.unsafeSubscribe(subscriber);
102+
doSubscribe(subscriber, baseSubscription);
110103
} finally {
111104
// release the write lock
112105
lock.unlock();
@@ -115,18 +108,54 @@ public void call(Subscription subscription) {
115108
}
116109
};
117110
}
111+
112+
void doSubscribe(final Subscriber<? super T> subscriber, final CompositeSubscription currentBase) {
113+
// handle unsubscribing from the base subscription
114+
subscriber.add(disconnect(currentBase));
115+
116+
source.unsafeSubscribe(new Subscriber<T>(subscriber) {
117+
@Override
118+
public void onError(Throwable e) {
119+
cleanup();
120+
subscriber.onError(e);
121+
}
122+
@Override
123+
public void onNext(T t) {
124+
subscriber.onNext(t);
125+
}
126+
@Override
127+
public void onCompleted() {
128+
cleanup();
129+
subscriber.onCompleted();
130+
}
131+
void cleanup() {
132+
lock.lock();
133+
try {
134+
if (baseSubscription == currentBase) {
135+
baseSubscription.unsubscribe();
136+
baseSubscription = new CompositeSubscription();
137+
subscriptionCount.set(0);
138+
}
139+
} finally {
140+
lock.unlock();
141+
}
142+
}
143+
});
144+
}
118145

119-
private Subscription disconnect() {
146+
private Subscription disconnect(final CompositeSubscription current) {
120147
return Subscriptions.create(new Action0() {
121148
@Override
122149
public void call() {
123150
lock.lock();
124151
try {
125-
if (subscriptionCount.decrementAndGet() == 0) {
126-
baseSubscription.unsubscribe();
127-
// need a new baseSubscription because once
128-
// unsubscribed stays that way
129-
baseSubscription = new CompositeSubscription();
152+
if (baseSubscription == current) {
153+
if (subscriptionCount.decrementAndGet() == 0) {
154+
baseSubscription.unsubscribe();
155+
// need a new baseSubscription because once
156+
// unsubscribed stays that way
157+
baseSubscription = new CompositeSubscription();
158+
}
130159
}
131160
} finally {
132161
lock.unlock();

src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java

Lines changed: 80 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,24 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertTrue;
20-
import static org.junit.Assert.fail;
18+
import static org.junit.Assert.*;
2119
import static org.mockito.Matchers.any;
22-
import static org.mockito.Mockito.inOrder;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.never;
25-
import static org.mockito.Mockito.verify;
26-
27-
import java.util.ArrayList;
28-
import java.util.Arrays;
29-
import java.util.List;
30-
import java.util.concurrent.CountDownLatch;
31-
import java.util.concurrent.TimeUnit;
20+
import static org.mockito.Mockito.*;
21+
22+
import java.util.*;
23+
import java.util.concurrent.*;
3224
import java.util.concurrent.atomic.AtomicInteger;
3325

34-
import org.junit.Before;
35-
import org.junit.Test;
36-
import org.mockito.InOrder;
37-
import org.mockito.MockitoAnnotations;
26+
import org.junit.*;
27+
import org.mockito.*;
3828

39-
import rx.Observable;
29+
import rx.*;
4030
import rx.Observable.OnSubscribe;
31+
import rx.Observable;
4132
import rx.Observer;
42-
import rx.Subscriber;
43-
import rx.Subscription;
44-
import rx.functions.Action0;
45-
import rx.functions.Action1;
46-
import rx.functions.Func2;
47-
import rx.observers.Subscribers;
48-
import rx.observers.TestSubscriber;
49-
import rx.schedulers.Schedulers;
50-
import rx.schedulers.TestScheduler;
33+
import rx.functions.*;
34+
import rx.observers.*;
35+
import rx.schedulers.*;
5136
import rx.subjects.ReplaySubject;
5237
import rx.subscriptions.Subscriptions;
5338

@@ -532,4 +517,72 @@ public Integer call(Integer t1, Integer t2) {
532517
ts2.assertReceivedOnNext(Arrays.asList(30));
533518
}
534519

520+
@Test(timeout = 10000)
521+
public void testUpstreamErrorAllowsRetry() throws InterruptedException {
522+
final AtomicInteger intervalSubscribed = new AtomicInteger();
523+
Observable<String> interval =
524+
Observable.interval(200,TimeUnit.MILLISECONDS)
525+
.doOnSubscribe(
526+
new Action0() {
527+
@Override
528+
public void call() {
529+
System.out.println("Subscribing to interval " + intervalSubscribed.incrementAndGet());
530+
}
531+
}
532+
)
533+
.flatMap(new Func1<Long, Observable<String>>() {
534+
@Override
535+
public Observable<String> call(Long t1) {
536+
return Observable.defer(new Func0<Observable<String>>() {
537+
@Override
538+
public Observable<String> call() {
539+
return Observable.<String>error(new Exception("Some exception"));
540+
}
541+
});
542+
}
543+
})
544+
.onErrorResumeNext(new Func1<Throwable, Observable<String>>() {
545+
@Override
546+
public Observable<String> call(Throwable t1) {
547+
return Observable.error(t1);
548+
}
549+
})
550+
.publish()
551+
.refCount();
552+
553+
interval
554+
.doOnError(new Action1<Throwable>() {
555+
@Override
556+
public void call(Throwable t1) {
557+
System.out.println("Subscriber 1 onError: " + t1);
558+
}
559+
})
560+
.retry(5)
561+
.subscribe(new Action1<String>() {
562+
@Override
563+
public void call(String t1) {
564+
System.out.println("Subscriber 1: " + t1);
565+
}
566+
});
567+
Thread.sleep(100);
568+
interval
569+
.doOnError(new Action1<Throwable>() {
570+
@Override
571+
public void call(Throwable t1) {
572+
System.out.println("Subscriber 2 onError: " + t1);
573+
}
574+
})
575+
.retry(5)
576+
.subscribe(new Action1<String>() {
577+
@Override
578+
public void call(String t1) {
579+
System.out.println("Subscriber 2: " + t1);
580+
}
581+
});
582+
583+
Thread.sleep(1300);
584+
585+
System.out.println(intervalSubscribed.get());
586+
assertEquals(6, intervalSubscribed.get());
587+
}
535588
}

0 commit comments

Comments
 (0)