Skip to content

Commit d40b684

Browse files
Merge pull request #822 from zsxwing/subscribeOn
Reimplement 'subscribeOn' using 'lift'
2 parents c506509 + 90f814f commit d40b684

File tree

5 files changed

+250
-153
lines changed

5 files changed

+250
-153
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import rx.operators.OperationSkipLast;
8181
import rx.operators.OperationSkipUntil;
8282
import rx.operators.OperationSkipWhile;
83-
import rx.operators.OperationSubscribeOn;
8483
import rx.operators.OperationSum;
8584
import rx.operators.OperationSwitch;
8685
import rx.operators.OperationSynchronize;
@@ -97,6 +96,7 @@
9796
import rx.operators.OperationToObservableFuture;
9897
import rx.operators.OperationUsing;
9998
import rx.operators.OperationWindow;
99+
import rx.operators.OperatorSubscribeOn;
100100
import rx.operators.OperatorZip;
101101
import rx.operators.OperatorCast;
102102
import rx.operators.OperatorFromIterable;
@@ -7028,7 +7028,7 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
70287028
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
70297029
*/
70307030
public final Observable<T> subscribeOn(Scheduler scheduler) {
7031-
return create(OperationSubscribeOn.subscribeOn(this, scheduler));
7031+
return from(this).lift(new OperatorSubscribeOn<T>(scheduler));
70327032
}
70337033

70347034
/**

rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java

Lines changed: 0 additions & 97 deletions
This file was deleted.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Scheduler;
20+
import rx.Scheduler.Inner;
21+
import rx.Subscriber;
22+
import rx.subscriptions.CompositeSubscription;
23+
import rx.subscriptions.Subscriptions;
24+
import rx.util.functions.Action0;
25+
import rx.util.functions.Action1;
26+
27+
/**
28+
* Asynchronously subscribes and unsubscribes Observers on the specified Scheduler.
29+
* <p>
30+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
31+
*/
32+
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
33+
34+
private final Scheduler scheduler;
35+
36+
public OperatorSubscribeOn(Scheduler scheduler) {
37+
this.scheduler = scheduler;
38+
}
39+
40+
@Override
41+
public Subscriber<? super Observable<T>> call(
42+
final Subscriber<? super T> subscriber) {
43+
return new Subscriber<Observable<T>>() {
44+
45+
@Override
46+
public void onCompleted() {
47+
// ignore
48+
}
49+
50+
@Override
51+
public void onError(Throwable e) {
52+
subscriber.onError(e);
53+
}
54+
55+
@Override
56+
public void onNext(final Observable<T> o) {
57+
scheduler.schedule(new Action1<Inner>() {
58+
59+
@Override
60+
public void call(final Inner inner) {
61+
final CompositeSubscription cs = new CompositeSubscription();
62+
subscriber.add(Subscriptions.create(new Action0() {
63+
64+
@Override
65+
public void call() {
66+
inner.schedule(new Action1<Inner>() {
67+
68+
@Override
69+
public void call(final Inner inner) {
70+
cs.unsubscribe();
71+
}
72+
73+
});
74+
}
75+
76+
}));
77+
cs.add(subscriber);
78+
o.subscribe(new Subscriber<T>(cs) {
79+
80+
@Override
81+
public void onCompleted() {
82+
subscriber.onCompleted();
83+
}
84+
85+
@Override
86+
public void onError(Throwable e) {
87+
subscriber.onError(e);
88+
}
89+
90+
@Override
91+
public void onNext(T t) {
92+
subscriber.onNext(t);
93+
}
94+
});
95+
}
96+
});
97+
}
98+
99+
};
100+
}
101+
}

rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)