Skip to content

Commit 41ebe38

Browse files
author
ylecaillez
committed
Fix OperationSwitch so that it does not onComplete() before inner and
outer subscription completes.
1 parent 2c4ab1a commit 41ebe38

File tree

1 file changed

+164
-40
lines changed

1 file changed

+164
-40
lines changed

rxjava-core/src/main/java/rx/operators/OperationSwitch.java

Lines changed: 164 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
19-
import static org.mockito.Mockito.*;
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Matchers.anyString;
20+
import static org.mockito.Mockito.inOrder;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.never;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
2025

2126
import java.util.concurrent.TimeUnit;
22-
import java.util.concurrent.atomic.AtomicReference;
2327

2428
import org.junit.Before;
2529
import org.junit.Test;
@@ -30,24 +34,29 @@
3034
import rx.Observer;
3135
import rx.Subscription;
3236
import rx.concurrency.TestScheduler;
37+
import rx.subscriptions.MultipleAssignmentSubscription;
3338
import rx.subscriptions.Subscriptions;
3439
import rx.util.functions.Action0;
3540
import rx.util.functions.Func1;
3641

3742
/**
38-
* Transforms an Observable that emits Observables into a single Observable that emits the items
39-
* emitted by the most recently published of those Observables.
43+
* Transforms an Observable that emits Observables into a single Observable that
44+
* emits the items emitted by the most recently published of those Observables.
4045
* <p>
41-
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
46+
* <img width="640" src=
47+
* "https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
4248
*/
4349
public final class OperationSwitch {
4450

4551
/**
46-
* This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single {@link Observable} sequence which produces values from the most recently published
47-
* {@link Observable}.
52+
* This function transforms an {@link Observable} sequence of
53+
* {@link Observable} sequences into a single {@link Observable} sequence
54+
* which produces values from the most recently published {@link Observable}
55+
* .
4856
*
4957
* @param sequences
50-
* The {@link Observable} sequence consisting of {@link Observable} sequences.
58+
* The {@link Observable} sequence consisting of
59+
* {@link Observable} sequences.
5160
* @return A {@link Func1} which does this transformation.
5261
*/
5362
public static <T> OnSubscribeFunc<T> switchDo(final Observable<? extends Observable<? extends T>> sequences) {
@@ -77,61 +86,99 @@ public Subscription onSubscribe(Observer<? super T> observer) {
7786

7887
private static class SwitchObserver<T> implements Observer<Observable<? extends T>> {
7988

80-
private final Observer<? super T> observer;
81-
private final SafeObservableSubscription parent;
82-
private final AtomicReference<Subscription> subsequence = new AtomicReference<Subscription>();
89+
private final Object gate;
90+
private final Observer<? super T> observer;
91+
private final SafeObservableSubscription parent;
92+
private final MultipleAssignmentSubscription innerSubscription;
93+
private long latest;
94+
private boolean stopped;
95+
private boolean hasLatest;
8396

8497
public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent) {
8598
this.observer = observer;
8699
this.parent = parent;
87-
}
88-
89-
@Override
90-
public void onCompleted() {
91-
unsubscribeFromSubSequence();
92-
observer.onCompleted();
93-
}
94-
95-
@Override
96-
public void onError(Throwable e) {
97-
unsubscribeFromSubSequence();
98-
observer.onError(e);
100+
this.gate = new Object();
101+
this.innerSubscription = new MultipleAssignmentSubscription();
99102
}
100103

101104
@Override
102105
public void onNext(Observable<? extends T> args) {
103-
unsubscribeFromSubSequence();
106+
final long id;
107+
synchronized (gate) {
108+
id = ++latest;
109+
hasLatest = true;
110+
}
104111

105-
subsequence.set(args.subscribe(new Observer<T>() {
112+
final SafeObservableSubscription sub;
113+
sub = new SafeObservableSubscription();
114+
sub.wrap(args.subscribe(new Observer<T>() {
106115
@Override
107-
public void onCompleted() {
108-
// Do nothing.
116+
public void onNext(T args) {
117+
synchronized (gate) {
118+
if (latest == id) {
119+
observer.onNext(args);
120+
}
121+
}
109122
}
110123

111124
@Override
112125
public void onError(Throwable e) {
113-
parent.unsubscribe();
114-
observer.onError(e);
126+
synchronized (gate) {
127+
sub.unsubscribe();
128+
if (latest == id) {
129+
observer.onError(e);
130+
parent.unsubscribe();
131+
}
132+
}
115133
}
116134

117135
@Override
118-
public void onNext(T args) {
119-
observer.onNext(args);
136+
public void onCompleted() {
137+
synchronized (gate) {
138+
sub.unsubscribe();
139+
if (latest == id) {
140+
hasLatest = false;
141+
}
142+
143+
if (stopped) {
144+
observer.onCompleted();
145+
parent.unsubscribe();
146+
}
147+
148+
}
120149
}
150+
121151
}));
152+
153+
innerSubscription.setSubscription(sub);
154+
}
155+
156+
@Override
157+
public void onError(Throwable e) {
158+
synchronized (gate) {
159+
observer.onError(e);
160+
}
161+
162+
parent.unsubscribe();
122163
}
123164

124-
private void unsubscribeFromSubSequence() {
125-
Subscription previousSubscription = subsequence.get();
126-
if (previousSubscription != null) {
127-
previousSubscription.unsubscribe();
165+
@Override
166+
public void onCompleted() {
167+
synchronized (gate) {
168+
innerSubscription.unsubscribe();
169+
stopped = true;
170+
if (!hasLatest) {
171+
observer.onCompleted();
172+
parent.unsubscribe();
173+
}
128174
}
129175
}
176+
130177
}
131178

132179
public static class UnitTest {
133180

134-
private TestScheduler scheduler;
181+
private TestScheduler scheduler;
135182
private Observer<String> observer;
136183

137184
@Before
@@ -141,6 +188,83 @@ public void before() {
141188
observer = mock(Observer.class);
142189
}
143190

191+
@Test
192+
public void testSwitchWhenOuterCompleteBeforeInner() {
193+
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
194+
@Override
195+
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
196+
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
197+
@Override
198+
public Subscription onSubscribe(Observer<? super String> observer) {
199+
publishNext(observer, 70, "one");
200+
publishNext(observer, 100, "two");
201+
publishCompleted(observer, 200);
202+
return Subscriptions.empty();
203+
}
204+
}));
205+
publishCompleted(observer, 60);
206+
207+
return Subscriptions.empty();
208+
}
209+
});
210+
211+
Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
212+
sampled.subscribe(observer);
213+
214+
InOrder inOrder = inOrder(observer);
215+
216+
scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
217+
inOrder.verify(observer, times(2)).onNext(anyString());
218+
inOrder.verify(observer, times(1)).onCompleted();
219+
}
220+
221+
@Test
222+
public void testSwitchWhenInnerCompleteBeforeOuter() {
223+
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
224+
@Override
225+
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
226+
publishNext(observer, 10, Observable.create(new OnSubscribeFunc<String>() {
227+
@Override
228+
public Subscription onSubscribe(Observer<? super String> observer) {
229+
publishNext(observer, 0, "one");
230+
publishNext(observer, 10, "two");
231+
publishCompleted(observer, 20);
232+
return Subscriptions.empty();
233+
}
234+
}));
235+
236+
publishNext(observer, 100, Observable.create(new OnSubscribeFunc<String>() {
237+
@Override
238+
public Subscription onSubscribe(Observer<? super String> observer) {
239+
publishNext(observer, 0, "three");
240+
publishNext(observer, 10, "four");
241+
publishCompleted(observer, 20);
242+
return Subscriptions.empty();
243+
}
244+
}));
245+
publishCompleted(observer, 200);
246+
247+
return Subscriptions.empty();
248+
}
249+
});
250+
251+
Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
252+
sampled.subscribe(observer);
253+
254+
InOrder inOrder = inOrder(observer);
255+
256+
scheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
257+
inOrder.verify(observer, never()).onCompleted();
258+
inOrder.verify(observer, times(1)).onNext("one");
259+
inOrder.verify(observer, times(1)).onNext("two");
260+
inOrder.verify(observer, times(1)).onNext("three");
261+
inOrder.verify(observer, times(1)).onNext("four");
262+
263+
scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
264+
inOrder.verify(observer, never()).onNext(anyString());
265+
inOrder.verify(observer, times(1)).onCompleted();
266+
}
267+
144268
@Test
145269
public void testSwitchWithComplete() {
146270
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
@@ -149,7 +273,7 @@ public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
149273
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
150274
@Override
151275
public Subscription onSubscribe(Observer<? super String> observer) {
152-
publishNext(observer, 50, "one");
276+
publishNext(observer, 60, "one");
153277
publishNext(observer, 100, "two");
154278
return Subscriptions.empty();
155279
}
@@ -196,8 +320,8 @@ public Subscription onSubscribe(Observer<? super String> observer) {
196320
verify(observer, never()).onError(any(Throwable.class));
197321

198322
scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
199-
inOrder.verify(observer, never()).onNext(anyString());
200-
verify(observer, times(1)).onCompleted();
323+
inOrder.verify(observer, times(1)).onNext("four");
324+
verify(observer, never()).onCompleted();
201325
verify(observer, never()).onError(any(Throwable.class));
202326
}
203327

0 commit comments

Comments
 (0)