Skip to content

Commit 299649b

Browse files
committed
Merge branch 'master' into single
2 parents 8867952 + bcded86 commit 299649b

21 files changed

+796
-857
lines changed

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
import java.util.ArrayList;
44

5+
import javax.management.NotificationListener;
6+
7+
import rx.Notification;
58
import rx.Observer;
9+
import rx.operators.NotificationLite;
610

711
/**
812
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
@@ -22,21 +26,7 @@ public class SerializedObserver<T> implements Observer<T> {
2226
private boolean emitting = false;
2327
private boolean terminated = false;
2428
private ArrayList<Object> queue = new ArrayList<Object>();
25-
26-
private static Sentinel NULL_SENTINEL = new Sentinel();
27-
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
28-
29-
private static class Sentinel {
30-
31-
}
32-
33-
private static class ErrorSentinel extends Sentinel {
34-
final Throwable e;
35-
36-
ErrorSentinel(Throwable e) {
37-
this.e = e;
38-
}
39-
}
29+
private NotificationLite<T> on = NotificationLite.instance();
4030

4131
public SerializedObserver(Observer<? super T> s) {
4232
this.actual = s;
@@ -61,7 +51,7 @@ public void onCompleted() {
6151
}
6252
} else {
6353
// someone else is already emitting so just queue it
64-
queue.add(COMPLETE_SENTINEL);
54+
queue.add(on.completed());
6555
}
6656
}
6757
if (canEmit) {
@@ -97,7 +87,7 @@ public void onError(final Throwable e) {
9787
} else {
9888
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
9989
queue.clear();
100-
queue.add(new ErrorSentinel(e));
90+
queue.add(on.error(e));
10191
}
10292
}
10393
if (canEmit) {
@@ -131,11 +121,7 @@ public void onNext(T t) {
131121
}
132122
} else {
133123
// someone else is already emitting so just queue it
134-
if (t == null) {
135-
queue.add(NULL_SENTINEL);
136-
} else {
137-
queue.add(t);
138-
}
124+
queue.add(on.next(t));
139125
}
140126
}
141127
if (canEmit) {
@@ -168,19 +154,7 @@ public void drainQueue(ArrayList<Object> list) {
168154
return;
169155
}
170156
for (Object v : list) {
171-
if (v != null) {
172-
if (v instanceof Sentinel) {
173-
if (v == NULL_SENTINEL) {
174-
actual.onNext(null);
175-
} else if (v == COMPLETE_SENTINEL) {
176-
actual.onCompleted();
177-
} else if (v instanceof ErrorSentinel) {
178-
actual.onError(((ErrorSentinel) v).e);
179-
}
180-
} else {
181-
actual.onNext((T) v);
182-
}
183-
}
157+
on.accept(actual, v);
184158
}
185159
}
186160
}

rxjava-core/src/main/java/rx/observers/TestObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
*/
2828
public class TestObserver<T> implements Observer<T> {
2929

30-
3130
private final Observer<T> delegate;
3231
private final ArrayList<T> onNextEvents = new ArrayList<T>();
3332
private final ArrayList<Throwable> onErrorEvents = new ArrayList<Throwable>();
@@ -91,7 +90,8 @@ public void assertReceivedOnNext(List<T> items) {
9190
throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + onNextEvents.get(i) + "]");
9291
}
9392
} else if (!items.get(i).equals(onNextEvents.get(i))) {
94-
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] but was: [" + onNextEvents.get(i) + "]");
93+
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] (" + items.get(i).getClass().getSimpleName() + ") but was: [" + onNextEvents.get(i) + "] (" + onNextEvents.get(i).getClass().getSimpleName() + ")");
94+
9595
}
9696
}
9797

0 commit comments

Comments
 (0)