Skip to content

Commit 6b036e8

Browse files
Subject Refactor
- Common logic composed inside SubjectSubscriptionManager - ReplaySubject does not block while replaying to new subscribers - Added unit tests and fixed behavior while reviewing with @headinthebox compared to Rx.Net - Uses mostly non-blocking approach (I believe it’s all correct, unit and long running tests have been used to prove it. The tests found concurrency problems during development and became stable once I got the design correct. As with all concurrent code I may be missing something.)
1 parent 1586113 commit 6b036e8

12 files changed

+982
-662
lines changed

rxjava-core/src/main/java/rx/subjects/AbstractSubject.java

Lines changed: 0 additions & 167 deletions
This file was deleted.

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 65 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.Collection;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
1821
import rx.Notification;
1922
import rx.Observer;
20-
import rx.util.functions.Action2;
23+
import rx.util.functions.Action1;
2124

2225
/**
2326
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
@@ -28,8 +31,8 @@
2831
* Example usage:
2932
* <p>
3033
* <pre> {@code
31-
32-
* / observer will receive no onNext events because the subject.onCompleted() isn't called.
34+
*
35+
* // observer will receive no onNext events because the subject.onCompleted() isn't called.
3336
AsyncSubject<Object> subject = AsyncSubject.create();
3437
subject.subscribe(observer);
3538
subject.onNext("one");
@@ -48,68 +51,88 @@
4851
*
4952
* @param <T>
5053
*/
51-
public class AsyncSubject<T> extends AbstractSubject<T> {
54+
public final class AsyncSubject<T> extends Subject<T, T> {
5255

53-
/**
54-
* Create a new AsyncSubject
55-
*
56-
* @return a new AsyncSubject
57-
*/
5856
public static <T> AsyncSubject<T> create() {
59-
final SubjectState<T> state = new SubjectState<T>();
60-
OnSubscribeFunc<T> onSubscribe = getOnSubscribeFunc(state, new Action2<SubjectState<T>, Observer<? super T>>() {
57+
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
58+
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(new Notification<T>());
6159

62-
@Override
63-
public void call(SubjectState<T> state, Observer<? super T> o) {
64-
// we want the last value + completed so add this extra logic
65-
// to send onCompleted if the last value is an onNext
66-
if (state.completed.get()) {
67-
Notification<T> value = state.currentValue.get();
68-
if (value != null && value.isOnNext()) {
69-
o.onCompleted();
60+
OnSubscribeFunc<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
61+
/**
62+
* This function executes at beginning of subscription.
63+
*
64+
* This will always run, even if Subject is in terminal state.
65+
*/
66+
new Action1<Observer<? super T>>() {
67+
68+
@Override
69+
public void call(Observer<? super T> o) {
70+
// nothing to do if not terminated
7071
}
71-
}
72-
}
73-
});
74-
return new AsyncSubject<T>(onSubscribe, state);
72+
},
73+
/**
74+
* This function executes if the Subject is terminated.
75+
*/
76+
new Action1<Observer<? super T>>() {
77+
78+
@Override
79+
public void call(Observer<? super T> o) {
80+
// we want the last value + completed so add this extra logic
81+
// to send onCompleted if the last value is an onNext
82+
emitValueToObserver(lastNotification.get(), o);
83+
}
84+
});
85+
86+
return new AsyncSubject<T>(onSubscribe, subscriptionManager, lastNotification);
87+
}
88+
89+
protected static <T> void emitValueToObserver(Notification<T> n, Observer<? super T> o) {
90+
n.accept(o);
91+
if (n.isOnNext()) {
92+
o.onCompleted();
93+
}
7594
}
7695

77-
private final SubjectState<T> state;
96+
private final SubjectSubscriptionManager<T> subscriptionManager;
97+
final AtomicReference<Notification<T>> lastNotification;
7898

79-
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectState<T> state) {
99+
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
80100
super(onSubscribe);
81-
this.state = state;
101+
this.subscriptionManager = subscriptionManager;
102+
this.lastNotification = lastNotification;
82103
}
83104

84105
@Override
85106
public void onCompleted() {
86-
/**
87-
* Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
88-
*/
89-
emitNotificationAndTerminate(state, new Action2<SubjectState<T>, Observer<? super T>>() {
107+
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
90108

91109
@Override
92-
public void call(SubjectState<T> state, Observer<? super T> o) {
93-
o.onCompleted();
110+
public void call(Collection<Observer<? super T>> observers) {
111+
for (Observer<? super T> o : observers) {
112+
emitValueToObserver(lastNotification.get(), o);
113+
}
94114
}
95115
});
96116
}
97117

98118
@Override
99-
public void onError(Throwable e) {
100-
/**
101-
* Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
102-
*/
103-
state.currentValue.set(new Notification<T>(e));
104-
emitNotificationAndTerminate(state, null);
119+
public void onError(final Throwable e) {
120+
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
121+
122+
@Override
123+
public void call(Collection<Observer<? super T>> observers) {
124+
lastNotification.set(new Notification<T>(e));
125+
for (Observer<? super T> o : observers) {
126+
emitValueToObserver(lastNotification.get(), o);
127+
}
128+
}
129+
});
130+
105131
}
106132

107133
@Override
108134
public void onNext(T v) {
109-
/**
110-
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
111-
*/
112-
state.currentValue.set(new Notification<T>(v));
135+
lastNotification.set(new Notification<T>(v));
113136
}
114137

115138
}

0 commit comments

Comments
 (0)