Skip to content

Commit 9561bfc

Browse files
committed
Added create with initial capacity.
1 parent 8425eff commit 9561bfc

File tree

2 files changed

+20
-26
lines changed

2 files changed

+20
-26
lines changed

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,12 @@
5050
* @param <T>
5151
*/
5252
public final class ReplaySubject<T> extends Subject<T, T> {
53-
5453
public static <T> ReplaySubject<T> create() {
54+
return create(16);
55+
}
56+
public static <T> ReplaySubject<T> create(int initialCapacity) {
5557
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
56-
final ReplayState<T> state = new ReplayState<T>();
58+
final ReplayState<T> state = new ReplayState<T>(initialCapacity);
5759

5860
OnSubscribeFunc<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
5961
/**
@@ -91,9 +93,13 @@ public void call(SubjectObserver<? super T> o) {
9193

9294
private static class ReplayState<T> {
9395
// single-producer, multi-consumer
94-
final History<T> history = new History<T>();
96+
final History<T> history;
9597
// each Observer is tracked here for what events they have received
96-
final ConcurrentHashMap<Observer<? super T>, Integer> replayState = new ConcurrentHashMap<Observer<? super T>, Integer>();
98+
final ConcurrentHashMap<Observer<? super T>, Integer> replayState;
99+
public ReplayState(int initialCapacity) {
100+
history = new History<T>(initialCapacity);
101+
replayState = new ConcurrentHashMap<Observer<? super T>, Integer>();
102+
}
97103
}
98104

99105
private final SubjectSubscriptionManager<T> subscriptionManager;
@@ -197,10 +203,14 @@ private static <T> int replayObserverFromIndex(History<T> history, Integer l, Su
197203
* @param <T>
198204
*/
199205
private static class History<T> {
200-
private AtomicInteger index = new AtomicInteger(0);
201-
private final ArrayList<T> list = new ArrayList<T>(/* 1024 */);
202-
private AtomicReference<Notification<T>> terminalValue = new AtomicReference<Notification<T>>();
203-
206+
private final AtomicInteger index;
207+
private final ArrayList<T> list;
208+
private final AtomicReference<Notification<T>> terminalValue;
209+
public History(int initialCapacity) {
210+
index = new AtomicInteger(0);
211+
list = new ArrayList<T>(initialCapacity);
212+
terminalValue = new AtomicReference<Notification<T>>();
213+
}
204214
public boolean next(T n) {
205215
if (terminalValue.get() == null) {
206216
list.add(n);

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.subjects;
1717

18-
import java.util.ArrayList;
1918
import java.util.Arrays;
2019
import java.util.Collection;
2120
import java.util.Collections;
@@ -52,7 +51,7 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
5251
onSubscribe.call(observer);
5352
}
5453

55-
State<T> current = null;
54+
State<T> current;
5655
State<T> newState = null;
5756
boolean addedObserver = false;
5857
Subscription s;
@@ -107,7 +106,7 @@ public void unsubscribe() {
107106
}
108107

109108
protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
110-
State<T> current = null;
109+
State<T> current;
111110
State<T> newState = null;
112111
do {
113112
current = state.get();
@@ -133,21 +132,6 @@ protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTermi
133132
newState.terminationLatch.countDown();
134133
}
135134
}
136-
137-
/**
138-
* Current snapshot of 'state.observers.keySet()' so that concurrent modifications aren't included.
139-
*
140-
* This makes it behave deterministically in a single-threaded execution when nesting subscribes.
141-
*
142-
* In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
143-
* of possibly being included in the current onNext iteration.
144-
*
145-
* @return List<Observer<T>>
146-
*/
147-
private Collection<SubjectObserver<? super T>> snapshotOfObservers() {
148-
// had to circumvent type check, we know what the array contains
149-
return (Collection)state.get().observersList;
150-
}
151135
/**
152136
* Returns the array of observers directly.
153137
* <em>Don't modify the array!</em>

0 commit comments

Comments
 (0)