Skip to content

Commit f17e934

Browse files
Merge pull request #767 from akarnokd/ZipFixes
Zip fix for multiple onCompleted and moved unsubscribe outside the lock.
2 parents db0440a + 51b8acf commit f17e934

File tree

2 files changed

+71
-17
lines changed

2 files changed

+71
-17
lines changed

rxjava-core/src/main/java/rx/operators/OperationZip.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -152,19 +152,28 @@ public Subscription onSubscribe(final Observer<? super U> observer) {
152152
final List<ItemObserver<T>> all = new ArrayList<ItemObserver<T>>();
153153

154154
Observer<List<T>> o2 = new Observer<List<T>>() {
155+
boolean done;
155156
@Override
156157
public void onCompleted() {
157-
observer.onCompleted();
158+
if (!done) {
159+
done = true;
160+
observer.onCompleted();
161+
}
158162
}
159163

160164
@Override
161165
public void onError(Throwable t) {
162-
observer.onError(t);
166+
if (!done) {
167+
done = true;
168+
observer.onError(t);
169+
}
163170
}
164171

165172
@Override
166173
public void onNext(List<T> value) {
167-
observer.onNext(selector.call(value.toArray(new Object[value.size()])));
174+
if (!done) {
175+
observer.onNext(selector.call(value.toArray(new Object[value.size()])));
176+
}
168177
}
169178
};
170179

@@ -251,14 +260,15 @@ public void onNext(T value) {
251260
}
252261
// run collector
253262
if (rwLock.writeLock().tryLock()) {
263+
boolean cu = false;
254264
try {
255265
while (true) {
256266
List<T> values = new ArrayList<T>(all.size());
257267
for (ItemObserver<T> io : all) {
258268
if (io.queue.isEmpty()) {
259269
if (io.done) {
260270
observer.onCompleted();
261-
cancel.unsubscribe();
271+
cu = true;
262272
return;
263273
}
264274
continue;
@@ -280,61 +290,60 @@ public void onNext(T value) {
280290
}
281291
} finally {
282292
rwLock.writeLock().unlock();
293+
if (cu) {
294+
cancel.unsubscribe();
295+
}
283296
}
284297
}
285298
}
286299

287300
@Override
288301
public void onError(Throwable ex) {
289-
boolean c = false;
290302
rwLock.writeLock().lock();
291303
try {
292304
if (done) {
293305
return;
294306
}
295307
done = true;
296-
c = true;
297308
observer.onError(ex);
298-
cancel.unsubscribe();
299309
} finally {
300310
rwLock.writeLock().unlock();
301311
}
302-
if (c) {
303-
unsubscribe();
304-
}
312+
cancel.unsubscribe();
313+
unsubscribe();
305314
}
306315

307316
@Override
308317
public void onCompleted() {
309-
boolean c = false;
310318
rwLock.readLock().lock();
311319
try {
312320
done = true;
313-
c = true;
314321
} finally {
315322
rwLock.readLock().unlock();
316323
}
317324
if (rwLock.writeLock().tryLock()) {
325+
boolean cu = false;
318326
try {
319327
for (ItemObserver<T> io : all) {
320328
if (io.queue.isEmpty() && io.done) {
321329
observer.onCompleted();
322-
cancel.unsubscribe();
330+
cu = true;
323331
return;
324332
}
325333
}
326334
} finally {
327335
rwLock.writeLock().unlock();
336+
if (cu) {
337+
cancel.unsubscribe();
338+
}
328339
}
329340
}
330-
if (c) {
331-
unsubscribe();
332-
}
341+
unsubscribe();
333342
}
334343

335344
/** Connect to the source observable. */
336345
public void connect() {
337-
toSource.setSubscription(source.subscribe(this));
346+
toSource.set(source.subscribe(this));
338347
}
339348

340349
@Override

rxjava-core/src/test/java/rx/operators/OperationZipTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,4 +1018,49 @@ public void remove() {
10181018
verify(o, never()).onCompleted();
10191019

10201020
}
1021+
1022+
@Test
1023+
public void testZipWithOnCompletedTwice() {
1024+
// issue: https://groups.google.com/forum/#!topic/rxjava/79cWTv3TFp0
1025+
// The problem is the original "zip" implementation does not wrap
1026+
// an internal observer with a SafeObserver. However, in the "zip",
1027+
// it may calls "onCompleted" twice. That breaks the Rx contract.
1028+
1029+
// This test tries to emulate this case.
1030+
// As "mock(Observer.class)" will create an instance in the package "rx",
1031+
// we need to wrap "mock(Observer.class)" with an observer instance
1032+
// which is in the package "rx.operators".
1033+
@SuppressWarnings("unchecked")
1034+
final Observer<Integer> observer = mock(Observer.class);
1035+
1036+
Observable.zip(Observable.from(1),
1037+
Observable.from(1), new Func2<Integer, Integer, Integer>() {
1038+
@Override
1039+
public Integer call(Integer a, Integer b) {
1040+
return a + b;
1041+
}
1042+
}).subscribe(new Observer<Integer>() {
1043+
1044+
@Override
1045+
public void onCompleted() {
1046+
observer.onCompleted();
1047+
}
1048+
1049+
@Override
1050+
public void onError(Throwable e) {
1051+
observer.onError(e);
1052+
}
1053+
1054+
@Override
1055+
public void onNext(Integer args) {
1056+
observer.onNext(args);
1057+
}
1058+
1059+
});
1060+
1061+
InOrder inOrder = inOrder(observer);
1062+
inOrder.verify(observer, times(1)).onNext(2);
1063+
inOrder.verify(observer, times(1)).onCompleted();
1064+
inOrder.verifyNoMoreInteractions();
1065+
}
10211066
}

0 commit comments

Comments
 (0)