Skip to content

Commit 6b9367b

Browse files
Merge branch 'OperatorDefer' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 200b673 + a6b3990 commit 6b9367b

File tree

3 files changed

+40
-23
lines changed

3 files changed

+40
-23
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationDebounce;
52-
import rx.operators.OperationDefer;
5352
import rx.operators.OperationDelay;
5453
import rx.operators.OperationDematerialize;
5554
import rx.operators.OperationDistinct;
@@ -98,6 +97,7 @@
9897
import rx.operators.OperatorCombineLatest;
9998
import rx.operators.OperatorConcat;
10099
import rx.operators.OperatorDefaultIfEmpty;
100+
import rx.operators.OperatorDefer;
101101
import rx.operators.OperatorDoOnEach;
102102
import rx.operators.OperatorElementAt;
103103
import rx.operators.OperatorFilter;
@@ -1007,7 +1007,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
10071007
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-defer">RxJava Wiki: defer()</a>
10081008
*/
10091009
public final static <T> Observable<T> defer(Func0<? extends Observable<? extends T>> observableFactory) {
1010-
return create(OperationDefer.defer(observableFactory));
1010+
return create(new OperatorDefer<T>(observableFactory));
10111011
}
10121012

10131013
/**

rxjava-core/src/main/java/rx/operators/OperationDefer.java renamed to rxjava-core/src/main/java/rx/operators/OperatorDefer.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616
package rx.operators;
1717

1818
import rx.Observable;
19-
import rx.Observable.OnSubscribeFunc;
20-
import rx.Observer;
21-
import rx.Subscription;
19+
import rx.Observable.OnSubscribe;
20+
import rx.Subscriber;
2221
import rx.functions.Func0;
23-
import rx.observers.Subscribers;
2422

2523
/**
2624
* Do not create the Observable until an Observer subscribes; create a fresh Observable on each
@@ -32,17 +30,23 @@
3230
* return an Observable that will call this function to generate its Observable sequence afresh
3331
* each time a new Observer subscribes.
3432
*/
35-
public final class OperationDefer {
33+
public final class OperatorDefer<T> implements OnSubscribe<T> {
34+
final Func0<? extends Observable<? extends T>> observableFactory;
3635

37-
public static <T> OnSubscribeFunc<T> defer(final Func0<? extends Observable<? extends T>> observableFactory) {
38-
39-
return new OnSubscribeFunc<T>() {
40-
@Override
41-
public Subscription onSubscribe(Observer<? super T> observer) {
42-
Observable<? extends T> obs = observableFactory.call();
43-
return obs.unsafeSubscribe(Subscribers.from(observer));
44-
}
45-
};
36+
public OperatorDefer(Func0<? extends Observable<? extends T>> observableFactory) {
37+
this.observableFactory = observableFactory;
38+
}
4639

40+
@Override
41+
public void call(Subscriber<? super T> s) {
42+
Observable<? extends T> o;
43+
try {
44+
o = observableFactory.call();
45+
} catch (Throwable t) {
46+
s.onError(t);
47+
return;
48+
}
49+
o.unsafeSubscribe(s);
4750
}
51+
4852
}

rxjava-core/src/test/java/rx/operators/OperationDeferTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorDeferTest.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,18 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Mockito.mock;
19-
import static org.mockito.Mockito.times;
20-
import static org.mockito.Mockito.verify;
21-
import static org.mockito.Mockito.verifyZeroInteractions;
22-
import static org.mockito.Mockito.when;
18+
import static org.mockito.Mockito.*;
2319

2420
import org.junit.Test;
2521

2622
import rx.Observable;
2723
import rx.Observer;
2824
import rx.functions.Func0;
2925

30-
public class OperationDeferTest {
26+
@SuppressWarnings("unchecked")
27+
public class OperatorDeferTest {
3128

3229
@Test
33-
@SuppressWarnings("unchecked")
3430
public void testDefer() throws Throwable {
3531

3632
Func0<Observable<String>> factory = mock(Func0.class);
@@ -64,4 +60,21 @@ public void testDefer() throws Throwable {
6460
verify(secondObserver, times(1)).onCompleted();
6561

6662
}
63+
64+
@Test
65+
public void testDeferFunctionThrows() {
66+
Func0<Observable<String>> factory = mock(Func0.class);
67+
68+
when(factory.call()).thenThrow(new OperationReduceTest.CustomException());
69+
70+
Observable<String> result = Observable.defer(factory);
71+
72+
Observer<String> o = mock(Observer.class);
73+
74+
result.subscribe(o);
75+
76+
verify(o).onError(any(OperationReduceTest.CustomException.class));
77+
verify(o, never()).onNext(any(String.class));
78+
verify(o, never()).onCompleted();
79+
}
6780
}

0 commit comments

Comments
 (0)