Skip to content

Commit 89bb9db

Browse files
committed
Reimplement 'subscribeOn' using 'lift'
1 parent a797c56 commit 89bb9db

File tree

5 files changed

+280
-153
lines changed

5 files changed

+280
-153
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import rx.operators.OperationSkipLast;
8181
import rx.operators.OperationSkipUntil;
8282
import rx.operators.OperationSkipWhile;
83-
import rx.operators.OperationSubscribeOn;
8483
import rx.operators.OperationSum;
8584
import rx.operators.OperationSwitch;
8685
import rx.operators.OperationSynchronize;
@@ -97,6 +96,7 @@
9796
import rx.operators.OperationToObservableFuture;
9897
import rx.operators.OperationUsing;
9998
import rx.operators.OperationWindow;
99+
import rx.operators.OperatorSubscribeOn;
100100
import rx.operators.OperatorZip;
101101
import rx.operators.OperatorCast;
102102
import rx.operators.OperatorFromIterable;
@@ -6967,7 +6967,7 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
69676967
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
69686968
*/
69696969
public final Observable<T> subscribeOn(Scheduler scheduler) {
6970-
return create(OperationSubscribeOn.subscribeOn(this, scheduler));
6970+
return from(this).lift(new OperatorSubscribeOn<T>(scheduler));
69716971
}
69726972

69736973
/**

rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java

Lines changed: 0 additions & 97 deletions
This file was deleted.
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Scheduler;
20+
import rx.Scheduler.Inner;
21+
import rx.Subscriber;
22+
import rx.subscriptions.CompositeSubscription;
23+
import rx.subscriptions.Subscriptions;
24+
import rx.util.functions.Action0;
25+
import rx.util.functions.Action1;
26+
27+
/**
28+
* Asynchronously subscribes and unsubscribes Observers on the specified Scheduler.
29+
* <p>
30+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
31+
*/
32+
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
33+
34+
private final Scheduler scheduler;
35+
36+
public OperatorSubscribeOn(Scheduler scheduler) {
37+
this.scheduler = scheduler;
38+
}
39+
40+
@Override
41+
public Subscriber<? super Observable<T>> call(
42+
final Subscriber<? super T> subscriber) {
43+
return new Subscriber<Observable<T>>() {
44+
45+
@Override
46+
public void onCompleted() {
47+
// ignore
48+
}
49+
50+
@Override
51+
public void onError(Throwable e) {
52+
subscriber.onError(e);
53+
}
54+
55+
@Override
56+
public void onNext(final Observable<T> o) {
57+
scheduler.schedule(new Action1<Inner>() {
58+
59+
@Override
60+
public void call(final Inner inner) {
61+
if (!inner.isUnsubscribed()) {
62+
final CompositeSubscription cs = new CompositeSubscription();
63+
subscriber.add(Subscriptions.create(new Action0() {
64+
65+
@Override
66+
public void call() {
67+
scheduler.schedule(new Action1<Inner>() {
68+
69+
@Override
70+
public void call(final Inner inner) {
71+
cs.unsubscribe();
72+
inner.unsubscribe();
73+
}
74+
75+
});
76+
}
77+
78+
}));
79+
cs.add(subscriber);
80+
o.subscribe(new Subscriber<T>(cs) {
81+
82+
@Override
83+
public void onCompleted() {
84+
subscriber.onCompleted();
85+
}
86+
87+
@Override
88+
public void onError(Throwable e) {
89+
subscriber.onError(e);
90+
}
91+
92+
@Override
93+
public void onNext(T t) {
94+
subscriber.onNext(t);
95+
}
96+
});
97+
}
98+
}
99+
100+
});
101+
}
102+
103+
};
104+
}
105+
}

rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)