Skip to content

Commit 584ef6e

Browse files
Subject reverted to Observable+Observer
- rxjava-core is passing all unit tests - other modules are failing
1 parent 60af29e commit 584ef6e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+320
-343
lines changed

rxjava-core/src/main/java/rx/Notification.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public boolean isOnNext() {
142142
return getKind() == Kind.OnNext;
143143
}
144144

145-
public void accept(Subscriber<? super T> observer) {
145+
public void accept(Observer<? super T> observer) {
146146
if (isOnNext()) {
147147
observer.onNext(getValue());
148148
} else if (isOnCompleted()) {

rxjava-core/src/main/java/rx/operators/OperationCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public Subscription onSubscribe(Subscriber<? super T> observer) {
5656
*/
5757
}
5858

59-
return cache.toObservable().subscribe(observer);
59+
return cache.subscribe(observer);
6060
}
6161

6262
};

rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public GroupedObservable<K, V> toObservable() {
221221
return new GroupedObservable<K, V>(key, new OnSubscribe<V>() {
222222
@Override
223223
public void call(Subscriber<? super V> o) {
224-
publish.toObservable().subscribe(o);
224+
publish.subscribe(o);
225225
}
226226
});
227227
}

rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import rx.Observable;
2424
import rx.Observable.OnSubscribeFunc;
25+
import rx.Observer;
2526
import rx.Subscriber;
2627
import rx.Subscription;
2728
import rx.subjects.PublishSubject;
@@ -72,7 +73,7 @@ class ResultManager implements Subscription {
7273
final Object guard = new Object();
7374
int leftIds;
7475
int rightIds;
75-
final Map<Integer, Subscriber<T2>> leftMap = new HashMap<Integer, Subscriber<T2>>();
76+
final Map<Integer, Observer<T2>> leftMap = new HashMap<Integer, Observer<T2>>();
7677
final Map<Integer, T2> rightMap = new HashMap<Integer, T2>();
7778
boolean leftDone;
7879
boolean rightDone;
@@ -101,10 +102,10 @@ public void unsubscribe() {
101102
}
102103

103104
void groupsOnCompleted() {
104-
List<Subscriber<T2>> list = new ArrayList<Subscriber<T2>>(leftMap.values());
105+
List<Observer<T2>> list = new ArrayList<Observer<T2>>(leftMap.values());
105106
leftMap.clear();
106107
rightMap.clear();
107-
for (Subscriber<T2> o : list) {
108+
for (Observer<T2> o : list) {
108109
o.onCompleted();
109110
}
110111
}
@@ -127,7 +128,7 @@ public void onNext(T1 args) {
127128
leftMap.put(id, subj);
128129
}
129130

130-
Observable<T2> window = Observable.create(new WindowObservableFunc<T2>(subj.toObservable(), cancel));
131+
Observable<T2> window = Observable.create(new WindowObservableFunc<T2>(subj, cancel));
131132

132133
Observable<D1> duration = leftDuration.call(args);
133134

@@ -164,7 +165,7 @@ public void onCompleted() {
164165
@Override
165166
public void onError(Throwable e) {
166167
synchronized (guard) {
167-
for (Subscriber<T2> o : leftMap.values()) {
168+
for (Observer<T2> o : leftMap.values()) {
168169
o.onError(e);
169170
}
170171
observer.onError(e);
@@ -197,7 +198,7 @@ public void onNext(T2 args) {
197198
sduration.setSubscription(duration.subscribe(new RightDurationObserver(id, sduration)));
198199

199200
synchronized (guard) {
200-
for (Subscriber<T2> o : leftMap.values()) {
201+
for (Observer<T2> o : leftMap.values()) {
201202
o.onNext(args);
202203
}
203204
}
@@ -222,7 +223,7 @@ public void onCompleted() {
222223
@Override
223224
public void onError(Throwable e) {
224225
synchronized (guard) {
225-
for (Subscriber<T2> o : leftMap.values()) {
226+
for (Observer<T2> o : leftMap.values()) {
226227
o.onError(e);
227228
}
228229

@@ -236,9 +237,9 @@ public void onError(Throwable e) {
236237
class LeftDurationObserver extends Subscriber<D1> {
237238
final int id;
238239
final Subscription sduration;
239-
final Subscriber<T2> gr;
240+
final Observer<T2> gr;
240241

241-
public LeftDurationObserver(int id, Subscription sduration, Subscriber<T2> gr) {
242+
public LeftDurationObserver(int id, Subscription sduration, Observer<T2> gr) {
242243
this.id = id;
243244
this.sduration = sduration;
244245
this.gr = gr;

rxjava-core/src/main/java/rx/operators/OperationMulticast.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public MulticastConnectableObservable(Observable<? extends T> source, final Subj
4444
super(new OnSubscribe<R>() {
4545
@Override
4646
public void call(Subscriber<? super R> observer) {
47-
subject.toObservable().subscribe(observer);
47+
subject.subscribe(observer);
4848
}
4949
});
5050
this.source = source;

rxjava-core/src/main/java/rx/operators/OperationReplay.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public static <T> Subject<T, T> replayBuffered(int bufferSize) {
6262
* propagated through the given wrapped subject.
6363
*/
6464
public static <T> Subject<T, T> createScheduledSubject(Subject<T, T> subject, Scheduler scheduler) {
65-
final Observable<T> observedOn = subject.toObservable().observeOn(scheduler);
65+
final Observable<T> observedOn = subject.observeOn(scheduler);
6666
SubjectWrapper<T> s = new SubjectWrapper<T>(new OnSubscribe<T>() {
6767

6868
@Override
@@ -198,18 +198,12 @@ public Subscription onSubscribe(Subscriber<? super T> t1) {
198198
public static final class SubjectWrapper<T> extends Subject<T, T> {
199199
/** The wrapped subject. */
200200
final Subject<T, T> subject;
201-
private final OnSubscribe<T> func;
202201

203202
public SubjectWrapper(OnSubscribe<T> func, Subject<T, T> subject) {
204-
this.func = func;
203+
super(func);
205204
this.subject = subject;
206205
}
207206

208-
@Override
209-
public Observable<T> toObservable() {
210-
return Observable.create(func);
211-
}
212-
213207
@Override
214208
public void onNext(T args) {
215209
subject.onNext(args);
@@ -727,13 +721,19 @@ public static <T> CustomReplaySubject<T, T, T> create(int maxSize) {
727721
protected final ReplayState<TIntermediate, TResult> state;
728722
/** The result selector. */
729723
protected final Func1<? super TInput, ? extends TIntermediate> intermediateSelector;
730-
private final OnSubscribeFunc<TResult> onSubscribe;
731724

732725
private CustomReplaySubject(
733726
final OnSubscribeFunc<TResult> onSubscribe,
734727
ReplayState<TIntermediate, TResult> state,
735728
Func1<? super TInput, ? extends TIntermediate> intermediateSelector) {
736-
this.onSubscribe = onSubscribe;
729+
super(new OnSubscribe<TResult>() {
730+
731+
@Override
732+
public void call(Subscriber<? super TResult> sub) {
733+
onSubscribe.onSubscribe(sub);
734+
}
735+
736+
});
737737
this.state = state;
738738
this.intermediateSelector = intermediateSelector;
739739
}
@@ -793,11 +793,6 @@ protected void replayValues() {
793793
rp.replayTill(s);
794794
}
795795
}
796-
797-
@Override
798-
public Observable<TResult> toObservable() {
799-
return Observable.create(onSubscribe);
800-
}
801796
}
802797

803798
/**

rxjava-core/src/main/java/rx/operators/OperationTimeout.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@
2222

2323
import rx.Observable;
2424
import rx.Observable.OnSubscribeFunc;
25-
import rx.Subscriber;
25+
import rx.Observer;
2626
import rx.Scheduler;
27+
import rx.Subscriber;
2728
import rx.Subscription;
2829
import rx.schedulers.Schedulers;
2930
import rx.subscriptions.CompositeSubscription;
@@ -107,7 +108,7 @@ public void call() {
107108
}
108109
};
109110
SafeObservableSubscription subscription = new SafeObservableSubscription();
110-
composite.add(subscription.wrap(source.subscribe(new Subscriber<T>() {
111+
composite.add(subscription.wrap(source.subscribe(new Observer<T>() {
111112
@Override
112113
public void onNext(T value) {
113114
boolean onNextWins = false;
@@ -197,8 +198,8 @@ public Subscription onSubscribe(Subscriber<? super T> t1) {
197198
}
198199

199200
/** Observe the source. */
200-
private static final class SourceObserver<T, V> extends Subscriber<T> implements TimeoutCallback {
201-
final Subscriber<? super T> observer;
201+
private static final class SourceObserver<T, V> implements Observer<T>, TimeoutCallback {
202+
final Observer<? super T> observer;
202203
final Func1<? super T, ? extends Observable<V>> valueTimeout;
203204
final Observable<? extends T> other;
204205
final CompositeSubscription cancel;
@@ -207,7 +208,7 @@ private static final class SourceObserver<T, V> extends Subscriber<T> implements
207208
final SerialSubscription tsub;
208209
final TimeoutObserver<V> to;
209210

210-
public SourceObserver(Subscriber<? super T> observer, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other, CompositeSubscription cancel) {
211+
public SourceObserver(Observer<? super T> observer, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other, CompositeSubscription cancel) {
211212
this.observer = observer;
212213
this.valueTimeout = valueTimeout;
213214
this.other = other;
@@ -292,7 +293,7 @@ private interface TimeoutCallback {
292293
}
293294

294295
/** Observe the timeout. */
295-
private static final class TimeoutObserver<V> extends Subscriber<V> {
296+
private static final class TimeoutObserver<V> implements Observer<V> {
296297
final TimeoutCallback parent;
297298

298299
public TimeoutObserver(TimeoutCallback parent) {
@@ -315,4 +316,4 @@ public void onCompleted() {
315316
}
316317
}
317318
}
318-
}
319+
}

rxjava-core/src/main/java/rx/operators/OperationWindow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ public Subscription onSubscribe(Subscriber<? super Observable<T>> t1) {
386386

387387
final SourceObserver<T> so = new SourceObserver<T>(t1, csub);
388388
try {
389-
t1.onNext(so.subject.toObservable());
389+
t1.onNext(so.subject);
390390
} catch (Throwable t) {
391391
t1.onError(t);
392392
return Subscriptions.empty();
@@ -470,7 +470,7 @@ public void replace() {
470470
s.onCompleted();
471471

472472
subject = create();
473-
observer.onNext(subject.toObservable());
473+
observer.onNext(subject);
474474
}
475475
} catch (Throwable t) {
476476
onError(t);

rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void call() {
9797
}
9898

9999
}));
100-
_gps.toObservable().subscribe(new Subscriber<T>(o) {
100+
_gps.subscribe(new Subscriber<T>(o) {
101101

102102
@Override
103103
public void onCompleted() {

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919
import java.util.concurrent.atomic.AtomicReference;
2020

2121
import rx.Notification;
22-
import rx.Observable;
23-
import rx.Observable.OnSubscribe;
24-
import rx.Subscriber;
22+
import rx.Observer;
2523
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2624
import rx.util.functions.Action1;
2725

2826
/**
29-
* Subject that publishes only the last event to each {@link Subscriber} that has subscribed when the
27+
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
3028
* sequence completes.
3129
* <p>
3230
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/S.AsyncSubject.png">
@@ -89,7 +87,7 @@ public void call(SubjectObserver<? super T> o) {
8987
return new AsyncSubject<T>(onSubscribe, subscriptionManager, lastNotification);
9088
}
9189

92-
protected static <T> void emitValueToObserver(Notification<T> n, Subscriber<? super T> o) {
90+
protected static <T> void emitValueToObserver(Notification<T> n, Observer<? super T> o) {
9391
n.accept(o);
9492
if (n.isOnNext()) {
9593
o.onCompleted();
@@ -98,17 +96,11 @@ protected static <T> void emitValueToObserver(Notification<T> n, Subscriber<? su
9896

9997
private final SubjectSubscriptionManager<T> subscriptionManager;
10098
final AtomicReference<Notification<T>> lastNotification;
101-
private final Observable<T> observable;
10299

103100
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
101+
super(onSubscribe);
104102
this.subscriptionManager = subscriptionManager;
105103
this.lastNotification = lastNotification;
106-
this.observable = Observable.create(onSubscribe);
107-
}
108-
109-
@Override
110-
public Observable<T> toObservable() {
111-
return observable;
112104
}
113105

114106
@Override
@@ -117,7 +109,7 @@ public void onCompleted() {
117109

118110
@Override
119111
public void call(Collection<SubjectObserver<? super T>> observers) {
120-
for (Subscriber<? super T> o : observers) {
112+
for (Observer<? super T> o : observers) {
121113
emitValueToObserver(lastNotification.get(), o);
122114
}
123115
}
@@ -130,8 +122,8 @@ public void onError(final Throwable e) {
130122

131123
@Override
132124
public void call(Collection<SubjectObserver<? super T>> observers) {
133-
lastNotification.set(Notification.<T>createOnError(e));
134-
for (Subscriber<? super T> o : observers) {
125+
lastNotification.set(new Notification<T>(e));
126+
for (Observer<? super T> o : observers) {
135127
emitValueToObserver(lastNotification.get(), o);
136128
}
137129
}
@@ -141,7 +133,7 @@ public void call(Collection<SubjectObserver<? super T>> observers) {
141133

142134
@Override
143135
public void onNext(T v) {
144-
lastNotification.set(Notification.<T>createOnNext(v));
136+
lastNotification.set(new Notification<T>(v));
145137
}
146138

147-
}
139+
}

0 commit comments

Comments
 (0)