Skip to content

Commit d4fd741

Browse files
Merge branch 'OperatorDematerialize' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 6b9367b + 08085ac commit d4fd741

File tree

4 files changed

+112
-93
lines changed

4 files changed

+112
-93
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationDebounce;
5252
import rx.operators.OperationDelay;
53-
import rx.operators.OperationDematerialize;
5453
import rx.operators.OperationDistinct;
5554
import rx.operators.OperationDistinctUntilChanged;
5655
import rx.operators.OperationFinally;
@@ -98,6 +97,7 @@
9897
import rx.operators.OperatorConcat;
9998
import rx.operators.OperatorDefaultIfEmpty;
10099
import rx.operators.OperatorDefer;
100+
import rx.operators.OperatorDematerialize;
101101
import rx.operators.OperatorDoOnEach;
102102
import rx.operators.OperatorElementAt;
103103
import rx.operators.OperatorFilter;
@@ -3609,9 +3609,9 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit, Schedule
36093609
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-dematerialize">RxJava Wiki: dematerialize()</a>
36103610
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229047.aspx">MSDN: Observable.dematerialize</a>
36113611
*/
3612-
@SuppressWarnings("unchecked")
3612+
@SuppressWarnings({"unchecked", "rawtypes"})
36133613
public final <T2> Observable<T2> dematerialize() {
3614-
return create(OperationDematerialize.dematerialize((Observable<? extends Notification<? extends T2>>) this));
3614+
return lift(new OperatorDematerialize());
36153615
}
36163616

36173617
/**

rxjava-core/src/main/java/rx/operators/OperationDematerialize.java

Lines changed: 0 additions & 86 deletions
This file was deleted.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Notification;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
22+
/**
23+
* Reverses the effect of {@link OperatorMaterialize} by transforming the Notification objects
24+
* emitted by a source Observable into the items or notifications they represent.
25+
* <p>
26+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/dematerialize.png">
27+
* <p>
28+
* See <a href="http://msdn.microsoft.com/en-us/library/hh229047(v=vs.103).aspx">here</a> for the
29+
* Microsoft Rx equivalent.
30+
*
31+
* @param <T> the wrapped value type
32+
*/
33+
public final class OperatorDematerialize<T> implements Operator<T, Notification<T>> {
34+
35+
@Override
36+
public Subscriber<? super Notification<T>> call(final Subscriber<? super T> child) {
37+
return new Subscriber<Notification<T>>(child) {
38+
/** Do not send two onCompleted events. */
39+
boolean terminated;
40+
@Override
41+
public void onNext(Notification<T> t) {
42+
switch (t.getKind()) {
43+
case OnNext:
44+
if (!terminated) {
45+
child.onNext(t.getValue());
46+
}
47+
break;
48+
case OnError:
49+
onError(t.getThrowable());
50+
break;
51+
case OnCompleted:
52+
onCompleted();
53+
break;
54+
}
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
if (!terminated) {
60+
terminated = true;
61+
child.onError(e);
62+
}
63+
}
64+
65+
@Override
66+
public void onCompleted() {
67+
if (!terminated) {
68+
terminated = true;
69+
child.onCompleted();
70+
}
71+
}
72+
73+
};
74+
}
75+
76+
}

rxjava-core/src/test/java/rx/operators/OperationDematerializeTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorDematerializeTest.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020
import static org.mockito.Mockito.never;
2121
import static org.mockito.Mockito.times;
2222
import static org.mockito.Mockito.verify;
23-
import static rx.operators.OperationDematerialize.dematerialize;
2423

2524
import org.junit.Test;
2625

2726
import rx.Notification;
2827
import rx.Observable;
2928
import rx.Observer;
29+
import rx.observers.Subscribers;
3030
import rx.observers.TestSubscriber;
3131

32-
public class OperationDematerializeTest {
32+
public class OperatorDematerializeTest {
3333

3434
@Test
3535
@SuppressWarnings("unchecked")
@@ -51,7 +51,7 @@ public void testDematerialize1() {
5151
public void testDematerialize2() {
5252
Throwable exception = new Throwable("test");
5353
Observable<Integer> observable = Observable.error(exception);
54-
Observable<Integer> dematerialize = Observable.create(dematerialize(observable.materialize()));
54+
Observable<Integer> dematerialize = observable.materialize().dematerialize();
5555

5656
Observer<Integer> observer = mock(Observer.class);
5757
dematerialize.subscribe(observer);
@@ -66,7 +66,7 @@ public void testDematerialize2() {
6666
public void testDematerialize3() {
6767
Exception exception = new Exception("test");
6868
Observable<Integer> observable = Observable.error(exception);
69-
Observable<Integer> dematerialize = Observable.create(dematerialize(observable.materialize()));
69+
Observable<Integer> dematerialize = observable.materialize().dematerialize();
7070

7171
Observer<Integer> observer = mock(Observer.class);
7272
dematerialize.subscribe(observer);
@@ -106,4 +106,33 @@ public void testCompletePassThru() {
106106
verify(observer, times(0)).onNext(any(Integer.class));
107107
}
108108

109+
@Test
110+
public void testHonorsContractWhenCompleted() {
111+
Observable<Integer> source = Observable.just(1);
112+
113+
Observable<Integer> result = source.materialize().dematerialize();
114+
115+
Observer<Integer> o = mock(Observer.class);
116+
117+
result.unsafeSubscribe(Subscribers.from(o));
118+
119+
verify(o).onNext(1);
120+
verify(o).onCompleted();
121+
verify(o, never()).onError(any(Throwable.class));
122+
}
123+
124+
@Test
125+
public void testHonorsContractWhenThrows() {
126+
Observable<Integer> source = Observable.error(new OperationReduceTest.CustomException());
127+
128+
Observable<Integer> result = source.materialize().dematerialize();
129+
130+
Observer<Integer> o = mock(Observer.class);
131+
132+
result.unsafeSubscribe(Subscribers.from(o));
133+
134+
verify(o, never()).onNext(any(Integer.class));
135+
verify(o, never()).onCompleted();
136+
verify(o).onError(any(OperationReduceTest.CustomException.class));
137+
}
109138
}

0 commit comments

Comments
 (0)