Skip to content

Commit 1b88781

Browse files
akarnokdakarnokd
authored andcommitted
Unified the Subject management.
1 parent d27b8c7 commit 1b88781

File tree

6 files changed

+420
-652
lines changed

6 files changed

+420
-652
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 32 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,9 @@
1515
*/
1616
package rx.subjects;
1717

18-
import java.util.Collection;
19-
import java.util.concurrent.atomic.AtomicReference;
20-
21-
import rx.Notification;
2218
import rx.Observer;
23-
import rx.functions.Action0;
2419
import rx.functions.Action1;
20+
import rx.operators.NotificationLite;
2521
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2622

2723
/**
@@ -56,88 +52,59 @@
5652
public final class AsyncSubject<T> extends Subject<T, T> {
5753

5854
public static <T> AsyncSubject<T> create() {
59-
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
60-
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(Notification.<T>createOnCompleted());
61-
62-
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
63-
/**
64-
* This function executes at beginning of subscription.
65-
*
66-
* This will always run, even if Subject is in terminal state.
67-
*/
68-
new Action1<SubjectObserver<? super T>>() {
69-
70-
@Override
71-
public void call(SubjectObserver<? super T> o) {
72-
// nothing to do if not terminated
73-
}
74-
},
75-
/**
76-
* This function executes if the Subject is terminated.
77-
*/
78-
new Action1<SubjectObserver<? super T>>() {
79-
80-
@Override
81-
public void call(SubjectObserver<? super T> o) {
82-
// we want the last value + completed so add this extra logic
83-
// to send onCompleted if the last value is an onNext
84-
emitValueToObserver(lastNotification.get(), o);
85-
}
86-
}, null);
87-
88-
return new AsyncSubject<T>(onSubscribe, subscriptionManager, lastNotification);
55+
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
56+
state.onTerminated = new Action1<SubjectObserver<T>>() {
57+
@Override
58+
public void call(SubjectObserver<T> o) {
59+
Object v = state.get();
60+
o.accept(v);
61+
o.completeSingle(v);
62+
}
63+
};
64+
return new AsyncSubject<T>(state, state);
8965
}
9066

91-
protected static <T> void emitValueToObserver(Notification<T> n, Observer<? super T> o) {
92-
n.accept(o);
93-
if (n.isOnNext()) {
94-
o.onCompleted();
95-
}
96-
}
67+
final SubjectSubscriptionManager<T> state;
68+
volatile Object lastValue;
69+
private final NotificationLite<T> nl = NotificationLite.instance();
9770

98-
private final SubjectSubscriptionManager<T> subscriptionManager;
99-
final AtomicReference<Notification<T>> lastNotification;
10071

101-
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
72+
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
10273
super(onSubscribe);
103-
this.subscriptionManager = subscriptionManager;
104-
this.lastNotification = lastNotification;
74+
this.state = state;
10575
}
10676

10777
@Override
10878
public void onCompleted() {
109-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
110-
111-
@Override
112-
public void call() {
79+
if (state.active) {
80+
Object last = lastValue;
81+
if (last == null) {
82+
last = nl.completed();
11383
}
114-
});
115-
if (observers != null) {
116-
for (Observer<? super T> o : observers) {
117-
emitValueToObserver(lastNotification.get(), o);
84+
for (SubjectObserver<T> bo : state.terminate(last)) {
85+
if (last == nl.completed()) {
86+
bo.onCompleted();
87+
} else {
88+
bo.onNext(nl.getValue(last));
89+
bo.onCompleted();
90+
}
11891
}
11992
}
12093
}
12194

12295
@Override
12396
public void onError(final Throwable e) {
124-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
125-
@Override
126-
public void call() {
127-
lastNotification.set(Notification.<T> createOnError(e));
128-
}
129-
});
130-
if (observers != null) {
131-
for (Observer<? super T> o : observers) {
132-
emitValueToObserver(lastNotification.get(), o);
97+
if (state.active) {
98+
Object n = nl.error(e);
99+
for (SubjectObserver<T> bo : state.terminate(n)) {
100+
bo.onError(e);
133101
}
134102
}
135-
136103
}
137104

138105
@Override
139106
public void onNext(T v) {
140-
lastNotification.set(Notification.createOnNext(v));
107+
lastValue = nl.next(v);
141108
}
142109

143110
}

0 commit comments

Comments
 (0)