Skip to content

Commit 00703cf

Browse files
committed
Notifications for the allocation adverse.
1 parent d3ead3a commit 00703cf

File tree

6 files changed

+226
-163
lines changed

6 files changed

+226
-163
lines changed

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
import java.util.ArrayList;
44

5+
import javax.management.NotificationListener;
6+
7+
import rx.Notification;
58
import rx.Observer;
9+
import rx.operators.NotificationLite;
610

711
/**
812
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
@@ -22,21 +26,7 @@ public class SerializedObserver<T> implements Observer<T> {
2226
private boolean emitting = false;
2327
private boolean terminated = false;
2428
private ArrayList<Object> queue = new ArrayList<Object>();
25-
26-
private static Sentinel NULL_SENTINEL = new Sentinel();
27-
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
28-
29-
private static class Sentinel {
30-
31-
}
32-
33-
private static class ErrorSentinel extends Sentinel {
34-
final Throwable e;
35-
36-
ErrorSentinel(Throwable e) {
37-
this.e = e;
38-
}
39-
}
29+
private NotificationLite<T> on = NotificationLite.instance();
4030

4131
public SerializedObserver(Observer<? super T> s) {
4232
this.actual = s;
@@ -61,7 +51,7 @@ public void onCompleted() {
6151
}
6252
} else {
6353
// someone else is already emitting so just queue it
64-
queue.add(COMPLETE_SENTINEL);
54+
queue.add(on.completed());
6555
}
6656
}
6757
if (canEmit) {
@@ -97,7 +87,7 @@ public void onError(final Throwable e) {
9787
} else {
9888
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
9989
queue.clear();
100-
queue.add(new ErrorSentinel(e));
90+
queue.add(on.error(e));
10191
}
10292
}
10393
if (canEmit) {
@@ -131,11 +121,7 @@ public void onNext(T t) {
131121
}
132122
} else {
133123
// someone else is already emitting so just queue it
134-
if (t == null) {
135-
queue.add(NULL_SENTINEL);
136-
} else {
137-
queue.add(t);
138-
}
124+
queue.add(on.next(t));
139125
}
140126
}
141127
if (canEmit) {
@@ -168,19 +154,7 @@ public void drainQueue(ArrayList<Object> list) {
168154
return;
169155
}
170156
for (Object v : list) {
171-
if (v != null) {
172-
if (v instanceof Sentinel) {
173-
if (v == NULL_SENTINEL) {
174-
actual.onNext(null);
175-
} else if (v == COMPLETE_SENTINEL) {
176-
actual.onCompleted();
177-
} else if (v instanceof ErrorSentinel) {
178-
actual.onError(((ErrorSentinel) v).e);
179-
}
180-
} else {
181-
actual.onNext((T) v);
182-
}
183-
}
157+
on.accept(actual, v);
184158
}
185159
}
186160
}

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.LinkedList;
1919
import java.util.Queue;
20+
2021
import rx.Subscriber;
2122
import rx.subscriptions.CompositeSubscription;
2223

@@ -36,21 +37,8 @@ public class BufferUntilSubscriber<T> extends Subscriber<T> {
3637
private final Queue<Object> queue = new LinkedList<Object>();
3738
/** The queue capacity. */
3839
private final int capacity;
39-
/** Null sentinel (in case queue type is changed). */
40-
private static final Object NULL_SENTINEL = new Object();
41-
/** Complete sentinel. */
42-
private static final Object COMPLETE_SENTINEL = new Object();
43-
/**
44-
* Container for an onError event.
45-
*/
46-
private static final class ErrorSentinel {
47-
final Throwable t;
40+
private final NotificationLite<T> on = NotificationLite.instance();
4841

49-
public ErrorSentinel(Throwable t) {
50-
this.t = t;
51-
}
52-
53-
}
5442
/**
5543
* Constructor that wraps the actual subscriber and shares its subscription.
5644
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
@@ -85,22 +73,7 @@ public void enterPassthroughMode() {
8573
while (!queue.isEmpty()) {
8674
Object o = queue.poll();
8775
if (!actual.isUnsubscribed()) {
88-
if (o == NULL_SENTINEL) {
89-
actual.onNext(null);
90-
} else
91-
if (o == COMPLETE_SENTINEL) {
92-
actual.onCompleted();
93-
} else
94-
if (o instanceof ErrorSentinel) {
95-
actual.onError(((ErrorSentinel)o).t);
96-
} else
97-
if (o != null) {
98-
@SuppressWarnings("unchecked")
99-
T v = (T)o;
100-
actual.onNext(v);
101-
} else {
102-
throw new NullPointerException();
103-
}
76+
on.accept(actual, o);
10477
}
10578
}
10679
passthroughMode = true;
@@ -115,7 +88,7 @@ public void onNext(T t) {
11588
synchronized (gate) {
11689
if (!passthroughMode) {
11790
if (capacity < 0 || queue.size() < capacity) {
118-
queue.offer(t != null ? t : NULL_SENTINEL);
91+
queue.offer(on.next(t));
11992
return;
12093
}
12194
try {
@@ -142,7 +115,7 @@ public void onError(Throwable e) {
142115
synchronized (gate) {
143116
if (!passthroughMode) {
144117
if (capacity < 0 || queue.size() < capacity) {
145-
queue.offer(new ErrorSentinel(e));
118+
queue.offer(on.error(e));
146119
return;
147120
}
148121
try {
@@ -169,7 +142,7 @@ public void onCompleted() {
169142
synchronized (gate) {
170143
if (!passthroughMode) {
171144
if (capacity < 0 || queue.size() < capacity) {
172-
queue.offer(COMPLETE_SENTINEL);
145+
queue.offer(on.completed());
173146
return;
174147
}
175148
try {
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package rx.operators;
2+
3+
import java.io.ObjectStreamException;
4+
import java.io.Serializable;
5+
6+
import rx.Notification;
7+
import rx.Notification.Kind;
8+
import rx.Observer;
9+
10+
/**
11+
* For use in internal operators that need something like materialize and dematerialize wholly
12+
* within the implementation of the operator but don't want to incur the allocation cost of actually
13+
* creating {@link rx.Notification} objects for every onNext and onComplete.
14+
*
15+
* An object is allocated inside {@link #error(Throwable)} to wrap the {@link Throwable} but this
16+
* shouldn't effect performance because exceptions should be exceptionally rare.
17+
*
18+
* It's implemented as a singleton to maintain some semblance of type safety that is completely
19+
* non-existent.
20+
*
21+
* @author gscampbell
22+
*
23+
* @param <T>
24+
*/
25+
public final class NotificationLite<T> {
26+
private NotificationLite() {
27+
}
28+
29+
private static final NotificationLite INSTANCE = new NotificationLite();
30+
31+
@SuppressWarnings("unchecked")
32+
public static <T> NotificationLite<T> instance() {
33+
return INSTANCE;
34+
}
35+
36+
private static final Object ON_COMPLETED_SENTINEL = new Serializable() {
37+
private static final long serialVersionUID = 1;
38+
};
39+
40+
private static final Object ON_NEXT_NULL_SENTINEL = new Serializable() {
41+
private static final long serialVersionUID = 2;
42+
};
43+
44+
private static class OnErrorSentinel implements Serializable {
45+
private static final long serialVersionUID = 3;
46+
private final Throwable e;
47+
48+
public OnErrorSentinel(Throwable e) {
49+
this.e = e;
50+
}
51+
}
52+
53+
/**
54+
* Creates a lite onNext notification for the value passed in without doing any allocation. Can
55+
* be unwrapped and sent with the {@link #accept} method.
56+
*
57+
* @param t
58+
* @return
59+
*/
60+
public Object next(T t) {
61+
if (t == null)
62+
return ON_NEXT_NULL_SENTINEL;
63+
else
64+
return t;
65+
}
66+
67+
/**
68+
* Creates a lite onComplete notification without doing any allocation. Can be unwrapped and
69+
* sent with the {@link #accept} method.
70+
*
71+
* @return
72+
*/
73+
public Object completed() {
74+
return ON_COMPLETED_SENTINEL;
75+
}
76+
77+
/**
78+
* Create a lite onError notification. This call does new up an object to wrap the
79+
* {@link Throwable} but since there should only be one of these the performance impact should
80+
* be small. Can be unwrapped and sent with the {@link #accept} method.
81+
*
82+
* @param e
83+
* @return
84+
*/
85+
public Object error(Throwable e) {
86+
return new OnErrorSentinel(e);
87+
}
88+
89+
/**
90+
* Unwraps the lite notification and calls the appropriate method on the {@link Observer}.
91+
*
92+
* @param o
93+
* the {@link Observer} to call onNext, onCompleted or onError.
94+
* @param n
95+
* @throws IllegalArgumentException
96+
* if the notification is null.
97+
* @throws NullPointerException
98+
* if the {@link Observer} is null.
99+
*/
100+
@SuppressWarnings("unchecked")
101+
public void accept(Observer<? super T> o, Object n) {
102+
switch (kind(n)) {
103+
case OnNext:
104+
o.onNext(getValue(n));
105+
break;
106+
case OnCompleted:
107+
o.onCompleted();
108+
break;
109+
case OnError:
110+
o.onError(getError(n));
111+
break;
112+
}
113+
}
114+
115+
public boolean isCompleted(Object n) {
116+
return n == ON_COMPLETED_SENTINEL;
117+
}
118+
119+
public boolean isError(Object n) {
120+
return n instanceof OnErrorSentinel;
121+
}
122+
123+
/**
124+
* If there is custom logic that isn't as simple as call the right method on an {@link Observer}
125+
* then this method can be used to get the {@link Notification.Kind}
126+
*
127+
* @param n
128+
* @return
129+
*/
130+
public Kind kind(Object n) {
131+
if (n == null)
132+
throw new IllegalArgumentException("The lite notification can not be null");
133+
else if (n == ON_COMPLETED_SENTINEL)
134+
return Kind.OnCompleted;
135+
else if (n instanceof OnErrorSentinel)
136+
return Kind.OnError;
137+
else
138+
// value or ON_NEXT_NULL_SENTINEL but either way it's an OnNext
139+
return Kind.OnNext;
140+
}
141+
142+
/**
143+
* returns value passed in {@link #next(Object)} method call. Bad things happen if you call this
144+
* the onComplete or onError notification type. For performance you are expected to use this
145+
* when it is appropriate.
146+
*
147+
* @param n
148+
* @return
149+
*/
150+
@SuppressWarnings("unchecked")
151+
public T getValue(Object n) {
152+
return n == ON_NEXT_NULL_SENTINEL ? null : (T) n;
153+
}
154+
155+
/**
156+
* returns {@link Throwable} passed in {@link #error(Throwable)} method call. Bad things happen
157+
* if you
158+
* call this the onComplete or onNext notification type. For performance you are expected to use
159+
* this when it is appropriate.
160+
*
161+
* @param n
162+
* @return The {@link Throwable} wrapped inside n
163+
*/
164+
public Throwable getError(Object n) {
165+
return ((OnErrorSentinel) n).e;
166+
}
167+
}

0 commit comments

Comments
 (0)