Skip to content

Commit 570a8f9

Browse files
Merge pull request #1175 from akarnokd/ConnectableObservableSynchronousFix
Fixed synchronous ConnectableObservable.connect problem
2 parents cdad283 + 3e58d90 commit 570a8f9

File tree

4 files changed

+95
-24
lines changed

4 files changed

+95
-24
lines changed

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Subscriber;
2020
import rx.Subscription;
21+
import rx.functions.Action1;
2122
import rx.operators.OperatorRefCount;
2223

2324
/**
@@ -44,10 +45,28 @@ protected ConnectableObservable(OnSubscribe<T> onSubscribe) {
4445
/**
4546
* Call a ConnectableObservable's connect() method to instruct it to begin emitting the
4647
* items from its underlying {@link Observable} to its {@link Subscriber}s.
48+
* <p>To disconnect from a synchronous source, use the {@link #connect(rx.functions.Action1)}
49+
* method.
4750
* @return the subscription representing the connection
4851
*/
49-
public abstract Subscription connect();
50-
52+
public final Subscription connect() {
53+
final Subscription[] out = new Subscription[1];
54+
connect(new Action1<Subscription>() {
55+
@Override
56+
public void call(Subscription t1) {
57+
out[0] = t1;
58+
}
59+
});
60+
return out[0];
61+
}
62+
/**
63+
* Call a ConnectableObservable's connect() method to instruct it to begin emitting the
64+
* items from its underlying {@link Observable} to its {@link Subscriber}s.
65+
* @param connection the action that receives the connection subscription
66+
* before the subscription to source happens allowing the caller
67+
* to synchronously disconnect a synchronous source.
68+
*/
69+
public abstract void connect(Action1<? super Subscription> connection);
5170
/**
5271
* Returns an observable sequence that stays connected to the source as long
5372
* as there is at least one subscription to the observable sequence.

rxjava-core/src/main/java/rx/operators/OperatorMulticast.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Subscriber;
2020
import rx.Subscription;
2121
import rx.functions.Action0;
22+
import rx.functions.Action1;
2223
import rx.observables.ConnectableObservable;
2324
import rx.subjects.Subject;
2425
import rx.subscriptions.Subscriptions;
@@ -48,7 +49,20 @@ public void call(Subscriber<? super R> subscriber) {
4849
}
4950

5051
@Override
51-
public Subscription connect() {
52+
public void connect(Action1<? super Subscription> connection) {
53+
connection.call(Subscriptions.create(new Action0() {
54+
@Override
55+
public void call() {
56+
Subscription s;
57+
synchronized (guard) {
58+
s = subscription;
59+
subscription = null;
60+
}
61+
if (s != null) {
62+
s.unsubscribe();
63+
}
64+
}
65+
}));
5266
Subscriber<T> s = null;
5367
synchronized (guard) {
5468
if (subscription == null) {
@@ -74,20 +88,5 @@ public void onNext(T args) {
7488
if (s != null) {
7589
source.unsafeSubscribe(s);
7690
}
77-
78-
return Subscriptions.create(new Action0() {
79-
@Override
80-
public void call() {
81-
Subscription s;
82-
synchronized (guard) {
83-
s = subscription;
84-
subscription = null;
85-
}
86-
if (s != null) {
87-
s.unsubscribe();
88-
}
89-
}
90-
});
9191
}
92-
9392
}

rxjava-core/src/main/java/rx/operators/OperatorMulticastSelector.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
import rx.Observable;
1919
import rx.Observable.OnSubscribe;
2020
import rx.Subscriber;
21+
import rx.Subscription;
22+
import rx.functions.Action1;
2123
import rx.functions.Func0;
2224
import rx.functions.Func1;
2325
import rx.observables.ConnectableObservable;
2426
import rx.observers.SafeSubscriber;
2527
import rx.subjects.Subject;
26-
import rx.subscriptions.CompositeSubscription;
2728

2829
/**
2930
* Returns an observable sequence that contains the elements of a sequence
@@ -63,14 +64,16 @@ public void call(Subscriber<? super TResult> child) {
6364
return;
6465
}
6566

66-
CompositeSubscription csub = new CompositeSubscription();
67-
child.add(csub);
68-
69-
SafeSubscriber<TResult> s = new SafeSubscriber<TResult>(child);
67+
final SafeSubscriber<TResult> s = new SafeSubscriber<TResult>(child);
7068

7169
observable.unsafeSubscribe(s);
7270

73-
csub.add(connectable.connect());
71+
connectable.connect(new Action1<Subscription>() {
72+
@Override
73+
public void call(Subscription t1) {
74+
s.add(t1);
75+
}
76+
});
7477
}
7578

7679
}

rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424

2525
import java.util.Arrays;
2626
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicInteger;
2728

2829
import org.junit.Assert;
30+
import static org.junit.Assert.assertEquals;
2931
import org.junit.Test;
3032
import org.mockito.InOrder;
3133

3234
import rx.Observable;
3335
import rx.Observer;
36+
import rx.functions.Action0;
37+
import rx.functions.Action1;
3438
import rx.functions.Func1;
3539
import rx.observables.ConnectableObservable;
3640
import rx.operators.OperatorReplay.VirtualBoundedList;
@@ -470,4 +474,50 @@ public void testWindowedReplayError() {
470474
verify(observer1, never()).onCompleted();
471475
}
472476
}
477+
@Test
478+
public void testSynchronousDisconnect() {
479+
final AtomicInteger effectCounter = new AtomicInteger();
480+
Observable<Integer> source = Observable.from(1, 2, 3, 4)
481+
.doOnNext(new Action1<Integer>() {
482+
@Override
483+
public void call(Integer v) {
484+
effectCounter.incrementAndGet();
485+
System.out.println("Sideeffect #" + v);
486+
}
487+
});
488+
489+
Observable<Integer> result = source.replay(
490+
new Func1<Observable<Integer>, Observable<Integer>>() {
491+
@Override
492+
public Observable<Integer> call(Observable<Integer> o) {
493+
return o.take(2);
494+
}
495+
});
496+
497+
for (int i = 1; i < 3; i++) {
498+
effectCounter.set(0);
499+
System.out.printf("- %d -%n", i);
500+
result.subscribe(new Action1<Integer>() {
501+
502+
@Override
503+
public void call(Integer t1) {
504+
System.out.println(t1);
505+
}
506+
507+
}, new Action1<Throwable>() {
508+
509+
@Override
510+
public void call(Throwable t1) {
511+
t1.printStackTrace();
512+
}
513+
},
514+
new Action0() {
515+
@Override
516+
public void call() {
517+
System.out.println("Done");
518+
}
519+
});
520+
assertEquals(2, effectCounter.get());
521+
}
522+
}
473523
}

0 commit comments

Comments
 (0)