Skip to content

Commit 3d5474f

Browse files
Zip NULL and COMPLETE Sentinels
1 parent cf28bce commit 3d5474f

File tree

2 files changed

+76
-10
lines changed

2 files changed

+76
-10
lines changed

rxjava-core/src/main/java/rx/operators/OperatorZip.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.concurrent.ConcurrentLinkedQueue;
1919
import java.util.concurrent.atomic.AtomicLong;
2020

21-
import rx.Notification;
2221
import rx.Observable;
2322
import rx.Observer;
2423
import rx.Subscriber;
@@ -131,6 +130,9 @@ private static class Zip<R> {
131130
final FuncN<? extends R> zipFunction;
132131
final CompositeSubscription childSubscription = new CompositeSubscription();
133132

133+
static Object NULL_SENTINEL = new Object();
134+
static Object COMPLETE_SENTINEL = new Object();
135+
134136
@SuppressWarnings("rawtypes")
135137
public Zip(Observable[] os, final Subscriber<? super R> observer, FuncN<? extends R> zipFunction) {
136138
this.os = os;
@@ -170,13 +172,16 @@ void tick() {
170172
boolean allHaveValues = true;
171173
for (int i = 0; i < observers.length; i++) {
172174
vs[i] = ((InnerObserver) observers[i]).items.peek();
173-
if (vs[i] instanceof Notification) {
175+
if (vs[i] == NULL_SENTINEL) {
176+
// special handling for null
177+
vs[i] = null;
178+
} else if (vs[i] == COMPLETE_SENTINEL) {
179+
// special handling for onComplete
174180
observer.onCompleted();
175181
// we need to unsubscribe from all children since children are independently subscribed
176182
childSubscription.unsubscribe();
177183
return;
178-
}
179-
if (vs[i] == null) {
184+
} else if (vs[i] == null) {
180185
allHaveValues = false;
181186
// we continue as there may be an onCompleted on one of the others
182187
continue;
@@ -189,7 +194,7 @@ void tick() {
189194
for (int i = 0; i < observers.length; i++) {
190195
((InnerObserver) observers[i]).items.poll();
191196
// eagerly check if the next item on this queue is an onComplete
192-
if (((InnerObserver) observers[i]).items.peek() instanceof Notification) {
197+
if (((InnerObserver) observers[i]).items.peek() == COMPLETE_SENTINEL) {
193198
// it is an onComplete so shut down
194199
observer.onCompleted();
195200
// we need to unsubscribe from all children since children are independently subscribed
@@ -213,7 +218,7 @@ final class InnerObserver extends Subscriber {
213218
@SuppressWarnings("unchecked")
214219
@Override
215220
public void onCompleted() {
216-
items.add(Notification.createOnCompleted());
221+
items.add(COMPLETE_SENTINEL);
217222
tick();
218223
}
219224

@@ -226,8 +231,11 @@ public void onError(Throwable e) {
226231
@SuppressWarnings("unchecked")
227232
@Override
228233
public void onNext(Object t) {
229-
// TODO use a placeholder for NULL, such as Notification<T>(null)
230-
items.add(t);
234+
if (t == null) {
235+
items.add(NULL_SENTINEL);
236+
} else {
237+
items.add(t);
238+
}
231239
tick();
232240
}
233241
};

rxjava-core/src/test/java/rx/operators/OperatorZipTest.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
2424
import java.util.Collection;
25-
import java.util.Iterator;
2625
import java.util.List;
2726
import java.util.concurrent.CountDownLatch;
2827
import java.util.concurrent.TimeUnit;
@@ -32,12 +31,12 @@
3231
import org.junit.Test;
3332
import org.mockito.InOrder;
3433

34+
import rx.Notification;
3535
import rx.Observable;
3636
import rx.Observable.OnSubscribe;
3737
import rx.Observer;
3838
import rx.Subscriber;
3939
import rx.Subscription;
40-
import rx.operators.OperationReduceTest.CustomException;
4140
import rx.subjects.PublishSubject;
4241
import rx.subscriptions.Subscriptions;
4342
import rx.util.functions.Action1;
@@ -928,6 +927,65 @@ public void onNext(String s) {
928927
assertEquals("5-5", list.get(4));
929928
}
930929

930+
@Test
931+
public void testEmitNull() {
932+
Observable<Integer> oi = Observable.from(1, null, 3);
933+
Observable<String> os = Observable.from("a", "b", null);
934+
Observable<String> o = Observable.zip(oi, os, new Func2<Integer, String, String>() {
935+
936+
@Override
937+
public String call(Integer t1, String t2) {
938+
return t1 + "-" + t2;
939+
}
940+
941+
});
942+
943+
final ArrayList<String> list = new ArrayList<String>();
944+
o.subscribe(new Action1<String>() {
945+
946+
@Override
947+
public void call(String s) {
948+
System.out.println(s);
949+
list.add(s);
950+
}
951+
});
952+
953+
assertEquals(3, list.size());
954+
assertEquals("1-a", list.get(0));
955+
assertEquals("null-b", list.get(1));
956+
assertEquals("3-null", list.get(2));
957+
}
958+
959+
@Test
960+
public void testEmitMaterializedNotifications() {
961+
Observable<Notification<Integer>> oi = Observable.from(1, 2, 3).materialize();
962+
Observable<Notification<String>> os = Observable.from("a", "b", "c").materialize();
963+
Observable<String> o = Observable.zip(oi, os, new Func2<Notification<Integer>, Notification<String>, String>() {
964+
965+
@Override
966+
public String call(Notification<Integer> t1, Notification<String> t2) {
967+
return t1.getKind() + "_" + t1.getValue() + "-" + t2.getKind() + "_" + t2.getValue();
968+
}
969+
970+
});
971+
972+
final ArrayList<String> list = new ArrayList<String>();
973+
o.subscribe(new Action1<String>() {
974+
975+
@Override
976+
public void call(String s) {
977+
System.out.println(s);
978+
list.add(s);
979+
}
980+
});
981+
982+
assertEquals(4, list.size());
983+
assertEquals("OnNext_1-OnNext_a", list.get(0));
984+
assertEquals("OnNext_2-OnNext_b", list.get(1));
985+
assertEquals("OnNext_3-OnNext_c", list.get(2));
986+
assertEquals("OnCompleted_null-OnCompleted_null", list.get(3));
987+
}
988+
931989
Observable<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger());
932990

933991
Observable<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {

0 commit comments

Comments
 (0)