Skip to content

Commit f5eb8f7

Browse files
Fix Subject.toObservable
- now correctly creates only 1 Observable instance for the life of the Subject - this fixes the OperationJoinsTest - thanks @akarnokd for pointing out my mistake! - all rxjava-core unit tests are now passing
1 parent f26b7b2 commit f5eb8f7

File tree

4 files changed

+21
-21
lines changed

4 files changed

+21
-21
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,19 +96,19 @@ protected static <T> void emitValueToObserver(Notification<T> n, Observer<? supe
9696
}
9797
}
9898

99-
private final OnSubscribe<T> onSubscribe;
10099
private final SubjectSubscriptionManager<T> subscriptionManager;
101100
final AtomicReference<Notification<T>> lastNotification;
101+
private final Observable<T> observable;
102102

103103
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
104-
this.onSubscribe = onSubscribe;
105104
this.subscriptionManager = subscriptionManager;
106105
this.lastNotification = lastNotification;
106+
this.observable = Observable.create(onSubscribe);
107107
}
108108

109109
@Override
110110
public Observable<T> toObservable() {
111-
return Observable.create(onSubscribe);
111+
return observable;
112112
}
113113

114114
@Override
@@ -130,7 +130,7 @@ public void onError(final Throwable e) {
130130

131131
@Override
132132
public void call(Collection<SubjectObserver<? super T>> observers) {
133-
lastNotification.set(new Notification<T>(e));
133+
lastNotification.set(Notification.<T>createOnError(e));
134134
for (Observer<? super T> o : observers) {
135135
emitValueToObserver(lastNotification.get(), o);
136136
}
@@ -141,7 +141,7 @@ public void call(Collection<SubjectObserver<? super T>> observers) {
141141

142142
@Override
143143
public void onNext(T v) {
144-
lastNotification.set(new Notification<T>(v));
144+
lastNotification.set(Notification.<T>createOnNext(v));
145145
}
146146

147147
}

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,19 +119,19 @@ public void call(SubjectObserver<? super T> o) {
119119
return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification);
120120
}
121121

122-
private final OnSubscribe<T> onSubscribe;
123122
private final SubjectSubscriptionManager<T> subscriptionManager;
124123
final AtomicReference<Notification<T>> lastNotification;
124+
private final Observable<T> observable;
125125

126126
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
127-
this.onSubscribe = onSubscribe;
128127
this.subscriptionManager = subscriptionManager;
129128
this.lastNotification = lastNotification;
129+
this.observable = Observable.create(onSubscribe);
130130
}
131131

132132
@Override
133133
public Observable<T> toObservable() {
134-
return Observable.create(onSubscribe);
134+
return observable;
135135
}
136136

137137
@Override
@@ -140,7 +140,7 @@ public void onCompleted() {
140140

141141
@Override
142142
public void call(Collection<SubjectObserver<? super T>> observers) {
143-
lastNotification.set(new Notification<T>());
143+
lastNotification.set(Notification.<T>createOnCompleted());
144144
for (Observer<? super T> o : observers) {
145145
o.onCompleted();
146146
}
@@ -154,7 +154,7 @@ public void onError(final Throwable e) {
154154

155155
@Override
156156
public void call(Collection<SubjectObserver<? super T>> observers) {
157-
lastNotification.set(new Notification<T>(e));
157+
lastNotification.set(Notification.<T>createOnError(e));
158158
for (Observer<? super T> o : observers) {
159159
o.onError(e);
160160
}
@@ -168,7 +168,7 @@ public void onNext(T v) {
168168
// do not overwrite a terminal notification
169169
// so new subscribers can get them
170170
if (lastNotification.get().isOnNext()) {
171-
lastNotification.set(new Notification<T>(v));
171+
lastNotification.set(Notification.<T>createOnNext(v));
172172
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
173173
o.onNext(v);
174174
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,19 @@ public void call(SubjectObserver<? super T> o) {
8686
return new PublishSubject<T>(onSubscribe, subscriptionManager, lastNotification);
8787
}
8888

89-
private final OnSubscribe<T> onSubscribe;
9089
private final SubjectSubscriptionManager<T> subscriptionManager;
9190
final AtomicReference<Notification<T>> lastNotification;
91+
private final Observable<T> observable;
9292

9393
protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
94-
this.onSubscribe = onSubscribe;
9594
this.subscriptionManager = subscriptionManager;
9695
this.lastNotification = lastNotification;
96+
this.observable = Observable.create(onSubscribe);
9797
}
9898

9999
@Override
100100
public Observable<T> toObservable() {
101-
return Observable.create(onSubscribe);
101+
return observable;
102102
}
103103

104104
@Override
@@ -107,7 +107,7 @@ public void onCompleted() {
107107

108108
@Override
109109
public void call(Collection<SubjectObserver<? super T>> observers) {
110-
lastNotification.set(new Notification<T>());
110+
lastNotification.set(Notification.<T>createOnCompleted());
111111
for (Observer<? super T> o : observers) {
112112
o.onCompleted();
113113
}
@@ -121,7 +121,7 @@ public void onError(final Throwable e) {
121121

122122
@Override
123123
public void call(Collection<SubjectObserver<? super T>> observers) {
124-
lastNotification.set(new Notification<T>(e));
124+
lastNotification.set(Notification.<T>createOnError(e));
125125
for (Observer<? super T> o : observers) {
126126
o.onError(e);
127127
}

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,17 @@ public ReplayState(int initialCapacity) {
108108

109109
private final SubjectSubscriptionManager<T> subscriptionManager;
110110
private final ReplayState<T> state;
111-
private final OnSubscribe<T> onSubscribe;
111+
private final Observable<T> observable;
112112

113113
protected ReplaySubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, ReplayState<T> state) {
114-
this.onSubscribe = onSubscribe;
115114
this.subscriptionManager = subscriptionManager;
116115
this.state = state;
116+
this.observable = Observable.create(onSubscribe);
117117
}
118118

119119
@Override
120120
public Observable<T> toObservable() {
121-
return Observable.create(onSubscribe);
121+
return observable;
122122
}
123123

124124
@Override
@@ -127,7 +127,7 @@ public void onCompleted() {
127127

128128
@Override
129129
public void call(Collection<SubjectObserver<? super T>> observers) {
130-
state.history.complete(new Notification<T>());
130+
state.history.complete(Notification.<T>createOnCompleted());
131131
for (SubjectObserver<? super T> o : observers) {
132132
if (caughtUp(o)) {
133133
o.onCompleted();
@@ -143,7 +143,7 @@ public void onError(final Throwable e) {
143143

144144
@Override
145145
public void call(Collection<SubjectObserver<? super T>> observers) {
146-
state.history.complete(new Notification<T>(e));
146+
state.history.complete(Notification.<T>createOnError(e));
147147
for (SubjectObserver<? super T> o : observers) {
148148
if (caughtUp(o)) {
149149
o.onError(e);

0 commit comments

Comments
 (0)