Skip to content

Commit 2de214c

Browse files
author
ylecaillez
committed
Fix OperationSwitch so that it does not onComplete() before inner and
outer subscription completes.
1 parent 41ebe38 commit 2de214c

File tree

1 file changed

+29
-21
lines changed

1 file changed

+29
-21
lines changed

rxjava-core/src/main/java/rx/operators/OperationSwitch.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import rx.Observer;
3535
import rx.Subscription;
3636
import rx.concurrency.TestScheduler;
37+
import rx.subscriptions.CompositeSubscription;
3738
import rx.subscriptions.MultipleAssignmentSubscription;
3839
import rx.subscriptions.Subscriptions;
3940
import rx.util.functions.Action0;
@@ -78,9 +79,15 @@ public Switch(Observable<? extends Observable<? extends T>> sequences) {
7879

7980
@Override
8081
public Subscription onSubscribe(Observer<? super T> observer) {
81-
SafeObservableSubscription subscription = new SafeObservableSubscription();
82-
subscription.wrap(sequences.subscribe(new SwitchObserver<T>(observer, subscription)));
83-
return subscription;
82+
SafeObservableSubscription parent;
83+
parent = new SafeObservableSubscription();
84+
85+
MultipleAssignmentSubscription child;
86+
child = new MultipleAssignmentSubscription();
87+
88+
parent.wrap(sequences.subscribe(new SwitchObserver<T>(observer, parent, child)));
89+
90+
return new CompositeSubscription(parent, child);
8491
}
8592
}
8693

@@ -89,24 +96,25 @@ private static class SwitchObserver<T> implements Observer<Observable<? extends
8996
private final Object gate;
9097
private final Observer<? super T> observer;
9198
private final SafeObservableSubscription parent;
92-
private final MultipleAssignmentSubscription innerSubscription;
99+
private final MultipleAssignmentSubscription child;
93100
private long latest;
94101
private boolean stopped;
95102
private boolean hasLatest;
96103

97-
public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent) {
104+
public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent,
105+
MultipleAssignmentSubscription child) {
98106
this.observer = observer;
99107
this.parent = parent;
108+
this.child = child;
100109
this.gate = new Object();
101-
this.innerSubscription = new MultipleAssignmentSubscription();
102110
}
103111

104112
@Override
105113
public void onNext(Observable<? extends T> args) {
106114
final long id;
107115
synchronized (gate) {
108116
id = ++latest;
109-
hasLatest = true;
117+
this.hasLatest = true;
110118
}
111119

112120
final SafeObservableSubscription sub;
@@ -116,7 +124,7 @@ public void onNext(Observable<? extends T> args) {
116124
public void onNext(T args) {
117125
synchronized (gate) {
118126
if (latest == id) {
119-
observer.onNext(args);
127+
SwitchObserver.this.observer.onNext(args);
120128
}
121129
}
122130
}
@@ -126,8 +134,8 @@ public void onError(Throwable e) {
126134
synchronized (gate) {
127135
sub.unsubscribe();
128136
if (latest == id) {
129-
observer.onError(e);
130-
parent.unsubscribe();
137+
SwitchObserver.this.observer.onError(e);
138+
SwitchObserver.this.parent.unsubscribe();
131139
}
132140
}
133141
}
@@ -137,39 +145,39 @@ public void onCompleted() {
137145
synchronized (gate) {
138146
sub.unsubscribe();
139147
if (latest == id) {
140-
hasLatest = false;
148+
SwitchObserver.this.hasLatest = false;
141149
}
142150

143151
if (stopped) {
144-
observer.onCompleted();
145-
parent.unsubscribe();
152+
SwitchObserver.this.observer.onCompleted();
153+
SwitchObserver.this.parent.unsubscribe();
146154
}
147155

148156
}
149157
}
150158

151159
}));
152160

153-
innerSubscription.setSubscription(sub);
161+
this.child.setSubscription(sub);
154162
}
155163

156164
@Override
157165
public void onError(Throwable e) {
158166
synchronized (gate) {
159-
observer.onError(e);
167+
this.observer.onError(e);
160168
}
161169

162-
parent.unsubscribe();
170+
this.parent.unsubscribe();
163171
}
164172

165173
@Override
166174
public void onCompleted() {
167175
synchronized (gate) {
168-
innerSubscription.unsubscribe();
169-
stopped = true;
170-
if (!hasLatest) {
171-
observer.onCompleted();
172-
parent.unsubscribe();
176+
this.child.unsubscribe();
177+
this.stopped = true;
178+
if (!this.hasLatest) {
179+
this.observer.onCompleted();
180+
this.parent.unsubscribe();
173181
}
174182
}
175183
}

0 commit comments

Comments
 (0)