Skip to content

Commit 1c00aa6

Browse files
committed
Removal of ConcurrentHashMap from ReplaySubject and some NotificationLite cleanup.
1 parent 263a032 commit 1c00aa6

File tree

6 files changed

+58
-80
lines changed

6 files changed

+58
-80
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public final class AsyncSubject<T> extends Subject<T, T> {
5454

5555
/**
5656
* Creates and returns a new {@code AsyncSubject}.
57-
*
57+
* @param <T> the result value type
5858
* @return the new {@code AsyncSubject}
5959
*/
6060
public static <T> AsyncSubject<T> create() {
@@ -63,7 +63,7 @@ public static <T> AsyncSubject<T> create() {
6363
@Override
6464
public void call(SubjectObserver<T> o) {
6565
Object v = state.get();
66-
o.accept(v);
66+
o.accept(v, state.nl);
6767
NotificationLite<T> nl = NotificationLite.instance();
6868
if (v == null || (!nl.isCompleted(v) && !nl.isError(v))) {
6969
o.onCompleted();

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault)
9999

100100
@Override
101101
public void call(SubjectObserver<T> o) {
102-
o.emitFirst(state.get());
102+
o.emitFirst(state.get(), state.nl);
103103
}
104104

105105
};
@@ -121,7 +121,7 @@ public void onCompleted() {
121121
if (last == null || state.active) {
122122
Object n = nl.completed();
123123
for (SubjectObserver<T> bo : state.terminate(n)) {
124-
bo.emitNext(n);
124+
bo.emitNext(n, state.nl);
125125
}
126126
}
127127
}
@@ -132,7 +132,7 @@ public void onError(Throwable e) {
132132
if (last == null || state.active) {
133133
Object n = nl.error(e);
134134
for (SubjectObserver<T> bo : state.terminate(n)) {
135-
bo.emitNext(n);
135+
bo.emitNext(n, state.nl);
136136
}
137137
}
138138
}
@@ -143,7 +143,7 @@ public void onNext(T v) {
143143
if (last == null || state.active) {
144144
Object n = nl.next(v);
145145
for (SubjectObserver<T> bo : state.next(n)) {
146-
bo.emitNext(n);
146+
bo.emitNext(n, state.nl);
147147
}
148148
}
149149
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,19 @@ public final class PublishSubject<T> extends Subject<T, T> {
5050
/**
5151
* Creates and returns a new {@code PublishSubject}.
5252
*
53+
* @param <T> the value type
5354
* @return the new {@code PublishSubject}
5455
*/
5556
public static <T> PublishSubject<T> create() {
5657
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
57-
state.onAdded = new Action1<SubjectObserver<T>>() {
58+
state.onTerminated = new Action1<SubjectObserver<T>>() {
5859

5960
@Override
6061
public void call(SubjectObserver<T> o) {
61-
o.emitFirst(state.get());
62+
o.emitFirst(state.get(), state.nl);
6263
}
6364

6465
};
65-
state.onTerminated = state.onAdded;
6666
return new PublishSubject<T>(state, state);
6767
}
6868

@@ -79,7 +79,7 @@ public void onCompleted() {
7979
if (state.active) {
8080
Object n = nl.completed();
8181
for (SubjectObserver<T> bo : state.terminate(n)) {
82-
bo.emitNext(n);
82+
bo.emitNext(n, state.nl);
8383
}
8484
}
8585

@@ -90,7 +90,7 @@ public void onError(final Throwable e) {
9090
if (state.active) {
9191
Object n = nl.error(e);
9292
for (SubjectObserver<T> bo : state.terminate(n)) {
93-
bo.emitNext(n);
93+
bo.emitNext(n, state.nl);
9494
}
9595
}
9696
}

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 14 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
package rx.subjects;
1717

1818
import java.util.ArrayList;
19-
import java.util.concurrent.ConcurrentHashMap;
2019
import java.util.concurrent.TimeUnit;
2120
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2321

2422
import rx.Observer;
2523
import rx.Scheduler;
@@ -98,26 +96,20 @@ public void call(SubjectObserver<T> o) {
9896
int lastIndex = state.replayObserverFromIndex(0, o);
9997

10098
// now that it is caught up add to observers
101-
state.replayState.put(o, lastIndex);
99+
o.index(lastIndex);
102100
}
103101
};
104102
ssm.onTerminated = new Action1<SubjectObserver<T>>() {
105103
@Override
106104
public void call(SubjectObserver<T> o) {
107-
Integer idx = state.replayState.remove(o);
105+
Integer idx = o.index();
108106
if (idx == null) {
109107
idx = 0;
110108
}
111109
// we will finish replaying if there is anything left
112110
state.replayObserverFromIndex(idx, o);
113111
}
114112
};
115-
ssm.onUnsubscribed = new Action1<SubjectObserver<T>>() {
116-
@Override
117-
public void call(SubjectObserver<T> o) {
118-
state.replayState.remove(o);
119-
}
120-
};
121113

122114
return new ReplaySubject<T>(ssm, ssm, state);
123115
}
@@ -273,20 +265,13 @@ static final <T> ReplaySubject<T> createWithState(final BoundedState<T> state,
273265

274266
@Override
275267
public void call(SubjectObserver<T> t1) {
276-
NodeList.Node<Object> l = state.removeState(t1);
268+
NodeList.Node<Object> l = t1.index();
277269
if (l == null) {
278270
l = state.head();
279271
}
280272
state.replayObserverFromIndex(l, t1);
281273
}
282274

283-
};
284-
ssm.onUnsubscribed = new Action1<SubjectObserver<T>>() {
285-
@Override
286-
public void call(SubjectObserver<T> t1) {
287-
state.removeState(t1);
288-
}
289-
290275
};
291276

292277
return new ReplaySubject<T>(ssm, ssm, state);
@@ -341,7 +326,7 @@ public void onCompleted() {
341326
* @return Returns the number of subscribers.
342327
*/
343328
/* Support test. */int subscriberCount() {
344-
return state.replayStateSize();
329+
return ssm.state.observers.length;
345330
}
346331

347332
private boolean caughtUp(SubjectObserver<? super T> o) {
@@ -364,8 +349,6 @@ private boolean caughtUp(SubjectObserver<? super T> o) {
364349
* @param <T> the input and output type
365350
*/
366351
static final class UnboundedReplayState<T> implements ReplayState<T, Integer> {
367-
/** Each Observer is tracked here for what events they have received. */
368-
final ConcurrentHashMap<Observer<? super T>, Integer> replayState;
369352
private final NotificationLite<T> nl = NotificationLite.instance();
370353
/** The buffer. */
371354
private final ArrayList<Object> list;
@@ -378,7 +361,6 @@ static final class UnboundedReplayState<T> implements ReplayState<T, Integer> {
378361
= AtomicIntegerFieldUpdater.newUpdater(UnboundedReplayState.class, "index");
379362
public UnboundedReplayState(int initialCapacity) {
380363
list = new ArrayList<Object>(initialCapacity);
381-
replayState = new ConcurrentHashMap<Observer<? super T>, Integer>();
382364
}
383365

384366
@Override
@@ -417,10 +399,10 @@ public boolean terminated() {
417399

418400
@Override
419401
public void replayObserver(SubjectObserver<? super T> observer) {
420-
Integer lastEmittedLink = replayState.get(observer);
402+
Integer lastEmittedLink = observer.index();
421403
if (lastEmittedLink != null) {
422404
int l = replayObserverFromIndex(lastEmittedLink, observer);
423-
replayState.put(observer, l);
405+
observer.index(l);
424406
} else {
425407
throw new IllegalStateException("failed to find lastEmittedLink for: " + observer);
426408
}
@@ -441,12 +423,6 @@ public Integer replayObserverFromIndex(Integer idx, SubjectObserver<? super T> o
441423
public Integer replayObserverFromIndexTest(Integer idx, SubjectObserver<? super T> observer, long now) {
442424
return replayObserverFromIndex(idx, observer);
443425
}
444-
445-
@Override
446-
public int replayStateSize() {
447-
return replayState.size();
448-
}
449-
450426
}
451427

452428

@@ -456,7 +432,6 @@ public int replayStateSize() {
456432
*/
457433
static final class BoundedState<T> implements ReplayState<T, NodeList.Node<Object>> {
458434
final NodeList<Object> list;
459-
final ConcurrentHashMap<Observer<? super T>, NodeList.Node<Object>> replayState;
460435
final EvictionPolicy evictionPolicy;
461436
final Func1<Object, Object> enterTransform;
462437
final Func1<Object, Object> leaveTransform;
@@ -468,7 +443,6 @@ public BoundedState(EvictionPolicy evictionPolicy, Func1<Object, Object> enterTr
468443
Func1<Object, Object> leaveTransform) {
469444
this.list = new NodeList<Object>();
470445
this.tail = list.tail;
471-
this.replayState = new ConcurrentHashMap<Observer<? super T>, NodeList.Node<Object>>();
472446
this.evictionPolicy = evictionPolicy;
473447
this.enterTransform = enterTransform;
474448
this.leaveTransform = leaveTransform;
@@ -525,21 +499,11 @@ public Node<Object> head() {
525499
public Node<Object> tail() {
526500
return tail;
527501
}
528-
public Node<Object> removeState(SubjectObserver<? super T> o) {
529-
return replayState.remove(o);
530-
}
531-
public void addState(SubjectObserver<? super T> o, Node<Object> state) {
532-
if (state == null) {
533-
throw new IllegalStateException("Null state!");
534-
} else {
535-
replayState.put(o, state);
536-
}
537-
}
538502
@Override
539503
public void replayObserver(SubjectObserver<? super T> observer) {
540-
NodeList.Node<Object> lastEmittedLink = replayState.get(observer);
504+
NodeList.Node<Object> lastEmittedLink = observer.index();
541505
NodeList.Node<Object> l = replayObserverFromIndex(lastEmittedLink, observer);
542-
addState(observer, l);
506+
observer.index(l);
543507
}
544508

545509
@Override
@@ -565,11 +529,6 @@ public NodeList.Node<Object> replayObserverFromIndexTest(
565529
public boolean terminated() {
566530
return terminated;
567531
}
568-
569-
@Override
570-
public int replayStateSize() {
571-
return replayState.size();
572-
}
573532
}
574533

575534
// **************
@@ -584,6 +543,10 @@ public int replayStateSize() {
584543
interface ReplayState<T, I> {
585544
/** @return true if the subject has reached a terminal state. */
586545
boolean terminated();
546+
/**
547+
* Replay contents to the given observer.
548+
* @param observer the receiver of events
549+
*/
587550
void replayObserver(SubjectObserver<? super T> observer);
588551
/**
589552
* Replay the buffered values from an index position and return a new index
@@ -601,10 +564,6 @@ I replayObserverFromIndex(
601564
*/
602565
I replayObserverFromIndexTest(
603566
I idx, SubjectObserver<? super T> observer, long now);
604-
/**
605-
* @return the size of the replay state map for testing purposes.
606-
*/
607-
int replayStateSize();
608567
/**
609568
* Add an OnNext value to the buffer
610569
* @param value the value to add
@@ -756,7 +715,7 @@ public DefaultOnAdd(BoundedState<T> state) {
756715
@Override
757716
public void call(SubjectObserver<T> t1) {
758717
NodeList.Node<Object> l = state.replayObserverFromIndex(state.head(), t1);
759-
state.addState(t1, l);
718+
t1.index(l);
760719
}
761720

762721
}
@@ -783,7 +742,7 @@ public void call(SubjectObserver<T> t1) {
783742
// accept all if terminated
784743
l = state.replayObserverFromIndex(state.head(), t1);
785744
}
786-
state.addState(t1, l);
745+
t1.index(l);
787746
}
788747

789748
}

0 commit comments

Comments
 (0)