Skip to content

Commit 132f925

Browse files
Merge pull request #443 from ylecaillez/master
OperationSwitch notify onComplete() too early.
2 parents 3303dbd + d48e92f commit 132f925

File tree

1 file changed

+175
-44
lines changed

1 file changed

+175
-44
lines changed

rxjava-core/src/main/java/rx/operators/OperationSwitch.java

Lines changed: 175 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
19-
import static org.mockito.Mockito.*;
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Matchers.anyString;
20+
import static org.mockito.Mockito.inOrder;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.never;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
2025

2126
import java.util.concurrent.TimeUnit;
22-
import java.util.concurrent.atomic.AtomicReference;
2327

2428
import org.junit.Before;
2529
import org.junit.Test;
@@ -30,24 +34,30 @@
3034
import rx.Observer;
3135
import rx.Subscription;
3236
import rx.concurrency.TestScheduler;
37+
import rx.subscriptions.CompositeSubscription;
38+
import rx.subscriptions.MultipleAssignmentSubscription;
3339
import rx.subscriptions.Subscriptions;
3440
import rx.util.functions.Action0;
3541
import rx.util.functions.Func1;
3642

3743
/**
38-
* Transforms an Observable that emits Observables into a single Observable that emits the items
39-
* emitted by the most recently published of those Observables.
44+
* Transforms an Observable that emits Observables into a single Observable that
45+
* emits the items emitted by the most recently published of those Observables.
4046
* <p>
41-
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
47+
* <img width="640" src=
48+
* "https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
4249
*/
4350
public final class OperationSwitch {
4451

4552
/**
46-
* This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single {@link Observable} sequence which produces values from the most recently published
47-
* {@link Observable}.
53+
* This function transforms an {@link Observable} sequence of
54+
* {@link Observable} sequences into a single {@link Observable} sequence
55+
* which produces values from the most recently published {@link Observable}
56+
* .
4857
*
4958
* @param sequences
50-
* The {@link Observable} sequence consisting of {@link Observable} sequences.
59+
* The {@link Observable} sequence consisting of
60+
* {@link Observable} sequences.
5161
* @return A {@link Func1} which does this transformation.
5262
*/
5363
public static <T> OnSubscribeFunc<T> switchDo(final Observable<? extends Observable<? extends T>> sequences) {
@@ -69,69 +79,113 @@ public Switch(Observable<? extends Observable<? extends T>> sequences) {
6979

7080
@Override
7181
public Subscription onSubscribe(Observer<? super T> observer) {
72-
SafeObservableSubscription subscription = new SafeObservableSubscription();
73-
subscription.wrap(sequences.subscribe(new SwitchObserver<T>(observer, subscription)));
74-
return subscription;
82+
SafeObservableSubscription parent;
83+
parent = new SafeObservableSubscription();
84+
85+
MultipleAssignmentSubscription child;
86+
child = new MultipleAssignmentSubscription();
87+
88+
parent.wrap(sequences.subscribe(new SwitchObserver<T>(observer, parent, child)));
89+
90+
return new CompositeSubscription(parent, child);
7591
}
7692
}
7793

7894
private static class SwitchObserver<T> implements Observer<Observable<? extends T>> {
7995

80-
private final Observer<? super T> observer;
81-
private final SafeObservableSubscription parent;
82-
private final AtomicReference<Subscription> subsequence = new AtomicReference<Subscription>();
96+
private final Object gate;
97+
private final Observer<? super T> observer;
98+
private final SafeObservableSubscription parent;
99+
private final MultipleAssignmentSubscription child;
100+
private long latest;
101+
private boolean stopped;
102+
private boolean hasLatest;
83103

84-
public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent) {
104+
public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent,
105+
MultipleAssignmentSubscription child) {
85106
this.observer = observer;
86107
this.parent = parent;
87-
}
88-
89-
@Override
90-
public void onCompleted() {
91-
unsubscribeFromSubSequence();
92-
observer.onCompleted();
93-
}
94-
95-
@Override
96-
public void onError(Throwable e) {
97-
unsubscribeFromSubSequence();
98-
observer.onError(e);
108+
this.child = child;
109+
this.gate = new Object();
99110
}
100111

101112
@Override
102113
public void onNext(Observable<? extends T> args) {
103-
unsubscribeFromSubSequence();
114+
final long id;
115+
synchronized (gate) {
116+
id = ++latest;
117+
this.hasLatest = true;
118+
}
104119

105-
subsequence.set(args.subscribe(new Observer<T>() {
120+
final SafeObservableSubscription sub;
121+
sub = new SafeObservableSubscription();
122+
sub.wrap(args.subscribe(new Observer<T>() {
106123
@Override
107-
public void onCompleted() {
108-
// Do nothing.
124+
public void onNext(T args) {
125+
synchronized (gate) {
126+
if (latest == id) {
127+
SwitchObserver.this.observer.onNext(args);
128+
}
129+
}
109130
}
110131

111132
@Override
112133
public void onError(Throwable e) {
113-
parent.unsubscribe();
114-
observer.onError(e);
134+
synchronized (gate) {
135+
sub.unsubscribe();
136+
if (latest == id) {
137+
SwitchObserver.this.observer.onError(e);
138+
SwitchObserver.this.parent.unsubscribe();
139+
}
140+
}
115141
}
116142

117143
@Override
118-
public void onNext(T args) {
119-
observer.onNext(args);
144+
public void onCompleted() {
145+
synchronized (gate) {
146+
sub.unsubscribe();
147+
if (latest == id) {
148+
SwitchObserver.this.hasLatest = false;
149+
}
150+
151+
if (stopped) {
152+
SwitchObserver.this.observer.onCompleted();
153+
SwitchObserver.this.parent.unsubscribe();
154+
}
155+
156+
}
120157
}
158+
121159
}));
160+
161+
this.child.setSubscription(sub);
122162
}
123163

124-
private void unsubscribeFromSubSequence() {
125-
Subscription previousSubscription = subsequence.get();
126-
if (previousSubscription != null) {
127-
previousSubscription.unsubscribe();
164+
@Override
165+
public void onError(Throwable e) {
166+
synchronized (gate) {
167+
this.observer.onError(e);
168+
}
169+
170+
this.parent.unsubscribe();
171+
}
172+
173+
@Override
174+
public void onCompleted() {
175+
synchronized (gate) {
176+
this.stopped = true;
177+
if (!this.hasLatest) {
178+
this.observer.onCompleted();
179+
this.parent.unsubscribe();
180+
}
128181
}
129182
}
183+
130184
}
131185

132186
public static class UnitTest {
133187

134-
private TestScheduler scheduler;
188+
private TestScheduler scheduler;
135189
private Observer<String> observer;
136190

137191
@Before
@@ -141,6 +195,83 @@ public void before() {
141195
observer = mock(Observer.class);
142196
}
143197

198+
@Test
199+
public void testSwitchWhenOuterCompleteBeforeInner() {
200+
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
201+
@Override
202+
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
203+
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
204+
@Override
205+
public Subscription onSubscribe(Observer<? super String> observer) {
206+
publishNext(observer, 70, "one");
207+
publishNext(observer, 100, "two");
208+
publishCompleted(observer, 200);
209+
return Subscriptions.empty();
210+
}
211+
}));
212+
publishCompleted(observer, 60);
213+
214+
return Subscriptions.empty();
215+
}
216+
});
217+
218+
Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
219+
sampled.subscribe(observer);
220+
221+
InOrder inOrder = inOrder(observer);
222+
223+
scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
224+
inOrder.verify(observer, times(2)).onNext(anyString());
225+
inOrder.verify(observer, times(1)).onCompleted();
226+
}
227+
228+
@Test
229+
public void testSwitchWhenInnerCompleteBeforeOuter() {
230+
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
231+
@Override
232+
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
233+
publishNext(observer, 10, Observable.create(new OnSubscribeFunc<String>() {
234+
@Override
235+
public Subscription onSubscribe(Observer<? super String> observer) {
236+
publishNext(observer, 0, "one");
237+
publishNext(observer, 10, "two");
238+
publishCompleted(observer, 20);
239+
return Subscriptions.empty();
240+
}
241+
}));
242+
243+
publishNext(observer, 100, Observable.create(new OnSubscribeFunc<String>() {
244+
@Override
245+
public Subscription onSubscribe(Observer<? super String> observer) {
246+
publishNext(observer, 0, "three");
247+
publishNext(observer, 10, "four");
248+
publishCompleted(observer, 20);
249+
return Subscriptions.empty();
250+
}
251+
}));
252+
publishCompleted(observer, 200);
253+
254+
return Subscriptions.empty();
255+
}
256+
});
257+
258+
Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
259+
sampled.subscribe(observer);
260+
261+
InOrder inOrder = inOrder(observer);
262+
263+
scheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
264+
inOrder.verify(observer, never()).onCompleted();
265+
inOrder.verify(observer, times(1)).onNext("one");
266+
inOrder.verify(observer, times(1)).onNext("two");
267+
inOrder.verify(observer, times(1)).onNext("three");
268+
inOrder.verify(observer, times(1)).onNext("four");
269+
270+
scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
271+
inOrder.verify(observer, never()).onNext(anyString());
272+
inOrder.verify(observer, times(1)).onCompleted();
273+
}
274+
144275
@Test
145276
public void testSwitchWithComplete() {
146277
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
@@ -149,7 +280,7 @@ public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
149280
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
150281
@Override
151282
public Subscription onSubscribe(Observer<? super String> observer) {
152-
publishNext(observer, 50, "one");
283+
publishNext(observer, 60, "one");
153284
publishNext(observer, 100, "two");
154285
return Subscriptions.empty();
155286
}
@@ -196,8 +327,8 @@ public Subscription onSubscribe(Observer<? super String> observer) {
196327
verify(observer, never()).onError(any(Throwable.class));
197328

198329
scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
199-
inOrder.verify(observer, never()).onNext(anyString());
200-
verify(observer, times(1)).onCompleted();
330+
inOrder.verify(observer, times(1)).onNext("four");
331+
verify(observer, never()).onCompleted();
201332
verify(observer, never()).onError(any(Throwable.class));
202333
}
203334

0 commit comments

Comments
 (0)