Skip to content

Commit 2b47efe

Browse files
davidmotenakarnokd
authored andcommitted
NotificationLite - reduce allocations (#4621)
1 parent 2e06d4e commit 2b47efe

33 files changed

+194
-284
lines changed

src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,26 +62,25 @@ public Iterator<T> iterator() {
6262
}
6363

6464
static final class MostRecentObserver<T> extends Subscriber<T> {
65-
final NotificationLite<T> nl = NotificationLite.instance();
6665
volatile Object value;
6766

6867
MostRecentObserver(T value) {
69-
this.value = nl.next(value);
68+
this.value = NotificationLite.next(value);
7069
}
7170

7271
@Override
7372
public void onCompleted() {
74-
value = nl.completed();
73+
value = NotificationLite.completed();
7574
}
7675

7776
@Override
7877
public void onError(Throwable e) {
79-
value = nl.error(e);
78+
value = NotificationLite.error(e);
8079
}
8180

8281
@Override
8382
public void onNext(T args) {
84-
value = nl.next(args);
83+
value = NotificationLite.next(args);
8584
}
8685

8786
/**
@@ -99,7 +98,7 @@ public Iterator<T> getIterable() {
9998
@Override
10099
public boolean hasNext() {
101100
buf = value;
102-
return !nl.isCompleted(buf);
101+
return !NotificationLite.isCompleted(buf);
103102
}
104103

105104
@Override
@@ -109,13 +108,13 @@ public T next() {
109108
if (buf == null) {
110109
buf = value;
111110
}
112-
if (nl.isCompleted(buf)) {
111+
if (NotificationLite.isCompleted(buf)) {
113112
throw new NoSuchElementException();
114113
}
115-
if (nl.isError(buf)) {
116-
throw Exceptions.propagate(nl.getError(buf));
114+
if (NotificationLite.isError(buf)) {
115+
throw Exceptions.propagate(NotificationLite.getError(buf));
117116
}
118-
return nl.getValue(buf);
117+
return NotificationLite.getValue(buf);
119118
}
120119
finally {
121120
buf = null;

src/main/java/rx/internal/operators/BufferUntilSubscriber.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ static final class State<T> extends AtomicReference<Observer<? super T>> {
7171
boolean emitting;
7272

7373
final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
74-
final NotificationLite<T> nl = NotificationLite.instance();
7574

7675
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
7776
return compareAndSet(expected, next);
@@ -103,11 +102,10 @@ public void call() {
103102
}
104103
}
105104
if (win) {
106-
final NotificationLite<T> nl = NotificationLite.instance();
107105
while(true) {
108106
Object o;
109107
while ((o = state.buffer.poll()) != null) {
110-
nl.accept(state.get(), o);
108+
NotificationLite.accept(state.get(), o);
111109
}
112110
synchronized (state.guard) {
113111
if (state.buffer.isEmpty()) {
@@ -145,7 +143,7 @@ private void emit(Object v) {
145143
if (forward) {
146144
Object o;
147145
while ((o = state.buffer.poll()) != null) {
148-
state.nl.accept(state.get(), o);
146+
NotificationLite.accept(state.get(), o);
149147
}
150148
// Because `emit(Object v)` will be called in sequence,
151149
// no event will be put into `buffer` after we drain it.
@@ -158,7 +156,7 @@ public void onCompleted() {
158156
state.get().onCompleted();
159157
}
160158
else {
161-
emit(state.nl.completed());
159+
emit(NotificationLite.completed());
162160
}
163161
}
164162

@@ -168,7 +166,7 @@ public void onError(Throwable e) {
168166
state.get().onError(e);
169167
}
170168
else {
171-
emit(state.nl.error(e));
169+
emit(NotificationLite.error(e));
172170
}
173171
}
174172

@@ -178,7 +176,7 @@ public void onNext(T t) {
178176
state.get().onNext(t);
179177
}
180178
else {
181-
emit(state.nl.next(t));
179+
emit(NotificationLite.next(t));
182180
}
183181
}
184182

src/main/java/rx/internal/operators/CachedObservable.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ static final class CacheState<T> extends LinkedArrayList implements Observer<T>
102102
/** The default empty array of producers. */
103103
static final ReplayProducer<?>[] EMPTY = new ReplayProducer<?>[0];
104104

105-
final NotificationLite<T> nl;
106-
107105
/** Set to true after connection. */
108106
volatile boolean isConnected;
109107
/**
@@ -116,7 +114,6 @@ public CacheState(Observable<? extends T> source, int capacityHint) {
116114
super(capacityHint);
117115
this.source = source;
118116
this.producers = EMPTY;
119-
this.nl = NotificationLite.instance();
120117
this.connection = new SerialSubscription();
121118
}
122119
/**
@@ -189,7 +186,7 @@ public void onCompleted() {
189186
@Override
190187
public void onNext(T t) {
191188
if (!sourceDone) {
192-
Object o = nl.next(t);
189+
Object o = NotificationLite.next(t);
193190
add(o);
194191
dispatch();
195192
}
@@ -198,7 +195,7 @@ public void onNext(T t) {
198195
public void onError(Throwable e) {
199196
if (!sourceDone) {
200197
sourceDone = true;
201-
Object o = nl.error(e);
198+
Object o = NotificationLite.error(e);
202199
add(o);
203200
connection.unsubscribe();
204201
dispatch();
@@ -208,7 +205,7 @@ public void onError(Throwable e) {
208205
public void onCompleted() {
209206
if (!sourceDone) {
210207
sourceDone = true;
211-
Object o = nl.completed();
208+
Object o = NotificationLite.completed();
212209
add(o);
213210
connection.unsubscribe();
214211
dispatch();
@@ -347,7 +344,6 @@ public void replay() {
347344
}
348345
boolean skipFinal = false;
349346
try {
350-
final NotificationLite<T> nl = state.nl;
351347
final Subscriber<? super T> child = this.child;
352348

353349
for (;;) {
@@ -376,14 +372,14 @@ public void replay() {
376372
// eagerly emit any terminal event
377373
if (r == 0) {
378374
Object o = b[k];
379-
if (nl.isCompleted(o)) {
375+
if (NotificationLite.isCompleted(o)) {
380376
child.onCompleted();
381377
skipFinal = true;
382378
unsubscribe();
383379
return;
384380
} else
385-
if (nl.isError(o)) {
386-
child.onError(nl.getError(o));
381+
if (NotificationLite.isError(o)) {
382+
child.onError(NotificationLite.getError(o));
387383
skipFinal = true;
388384
unsubscribe();
389385
return;
@@ -404,7 +400,7 @@ public void replay() {
404400
Object o = b[k];
405401

406402
try {
407-
if (nl.accept(child, o)) {
403+
if (NotificationLite.accept(child, o)) {
408404
skipFinal = true;
409405
unsubscribe();
410406
return;
@@ -413,8 +409,8 @@ public void replay() {
413409
Exceptions.throwIfFatal(err);
414410
skipFinal = true;
415411
unsubscribe();
416-
if (!nl.isError(o) && !nl.isCompleted(o)) {
417-
child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
412+
if (!NotificationLite.isError(o) && !NotificationLite.isCompleted(o)) {
413+
child.onError(OnErrorThrowable.addValueAsLastCause(err, NotificationLite.getValue(o)));
418414
}
419415
return;
420416
}

src/main/java/rx/internal/operators/NotificationLite.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,24 +34,11 @@
3434
* @param <T> the element type
3535
*/
3636
public final class NotificationLite<T> {
37-
@SuppressWarnings("rawtypes")
38-
private static final NotificationLite INSTANCE = new NotificationLite();
3937

4038
private NotificationLite() {
4139
// singleton
4240
}
4341

44-
/**
45-
* Gets the {@code NotificationLite} singleton.
46-
*
47-
* @param <T> the value type
48-
* @return the sole {@code NotificationLite} object
49-
*/
50-
@SuppressWarnings("unchecked")
51-
public static <T> NotificationLite<T> instance() {
52-
return INSTANCE;
53-
}
54-
5542
private static final Object ON_COMPLETED_SENTINEL = new Serializable() {
5643
private static final long serialVersionUID = 1;
5744

@@ -92,7 +79,7 @@ public String toString() {
9279
* the item emitted to {@code onNext}
9380
* @return the item, or a null token representing the item if the item is {@code null}
9481
*/
95-
public Object next(T t) {
82+
public static <T> Object next(T t) {
9683
if (t == null) {
9784
return ON_NEXT_NULL_SENTINEL;
9885
} else {
@@ -106,7 +93,7 @@ public Object next(T t) {
10693
*
10794
* @return a completion token
10895
*/
109-
public Object completed() {
96+
public static Object completed() {
11097
return ON_COMPLETED_SENTINEL;
11198
}
11299

@@ -119,7 +106,7 @@ public Object completed() {
119106
* the {@code Throwable} in the {@code onError} notification
120107
* @return an object encapsulating the exception
121108
*/
122-
public Object error(Throwable e) {
109+
public static Object error(Throwable e) {
123110
return new OnErrorSentinel(e);
124111
}
125112

@@ -137,7 +124,7 @@ public Object error(Throwable e) {
137124
* if the {@link Observer} is null.
138125
*/
139126
@SuppressWarnings("unchecked")
140-
public boolean accept(Observer<? super T> o, Object n) {
127+
public static <T> boolean accept(Observer<? super T> o, Object n) {
141128
if (n == ON_COMPLETED_SENTINEL) {
142129
o.onCompleted();
143130
return true;
@@ -163,7 +150,7 @@ public boolean accept(Observer<? super T> o, Object n) {
163150
* the lite notification
164151
* @return {@code true} if {@code n} represents an {@code onCompleted} event; {@code false} otherwise
165152
*/
166-
public boolean isCompleted(Object n) {
153+
public static boolean isCompleted(Object n) {
167154
return n == ON_COMPLETED_SENTINEL;
168155
}
169156

@@ -174,7 +161,7 @@ public boolean isCompleted(Object n) {
174161
* the lite notification
175162
* @return {@code true} if {@code n} represents an {@code onError} event; {@code false} otherwise
176163
*/
177-
public boolean isError(Object n) {
164+
public static boolean isError(Object n) {
178165
return n instanceof OnErrorSentinel;
179166
}
180167

@@ -183,7 +170,7 @@ public boolean isError(Object n) {
183170
* @param n the lite notification
184171
* @return {@code true} if {@code n} represents a wrapped {@code null} {@code onNext} event, {@code false} otherwise
185172
*/
186-
public boolean isNull(Object n) {
173+
public static boolean isNull(Object n) {
187174
return n == ON_NEXT_NULL_SENTINEL;
188175
}
189176

@@ -192,7 +179,7 @@ public boolean isNull(Object n) {
192179
* @param n the lite notification
193180
* @return {@code true} if {@code n} represents an {@code onNext} event, {@code false} otherwise
194181
*/
195-
public boolean isNext(Object n) {
182+
public static boolean isNext(Object n) {
196183
return n != null && !isError(n) && !isCompleted(n);
197184
}
198185
/**
@@ -207,7 +194,7 @@ public boolean isNext(Object n) {
207194
* @return the {@link Kind} of lite notification {@code n} is: either {@code Kind.OnCompleted},
208195
* {@code Kind.OnError}, or {@code Kind.OnNext}
209196
*/
210-
public Kind kind(Object n) {
197+
public static Kind kind(Object n) {
211198
if (n == null) {
212199
throw new IllegalArgumentException("The lite notification can not be null");
213200
} else if (n == ON_COMPLETED_SENTINEL) {
@@ -230,7 +217,7 @@ public Kind kind(Object n) {
230217
* @return the unwrapped value, which can be null
231218
*/
232219
@SuppressWarnings("unchecked")
233-
public T getValue(Object n) {
220+
public static <T> T getValue(Object n) {
234221
return n == ON_NEXT_NULL_SENTINEL ? null : (T) n;
235222
}
236223

@@ -243,7 +230,7 @@ public T getValue(Object n) {
243230
* the lite notification (of type {@code Kind.OnError})
244231
* @return the {@link Throwable} wrapped inside {@code n}
245232
*/
246-
public Throwable getError(Object n) {
233+
public static Throwable getError(Object n) {
247234
return ((OnErrorSentinel) n).e;
248235
}
249236
}

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ void combine(Object value, int index) {
202202
if (value == null) {
203203
complete = ++completedCount;
204204
} else {
205-
latest[index] = combinerSubscriber.nl.getValue(value);
205+
latest[index] = NotificationLite.getValue(value);
206206
}
207207
allSourcesFinished = activeCount == sourceCount;
208208
// see if either all sources completed
@@ -358,14 +358,12 @@ void onError(Throwable e) {
358358
static final class CombinerSubscriber<T, R> extends Subscriber<T> {
359359
final LatestCoordinator<T, R> parent;
360360
final int index;
361-
final NotificationLite<T> nl;
362361

363362
boolean done;
364363

365364
public CombinerSubscriber(LatestCoordinator<T, R> parent, int index) {
366365
this.parent = parent;
367366
this.index = index;
368-
this.nl = NotificationLite.instance();
369367
request(parent.bufferSize);
370368
}
371369

@@ -374,7 +372,7 @@ public void onNext(T t) {
374372
if (done) {
375373
return;
376374
}
377-
parent.combine(nl.next(t), index);
375+
parent.combine(NotificationLite.next(t), index);
378376
}
379377

380378
@Override

src/main/java/rx/internal/operators/OnSubscribeConcatMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public ConcatMapSubscriber(Subscriber<? super R> actual,
137137

138138
@Override
139139
public void onNext(T t) {
140-
if (!queue.offer(NotificationLite.instance().next(t))) {
140+
if (!queue.offer(NotificationLite.next(t))) {
141141
unsubscribe();
142142
onError(new MissingBackpressureException());
143143
} else {
@@ -256,7 +256,7 @@ void drain() {
256256
Observable<? extends R> source;
257257

258258
try {
259-
source = mapper.call(NotificationLite.<T>instance().getValue(v));
259+
source = mapper.call(NotificationLite.<T>getValue(v));
260260
} catch (Throwable mapperError) {
261261
Exceptions.throwIfFatal(mapperError);
262262
drainError(mapperError);

0 commit comments

Comments
 (0)