Skip to content

Commit 8425eff

Browse files
committed
Performance improvement suggestions.
1 parent a144b0e commit 8425eff

File tree

5 files changed

+109
-32
lines changed

5 files changed

+109
-32
lines changed

rxjava-core/src/main/java/rx/operators/SafeObserver.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,25 @@
5757
*/
5858
public class SafeObserver<T> implements Observer<T> {
5959

60-
private final Observer<? super T> actual;
60+
private volatile Observer<? super T> actual;
6161
private final AtomicBoolean isFinished = new AtomicBoolean(false);
6262
private final SafeObservableSubscription subscription;
63+
/**
64+
* If the observer completes, this is swapped in place of the actual
65+
* should avoid the overhead of isFinished.get() on every onNext call. */
66+
private static final Observer<Object> nopObserver = new Observer<Object>() {
67+
@Override
68+
public void onNext(Object args) {
69+
}
70+
@Override
71+
public void onError(Throwable e) {
72+
}
6373

74+
@Override
75+
public void onCompleted() {
76+
}
77+
78+
};
6479
public SafeObserver(SafeObservableSubscription subscription, Observer<? super T> actual) {
6580
this.subscription = subscription;
6681
this.actual = actual;
@@ -69,8 +84,10 @@ public SafeObserver(SafeObservableSubscription subscription, Observer<? super T>
6984
@Override
7085
public void onCompleted() {
7186
if (isFinished.compareAndSet(false, true)) {
87+
Observer<? super T> a = actual;
88+
actual = nopObserver;
7289
try {
73-
actual.onCompleted();
90+
a.onCompleted();
7491
} catch (Throwable e) {
7592
// handle errors if the onCompleted implementation fails, not just if the Observable fails
7693
onError(e);
@@ -83,8 +100,11 @@ public void onCompleted() {
83100
@Override
84101
public void onError(Throwable e) {
85102
if (isFinished.compareAndSet(false, true)) {
103+
Observer<? super T> a = actual;
104+
// will prevent onNext from sending a new value after completion
105+
actual = nopObserver;
86106
try {
87-
actual.onError(e);
107+
a.onError(e);
88108
} catch (Throwable e2) {
89109
if (e2 instanceof OnErrorNotImplementedException) {
90110
/**
@@ -117,12 +137,10 @@ public void onError(Throwable e) {
117137
@Override
118138
public void onNext(T args) {
119139
try {
120-
if (!isFinished.get()) {
121-
actual.onNext(args);
122-
}
123-
} catch (Throwable e) {
140+
actual.onNext(args);
141+
} catch (Throwable t) {
124142
// handle errors if the onNext implementation fails, not just if the Observable fails
125-
onError(e);
143+
onError(t);
126144
}
127145
}
128146

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public void onNext(T v) {
173173
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
174174
*/
175175
lastNotification.set(new Notification<T>(v));
176-
for (Observer<? super T> o : subscriptionManager.snapshotOfObservers()) {
176+
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
177177
o.onNext(v);
178178
}
179179
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void call(Collection<SubjectObserver<? super T>> observers) {
124124

125125
@Override
126126
public void onNext(T v) {
127-
for (Observer<? super T> o : subscriptionManager.snapshotOfObservers()) {
127+
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
128128
o.onNext(v);
129129
}
130130
}

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void onNext(T v) {
143143
return;
144144
}
145145
state.history.next(v);
146-
for (SubjectObserver<? super T> o : subscriptionManager.snapshotOfObservers()) {
146+
for (SubjectObserver<? super T> o : subscriptionManager.rawSnapshot()) {
147147
if (caughtUp(o)) {
148148
o.onNext(v);
149149
}
@@ -198,7 +198,7 @@ private static <T> int replayObserverFromIndex(History<T> history, Integer l, Su
198198
*/
199199
private static class History<T> {
200200
private AtomicInteger index = new AtomicInteger(0);
201-
private final ArrayList<T> list = new ArrayList<T>();
201+
private final ArrayList<T> list = new ArrayList<T>(/* 1024 */);
202202
private AtomicReference<Notification<T>> terminalValue = new AtomicReference<Notification<T>>();
203203

204204
public boolean next(T n) {

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 79 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.ArrayList;
19+
import java.util.Arrays;
1820
import java.util.Collection;
1921
import java.util.Collections;
20-
import java.util.HashMap;
21-
import java.util.Map;
22+
import java.util.List;
2223
import java.util.concurrent.CountDownLatch;
23-
import java.util.concurrent.atomic.AtomicBoolean;
2424
import java.util.concurrent.atomic.AtomicReference;
2525

2626
import rx.Observable.OnSubscribeFunc;
@@ -126,7 +126,8 @@ protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTermi
126126
* inated)
127127
*/
128128
try {
129-
onTerminate.call(newState.observers.values());
129+
// had to circumvent type check, we know what the array contains
130+
onTerminate.call((Collection)newState.observersList);
130131
} finally {
131132
// mark that termination is completed
132133
newState.terminationLatch.countDown();
@@ -143,46 +144,104 @@ protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTermi
143144
*
144145
* @return List<Observer<T>>
145146
*/
146-
public Collection<SubjectObserver<? super T>> snapshotOfObservers() {
147-
// we don't need to copy since state is immutable
148-
return state.get().observers.values();
147+
private Collection<SubjectObserver<? super T>> snapshotOfObservers() {
148+
// had to circumvent type check, we know what the array contains
149+
return (Collection)state.get().observersList;
150+
}
151+
/**
152+
* Returns the array of observers directly.
153+
* <em>Don't modify the array!</em>
154+
* @return the array of current observers
155+
*/
156+
public SubjectObserver<Object>[] rawSnapshot() {
157+
return state.get().observers;
149158
}
150159

151160
protected static class State<T> {
152161
final boolean terminated;
153162
final CountDownLatch terminationLatch;
154-
final Map<Subscription, SubjectObserver<? super T>> observers;
155-
156-
private State(boolean isTerminated, CountDownLatch terminationLatch, Map<Subscription, SubjectObserver<? super T>> observers) {
163+
final Subscription[] subscriptions;
164+
final SubjectObserver<Object>[] observers;
165+
// to avoid lots of empty arrays
166+
final Subscription[] EMPTY_S = new Subscription[0];
167+
@SuppressWarnings("rawtypes")
168+
// to avoid lots of empty arrays
169+
final SubjectObserver[] EMPTY_O = new SubjectObserver[0];
170+
@SuppressWarnings("rawtypes")
171+
final List<SubjectObserver<Object>> observersList;
172+
private State(boolean isTerminated, CountDownLatch terminationLatch,
173+
Subscription[] subscriptions, SubjectObserver[] observers) {
157174
this.terminationLatch = terminationLatch;
158175
this.terminated = isTerminated;
159-
this.observers = Collections.unmodifiableMap(observers);
176+
this.subscriptions = subscriptions;
177+
this.observers = observers;
178+
this.observersList = Arrays.asList(this.observers);
160179
}
161180

162181
State() {
163182
this.terminated = false;
164183
this.terminationLatch = null;
165-
this.observers = Collections.emptyMap();
184+
this.subscriptions = EMPTY_S;
185+
this.observers = EMPTY_O;
186+
observersList = Collections.emptyList();
166187
}
167188

168189
public State<T> terminate() {
169190
if (terminated) {
170191
throw new IllegalStateException("Already terminated.");
171192
}
172-
return new State<T>(true, new CountDownLatch(1), observers);
193+
return new State<T>(true, new CountDownLatch(1), subscriptions, observers);
173194
}
174195

175196
public State<T> addObserver(Subscription s, SubjectObserver<? super T> observer) {
176-
Map<Subscription, SubjectObserver<? super T>> newMap = new HashMap<Subscription, SubjectObserver<? super T>>();
177-
newMap.putAll(observers);
178-
newMap.put(s, observer);
179-
return new State<T>(terminated, terminationLatch, newMap);
197+
int n = this.observers.length;
198+
199+
Subscription[] newsubscriptions = Arrays.copyOf(this.subscriptions, n + 1);
200+
SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n + 1);
201+
202+
newsubscriptions[n] = s;
203+
newobservers[n] = observer;
204+
205+
return createNewWith(newsubscriptions, newobservers);
206+
}
207+
private State<T> createNewWith(Subscription[] newsubscriptions, SubjectObserver[] newobservers) {
208+
return new State<T>(terminated, terminationLatch, newsubscriptions, newobservers);
180209
}
181210

182211
public State<T> removeObserver(Subscription s) {
183-
Map<Subscription, SubjectObserver<? super T>> newMap = new HashMap<Subscription, SubjectObserver<? super T>>(observers);
184-
newMap.remove(s);
185-
return new State<T>(terminated, terminationLatch, newMap);
212+
// we are empty, nothing to remove
213+
if (this.observers.length == 0) {
214+
return this;
215+
}
216+
int n = Math.max(this.observers.length - 1, 1);
217+
int copied = 0;
218+
Subscription[] newsubscriptions = Arrays.copyOf(this.subscriptions, n);
219+
SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n);
220+
221+
for (int i = 0; i < this.subscriptions.length; i++) {
222+
Subscription s0 = this.subscriptions[i];
223+
if (s0 != s) {
224+
if (copied == n) {
225+
// if s was not found till the end of the iteration
226+
// we return ourselves since no modification should
227+
// have happened
228+
return this;
229+
}
230+
newsubscriptions[copied] = s0;
231+
newobservers[copied] = this.observers[i];
232+
copied++;
233+
}
234+
}
235+
236+
if (copied == 0) {
237+
return createNewWith(EMPTY_S, EMPTY_O);
238+
}
239+
// if somehow copied less than expected, truncate the arrays
240+
// if s is unique, this should never happen
241+
if (copied < n) {
242+
return createNewWith(Arrays.copyOf(newsubscriptions, copied), Arrays.copyOf(newobservers, copied));
243+
}
244+
return createNewWith(newsubscriptions, newobservers);
186245
}
187246
}
188247

0 commit comments

Comments
 (0)