Skip to content

Commit 073418f

Browse files
committed
OperatorAsObservable
1 parent 4e0ce47 commit 073418f

File tree

3 files changed

+73
-15
lines changed

3 files changed

+73
-15
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationAll;
5252
import rx.operators.OperationAny;
53-
import rx.operators.OperationAsObservable;
53+
import rx.operators.OperatorAsObservable;
5454
import rx.operators.OperationBuffer;
5555
import rx.operators.OperationCombineLatest;
5656
import rx.operators.OperationConcat;
@@ -2952,7 +2952,7 @@ public final Observable<Boolean> all(Func1<? super T, Boolean> predicate) {
29522952
* @return an Observable that hides the identity of this Observable
29532953
*/
29542954
public final Observable<T> asObservable() {
2955-
return create(new OperationAsObservable<T>(this));
2955+
return lift(new OperatorAsObservable<T>());
29562956
}
29572957

29582958
/**

rxjava-core/src/main/java/rx/operators/OperationAsObservable.java renamed to rxjava-core/src/main/java/rx/operators/OperatorAsObservable.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,20 @@
1515
*/
1616
package rx.operators;
1717

18-
import rx.Observable;
19-
import rx.Observable.OnSubscribeFunc;
20-
import rx.Observer;
21-
import rx.Subscription;
22-
import rx.observers.Subscribers;
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
2320

2421
/**
2522
* Hides the identity of another observable.
2623
*
2724
* @param <T>
2825
* the return value type of the wrapped observable.
2926
*/
30-
public final class OperationAsObservable<T> implements OnSubscribeFunc<T> {
31-
private final Observable<? extends T> source;
32-
33-
public OperationAsObservable(Observable<? extends T> source) {
34-
this.source = source;
35-
}
27+
public final class OperatorAsObservable<T> implements Operator<T, T> {
3628

3729
@Override
38-
public Subscription onSubscribe(final Observer<? super T> t1) {
39-
return source.unsafeSubscribe(Subscribers.from(t1));
30+
public Subscriber<? super T> call(Subscriber<? super T> t1) {
31+
return t1;
4032
}
33+
4134
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import static org.junit.Assert.assertFalse;
20+
import org.junit.Test;
21+
import static org.mockito.Matchers.any;
22+
import static org.mockito.Mockito.*;
23+
import rx.Observable;
24+
import rx.Observer;
25+
import rx.subjects.PublishSubject;
26+
27+
public class OperatorAsObservableTest {
28+
@Test
29+
public void testHiding() {
30+
PublishSubject<Integer> src = PublishSubject.create();
31+
32+
Observable<Integer> dst = src.asObservable();
33+
34+
assertFalse(dst instanceof PublishSubject);
35+
36+
Observer<Object> o = mock(Observer.class);
37+
38+
dst.subscribe(o);
39+
40+
src.onNext(1);
41+
src.onCompleted();
42+
43+
verify(o).onNext(1);
44+
verify(o).onCompleted();
45+
verify(o, never()).onError(any(Throwable.class));
46+
}
47+
@Test
48+
public void testHidingError() {
49+
PublishSubject<Integer> src = PublishSubject.create();
50+
51+
Observable<Integer> dst = src.asObservable();
52+
53+
assertFalse(dst instanceof PublishSubject);
54+
55+
Observer<Object> o = mock(Observer.class);
56+
57+
dst.subscribe(o);
58+
59+
src.onError(new OperationReduceTest.CustomException());
60+
61+
verify(o, never()).onNext(any());
62+
verify(o, never()).onCompleted();
63+
verify(o).onError(any(OperationReduceTest.CustomException.class));
64+
}
65+
}

0 commit comments

Comments
 (0)