Skip to content

Commit 08085ac

Browse files
committed
OperatorDematerialize
1 parent 4e77f8a commit 08085ac

File tree

4 files changed

+112
-93
lines changed

4 files changed

+112
-93
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import rx.operators.OperationDefaultIfEmpty;
5656
import rx.operators.OperationDefer;
5757
import rx.operators.OperationDelay;
58-
import rx.operators.OperationDematerialize;
5958
import rx.operators.OperationDistinct;
6059
import rx.operators.OperationDistinctUntilChanged;
6160
import rx.operators.OperationFinally;
@@ -95,6 +94,7 @@
9594
import rx.operators.OperatorAsObservable;
9695
import rx.operators.OperatorCache;
9796
import rx.operators.OperatorCast;
97+
import rx.operators.OperatorDematerialize;
9898
import rx.operators.OperatorDoOnEach;
9999
import rx.operators.OperatorElementAt;
100100
import rx.operators.OperatorFilter;
@@ -3590,9 +3590,9 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit, Schedule
35903590
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-dematerialize">RxJava Wiki: dematerialize()</a>
35913591
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229047.aspx">MSDN: Observable.dematerialize</a>
35923592
*/
3593-
@SuppressWarnings("unchecked")
3593+
@SuppressWarnings({"unchecked", "rawtypes"})
35943594
public final <T2> Observable<T2> dematerialize() {
3595-
return create(OperationDematerialize.dematerialize((Observable<? extends Notification<? extends T2>>) this));
3595+
return lift(new OperatorDematerialize());
35963596
}
35973597

35983598
/**

rxjava-core/src/main/java/rx/operators/OperationDematerialize.java

Lines changed: 0 additions & 86 deletions
This file was deleted.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Notification;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
22+
/**
23+
* Reverses the effect of {@link OperatorMaterialize} by transforming the Notification objects
24+
* emitted by a source Observable into the items or notifications they represent.
25+
* <p>
26+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/dematerialize.png">
27+
* <p>
28+
* See <a href="http://msdn.microsoft.com/en-us/library/hh229047(v=vs.103).aspx">here</a> for the
29+
* Microsoft Rx equivalent.
30+
*
31+
* @param <T> the wrapped value type
32+
*/
33+
public final class OperatorDematerialize<T> implements Operator<T, Notification<T>> {
34+
35+
@Override
36+
public Subscriber<? super Notification<T>> call(final Subscriber<? super T> child) {
37+
return new Subscriber<Notification<T>>(child) {
38+
/** Do not send two onCompleted events. */
39+
boolean terminated;
40+
@Override
41+
public void onNext(Notification<T> t) {
42+
switch (t.getKind()) {
43+
case OnNext:
44+
if (!terminated) {
45+
child.onNext(t.getValue());
46+
}
47+
break;
48+
case OnError:
49+
onError(t.getThrowable());
50+
break;
51+
case OnCompleted:
52+
onCompleted();
53+
break;
54+
}
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
if (!terminated) {
60+
terminated = true;
61+
child.onError(e);
62+
}
63+
}
64+
65+
@Override
66+
public void onCompleted() {
67+
if (!terminated) {
68+
terminated = true;
69+
child.onCompleted();
70+
}
71+
}
72+
73+
};
74+
}
75+
76+
}

rxjava-core/src/test/java/rx/operators/OperationDematerializeTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorDematerializeTest.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020
import static org.mockito.Mockito.never;
2121
import static org.mockito.Mockito.times;
2222
import static org.mockito.Mockito.verify;
23-
import static rx.operators.OperationDematerialize.dematerialize;
2423

2524
import org.junit.Test;
2625

2726
import rx.Notification;
2827
import rx.Observable;
2928
import rx.Observer;
29+
import rx.observers.Subscribers;
3030
import rx.observers.TestSubscriber;
3131

32-
public class OperationDematerializeTest {
32+
public class OperatorDematerializeTest {
3333

3434
@Test
3535
@SuppressWarnings("unchecked")
@@ -51,7 +51,7 @@ public void testDematerialize1() {
5151
public void testDematerialize2() {
5252
Throwable exception = new Throwable("test");
5353
Observable<Integer> observable = Observable.error(exception);
54-
Observable<Integer> dematerialize = Observable.create(dematerialize(observable.materialize()));
54+
Observable<Integer> dematerialize = observable.materialize().dematerialize();
5555

5656
Observer<Integer> observer = mock(Observer.class);
5757
dematerialize.subscribe(observer);
@@ -66,7 +66,7 @@ public void testDematerialize2() {
6666
public void testDematerialize3() {
6767
Exception exception = new Exception("test");
6868
Observable<Integer> observable = Observable.error(exception);
69-
Observable<Integer> dematerialize = Observable.create(dematerialize(observable.materialize()));
69+
Observable<Integer> dematerialize = observable.materialize().dematerialize();
7070

7171
Observer<Integer> observer = mock(Observer.class);
7272
dematerialize.subscribe(observer);
@@ -106,4 +106,33 @@ public void testCompletePassThru() {
106106
verify(observer, times(0)).onNext(any(Integer.class));
107107
}
108108

109+
@Test
110+
public void testHonorsContractWhenCompleted() {
111+
Observable<Integer> source = Observable.just(1);
112+
113+
Observable<Integer> result = source.materialize().dematerialize();
114+
115+
Observer<Integer> o = mock(Observer.class);
116+
117+
result.unsafeSubscribe(Subscribers.from(o));
118+
119+
verify(o).onNext(1);
120+
verify(o).onCompleted();
121+
verify(o, never()).onError(any(Throwable.class));
122+
}
123+
124+
@Test
125+
public void testHonorsContractWhenThrows() {
126+
Observable<Integer> source = Observable.error(new OperationReduceTest.CustomException());
127+
128+
Observable<Integer> result = source.materialize().dematerialize();
129+
130+
Observer<Integer> o = mock(Observer.class);
131+
132+
result.unsafeSubscribe(Subscribers.from(o));
133+
134+
verify(o, never()).onNext(any(Integer.class));
135+
verify(o, never()).onCompleted();
136+
verify(o).onError(any(OperationReduceTest.CustomException.class));
137+
}
109138
}

0 commit comments

Comments
 (0)