Skip to content

Commit 44c3739

Browse files
Merge pull request #651 from benjchristensen/subjects-fixes
Subjects Refactor - Non-Blocking, Common Abstraction, Performance
2 parents 1586113 + 4af68d8 commit 44c3739

13 files changed

+1246
-660
lines changed

rxjava-core/src/main/java/rx/subjects/AbstractSubject.java

Lines changed: 0 additions & 167 deletions
This file was deleted.

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.Collection;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
1821
import rx.Notification;
1922
import rx.Observer;
20-
import rx.util.functions.Action2;
23+
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
24+
import rx.util.functions.Action1;
2125

2226
/**
2327
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
@@ -28,8 +32,8 @@
2832
* Example usage:
2933
* <p>
3034
* <pre> {@code
31-
32-
* / observer will receive no onNext events because the subject.onCompleted() isn't called.
35+
*
36+
* // observer will receive no onNext events because the subject.onCompleted() isn't called.
3337
AsyncSubject<Object> subject = AsyncSubject.create();
3438
subject.subscribe(observer);
3539
subject.onNext("one");
@@ -48,68 +52,88 @@
4852
*
4953
* @param <T>
5054
*/
51-
public class AsyncSubject<T> extends AbstractSubject<T> {
55+
public final class AsyncSubject<T> extends Subject<T, T> {
5256

53-
/**
54-
* Create a new AsyncSubject
55-
*
56-
* @return a new AsyncSubject
57-
*/
5857
public static <T> AsyncSubject<T> create() {
59-
final SubjectState<T> state = new SubjectState<T>();
60-
OnSubscribeFunc<T> onSubscribe = getOnSubscribeFunc(state, new Action2<SubjectState<T>, Observer<? super T>>() {
58+
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
59+
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(new Notification<T>());
6160

62-
@Override
63-
public void call(SubjectState<T> state, Observer<? super T> o) {
64-
// we want the last value + completed so add this extra logic
65-
// to send onCompleted if the last value is an onNext
66-
if (state.completed.get()) {
67-
Notification<T> value = state.currentValue.get();
68-
if (value != null && value.isOnNext()) {
69-
o.onCompleted();
61+
OnSubscribeFunc<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
62+
/**
63+
* This function executes at beginning of subscription.
64+
*
65+
* This will always run, even if Subject is in terminal state.
66+
*/
67+
new Action1<SubjectObserver<? super T>>() {
68+
69+
@Override
70+
public void call(SubjectObserver<? super T> o) {
71+
// nothing to do if not terminated
7072
}
71-
}
72-
}
73-
});
74-
return new AsyncSubject<T>(onSubscribe, state);
73+
},
74+
/**
75+
* This function executes if the Subject is terminated.
76+
*/
77+
new Action1<SubjectObserver<? super T>>() {
78+
79+
@Override
80+
public void call(SubjectObserver<? super T> o) {
81+
// we want the last value + completed so add this extra logic
82+
// to send onCompleted if the last value is an onNext
83+
emitValueToObserver(lastNotification.get(), o);
84+
}
85+
});
86+
87+
return new AsyncSubject<T>(onSubscribe, subscriptionManager, lastNotification);
88+
}
89+
90+
protected static <T> void emitValueToObserver(Notification<T> n, Observer<? super T> o) {
91+
n.accept(o);
92+
if (n.isOnNext()) {
93+
o.onCompleted();
94+
}
7595
}
7696

77-
private final SubjectState<T> state;
97+
private final SubjectSubscriptionManager<T> subscriptionManager;
98+
final AtomicReference<Notification<T>> lastNotification;
7899

79-
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectState<T> state) {
100+
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
80101
super(onSubscribe);
81-
this.state = state;
102+
this.subscriptionManager = subscriptionManager;
103+
this.lastNotification = lastNotification;
82104
}
83105

84106
@Override
85107
public void onCompleted() {
86-
/**
87-
* Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
88-
*/
89-
emitNotificationAndTerminate(state, new Action2<SubjectState<T>, Observer<? super T>>() {
108+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
90109

91110
@Override
92-
public void call(SubjectState<T> state, Observer<? super T> o) {
93-
o.onCompleted();
111+
public void call(Collection<SubjectObserver<? super T>> observers) {
112+
for (Observer<? super T> o : observers) {
113+
emitValueToObserver(lastNotification.get(), o);
114+
}
94115
}
95116
});
96117
}
97118

98119
@Override
99-
public void onError(Throwable e) {
100-
/**
101-
* Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
102-
*/
103-
state.currentValue.set(new Notification<T>(e));
104-
emitNotificationAndTerminate(state, null);
120+
public void onError(final Throwable e) {
121+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
122+
123+
@Override
124+
public void call(Collection<SubjectObserver<? super T>> observers) {
125+
lastNotification.set(new Notification<T>(e));
126+
for (Observer<? super T> o : observers) {
127+
emitValueToObserver(lastNotification.get(), o);
128+
}
129+
}
130+
});
131+
105132
}
106133

107134
@Override
108135
public void onNext(T v) {
109-
/**
110-
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
111-
*/
112-
state.currentValue.set(new Notification<T>(v));
136+
lastNotification.set(new Notification<T>(v));
113137
}
114138

115139
}

0 commit comments

Comments
 (0)