Skip to content

Commit d27b8c7

Browse files
akarnokdakarnokd
authored andcommitted
PublishSubject to match BehaviorSubject
1 parent 26cd9dd commit d27b8c7

File tree

2 files changed

+24
-68
lines changed

2 files changed

+24
-68
lines changed

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public void onNext(T v) {
274274
return state.observers.get().observers.length;
275275
}
276276

277-
private static final class BehaviorObserver<T> {
277+
static final class BehaviorObserver<T> {
278278
final Observer<? super T> actual;
279279
final NotificationLite<T> nl = NotificationLite.instance();
280280
/** Guarded by this. */

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 23 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@
1515
*/
1616
package rx.subjects;
1717

18-
import java.util.Collection;
19-
import java.util.concurrent.atomic.AtomicReference;
20-
21-
import rx.Notification;
2218
import rx.Observer;
23-
import rx.functions.Action0;
24-
import rx.functions.Action1;
25-
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
19+
import rx.operators.NotificationLite;
20+
import rx.subjects.BehaviorSubject.BehaviorOnSubscribe;
21+
import rx.subjects.BehaviorSubject.State;
2622

2723
/**
2824
* Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber.
@@ -50,85 +46,45 @@
5046
public final class PublishSubject<T> extends Subject<T, T> {
5147

5248
public static <T> PublishSubject<T> create() {
53-
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
54-
// set a default value so subscriptions will immediately receive this until a new notification is received
55-
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>();
56-
57-
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
58-
/**
59-
* This function executes at beginning of subscription.
60-
*
61-
* This will always run, even if Subject is in terminal state.
62-
*/
63-
new Action1<SubjectObserver<? super T>>() {
64-
65-
@Override
66-
public void call(SubjectObserver<? super T> o) {
67-
// nothing onSubscribe unless in terminal state which is the next function
68-
}
69-
},
70-
/**
71-
* This function executes if the Subject is terminated before subscription occurs.
72-
*/
73-
new Action1<SubjectObserver<? super T>>() {
74-
75-
@Override
76-
public void call(SubjectObserver<? super T> o) {
77-
/*
78-
* If we are already terminated, or termination happens while trying to subscribe
79-
* this will be invoked and we emit whatever the last terminal value was.
80-
*/
81-
lastNotification.get().accept(o);
82-
}
83-
}, null);
84-
85-
return new PublishSubject<T>(onSubscribe, subscriptionManager, lastNotification);
49+
State<T> state = new State<T>();
50+
return new PublishSubject<T>(new BehaviorOnSubscribe<T>(state), state);
8651
}
8752

88-
private final SubjectSubscriptionManager<T> subscriptionManager;
89-
final AtomicReference<Notification<T>> lastNotification;
90-
91-
protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
53+
final State<T> state;
54+
private final NotificationLite<T> nl = NotificationLite.instance();
55+
56+
protected PublishSubject(OnSubscribe<T> onSubscribe, State<T> state) {
9257
super(onSubscribe);
93-
this.subscriptionManager = subscriptionManager;
94-
this.lastNotification = lastNotification;
58+
this.state = state;
9559
}
9660

9761
@Override
9862
public void onCompleted() {
99-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
100-
@Override
101-
public void call() {
102-
lastNotification.set(Notification.<T> createOnCompleted());
103-
}
104-
});
105-
if (observers != null) {
106-
for (Observer<? super T> o : observers) {
107-
o.onCompleted();
63+
Object last = state.get();
64+
if (last == null || state.active) {
65+
Object n = nl.completed();
66+
for (BehaviorSubject.BehaviorObserver<T> bo : state.terminate(n)) {
67+
bo.emitNext(n);
10868
}
10969
}
70+
11071
}
11172

11273
@Override
11374
public void onError(final Throwable e) {
114-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
115-
116-
@Override
117-
public void call() {
118-
lastNotification.set(Notification.<T> createOnError(e));
119-
}
120-
});
121-
if (observers != null) {
122-
for (Observer<? super T> o : observers) {
123-
o.onError(e);
75+
Object last = state.get();
76+
if (last == null || state.active) {
77+
Object n = nl.error(e);
78+
for (BehaviorSubject.BehaviorObserver<T> bo : state.terminate(n)) {
79+
bo.emitNext(n);
12480
}
12581
}
12682
}
12783

12884
@Override
12985
public void onNext(T v) {
130-
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
131-
o.onNext(v);
86+
for (BehaviorSubject.BehaviorObserver<T> bo : state.observers()) {
87+
bo.emitNext(nl.next(v));
13288
}
13389
}
13490
}

0 commit comments

Comments
 (0)