Skip to content

Commit 497f35f

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Test static from methods and add Maybe.fromSingle & fromCompletable (#4685)
1 parent d9dabab commit 497f35f

15 files changed

+945
-33
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,42 @@ public static <T> Maybe<T> fromAction(final Action run) {
563563
return RxJavaPlugins.onAssembly(new MaybeFromAction<T>(run));
564564
}
565565

566+
/**
567+
* Wraps a CompletableSource into a Maybe.
568+
*
569+
* <dl>
570+
* <dt><b>Scheduler:</b></dt>
571+
* <dd>{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
572+
* </dl>
573+
* @param <T> the target type
574+
* @param completableSource the CompletableSource to convert from
575+
* @return the new Maybe instance
576+
* @throws NullPointerException if completable is null
577+
*/
578+
@SchedulerSupport(SchedulerSupport.NONE)
579+
public static <T> Maybe<T> fromCompletable(CompletableSource completableSource) {
580+
ObjectHelper.requireNonNull(completableSource, "completableSource is null");
581+
return RxJavaPlugins.onAssembly(new MaybeFromCompletable<T>(completableSource));
582+
}
583+
584+
/**
585+
* Wraps a SingleSource into a Maybe.
586+
*
587+
* <dl>
588+
* <dt><b>Scheduler:</b></dt>
589+
* <dd>{@code fromSingle} does not operate by default on a particular {@link Scheduler}.</dd>
590+
* </dl>
591+
* @param <T> the target type
592+
* @param singleSource the SingleSource to convert from
593+
* @return the new Maybe instance
594+
* @throws NullPointerException if single is null
595+
*/
596+
@SchedulerSupport(SchedulerSupport.NONE)
597+
public static <T> Maybe<T> fromSingle(SingleSource singleSource) {
598+
ObjectHelper.requireNonNull(singleSource, "singleSource is null");
599+
return RxJavaPlugins.onAssembly(new MaybeFromSingle<T>(singleSource));
600+
}
601+
566602
/**
567603
* Returns a {@link Maybe} that invokes passed function and emits its result for each new MaybeObserver that subscribes
568604
* while considering {@code null} value from the callable as indication for valueless completion.

src/main/java/io/reactivex/internal/operators/completable/CompletableFromObservable.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,34 @@ public CompletableFromObservable(ObservableSource<T> observable) {
2626

2727
@Override
2828
protected void subscribeActual(final CompletableObserver s) {
29-
observable.subscribe(new Observer<T>() {
29+
observable.subscribe(new CompletableFromObservableObserver<T>(s));
30+
}
31+
32+
static final class CompletableFromObservableObserver<T> implements Observer<T> {
33+
final CompletableObserver co;
3034

31-
@Override
32-
public void onComplete() {
33-
s.onComplete();
34-
}
35+
CompletableFromObservableObserver(CompletableObserver co) {
36+
this.co = co;
37+
}
3538

36-
@Override
37-
public void onError(Throwable e) {
38-
s.onError(e);
39-
}
39+
@Override
40+
public void onSubscribe(Disposable d) {
41+
co.onSubscribe(d);
42+
}
4043

41-
@Override
42-
public void onNext(T value) {
43-
// ignored
44-
}
44+
@Override
45+
public void onNext(T value) {
46+
// Deliberately ignored.
47+
}
4548

46-
@Override
47-
public void onSubscribe(Disposable d) {
48-
s.onSubscribe(d);
49-
}
49+
@Override
50+
public void onError(Throwable e) {
51+
co.onError(e);
52+
}
5053

51-
});
54+
@Override
55+
public void onComplete() {
56+
co.onComplete();
57+
}
5258
}
5359
}

src/main/java/io/reactivex/internal/operators/completable/CompletableFromSingle.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,29 @@ public CompletableFromSingle(SingleSource<T> single) {
2626

2727
@Override
2828
protected void subscribeActual(final CompletableObserver s) {
29-
single.subscribe(new SingleObserver<T>() {
29+
single.subscribe(new CompletableFromSingleObserver<T>(s));
30+
}
3031

31-
@Override
32-
public void onError(Throwable e) {
33-
s.onError(e);
34-
}
32+
static final class CompletableFromSingleObserver<T> implements SingleObserver<T> {
33+
final CompletableObserver co;
3534

36-
@Override
37-
public void onSubscribe(Disposable d) {
38-
s.onSubscribe(d);
39-
}
35+
CompletableFromSingleObserver(CompletableObserver co) {
36+
this.co = co;
37+
}
4038

41-
@Override
42-
public void onSuccess(T value) {
43-
s.onComplete();
44-
}
39+
@Override
40+
public void onError(Throwable e) {
41+
co.onError(e);
42+
}
4543

46-
});
47-
}
44+
@Override
45+
public void onSubscribe(Disposable d) {
46+
co.onSubscribe(d);
47+
}
4848

49+
@Override
50+
public void onSuccess(T value) {
51+
co.onComplete();
52+
}
53+
}
4954
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.functions.Action;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import org.junit.Test;
20+
21+
import static org.junit.Assert.assertEquals;
22+
23+
public class CompletableFromActionTest {
24+
@Test(expected = NullPointerException.class)
25+
public void fromActionNull() {
26+
Completable.fromAction(null);
27+
}
28+
29+
@Test
30+
public void fromAction() {
31+
final AtomicInteger atomicInteger = new AtomicInteger();
32+
33+
Completable.fromAction(new Action() {
34+
@Override
35+
public void run() throws Exception {
36+
atomicInteger.incrementAndGet();
37+
}
38+
})
39+
.test()
40+
.assertResult();
41+
42+
assertEquals(1, atomicInteger.get());
43+
}
44+
45+
@Test
46+
public void fromActionTwice() {
47+
final AtomicInteger atomicInteger = new AtomicInteger();
48+
49+
Action run = new Action() {
50+
@Override
51+
public void run() throws Exception {
52+
atomicInteger.incrementAndGet();
53+
}
54+
};
55+
56+
Completable.fromAction(run)
57+
.test()
58+
.assertResult();
59+
60+
assertEquals(1, atomicInteger.get());
61+
62+
Completable.fromAction(run)
63+
.test()
64+
.assertResult();
65+
66+
assertEquals(2, atomicInteger.get());
67+
}
68+
69+
@Test
70+
public void fromActionInvokesLazy() {
71+
final AtomicInteger atomicInteger = new AtomicInteger();
72+
73+
Completable completable = Completable.fromAction(new Action() {
74+
@Override
75+
public void run() throws Exception {
76+
atomicInteger.incrementAndGet();
77+
}
78+
});
79+
80+
assertEquals(0, atomicInteger.get());
81+
82+
completable
83+
.test()
84+
.assertResult();
85+
86+
assertEquals(1, atomicInteger.get());
87+
}
88+
89+
@Test
90+
public void fromActionThrows() {
91+
Completable.fromAction(new Action() {
92+
@Override
93+
public void run() throws Exception {
94+
throw new UnsupportedOperationException();
95+
}
96+
})
97+
.test()
98+
.assertFailure(UnsupportedOperationException.class);
99+
}
100+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import java.util.concurrent.Callable;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import org.junit.Test;
20+
21+
import static org.junit.Assert.assertEquals;
22+
23+
public class CompletableFromCallableTest {
24+
@Test(expected = NullPointerException.class)
25+
public void fromCallableNull() {
26+
Completable.fromCallable(null);
27+
}
28+
29+
@Test
30+
public void fromCallable() {
31+
final AtomicInteger atomicInteger = new AtomicInteger();
32+
33+
Completable.fromCallable(new Callable<Object>() {
34+
@Override
35+
public Object call() throws Exception {
36+
atomicInteger.incrementAndGet();
37+
return null;
38+
}
39+
})
40+
.test()
41+
.assertResult();
42+
43+
assertEquals(1, atomicInteger.get());
44+
}
45+
46+
@Test
47+
public void fromCallableTwice() {
48+
final AtomicInteger atomicInteger = new AtomicInteger();
49+
50+
Callable<Object> callable = new Callable<Object>() {
51+
@Override
52+
public Object call() throws Exception {
53+
atomicInteger.incrementAndGet();
54+
return null;
55+
}
56+
};
57+
58+
Completable.fromCallable(callable)
59+
.test()
60+
.assertResult();
61+
62+
assertEquals(1, atomicInteger.get());
63+
64+
Completable.fromCallable(callable)
65+
.test()
66+
.assertResult();
67+
68+
assertEquals(2, atomicInteger.get());
69+
}
70+
71+
@Test
72+
public void fromCallableInvokesLazy() {
73+
final AtomicInteger atomicInteger = new AtomicInteger();
74+
75+
Completable completable = Completable.fromCallable(new Callable<Object>() {
76+
@Override
77+
public Object call() throws Exception {
78+
atomicInteger.incrementAndGet();
79+
return null;
80+
}
81+
});
82+
83+
assertEquals(0, atomicInteger.get());
84+
85+
completable
86+
.test()
87+
.assertResult();
88+
89+
assertEquals(1, atomicInteger.get());
90+
}
91+
92+
@Test
93+
public void fromCallableThrows() {
94+
Completable.fromCallable(new Callable<Object>() {
95+
@Override
96+
public Object call() throws Exception {
97+
throw new UnsupportedOperationException();
98+
}
99+
})
100+
.test()
101+
.assertFailure(UnsupportedOperationException.class);
102+
}
103+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.Observable;
18+
import org.junit.Test;
19+
20+
public class CompletableFromObservableTest {
21+
@Test(expected = NullPointerException.class)
22+
public void fromObservableNull() {
23+
Completable.fromObservable(null);
24+
}
25+
26+
@Test
27+
public void fromObservable() {
28+
Completable.fromObservable(Observable.just(1))
29+
.test()
30+
.assertResult();
31+
}
32+
33+
@Test
34+
public void fromObservableEmpty() {
35+
Completable.fromObservable(Observable.empty())
36+
.test()
37+
.assertResult();
38+
}
39+
40+
@Test
41+
public void fromObservableError() {
42+
Completable.fromObservable(Observable.error(new UnsupportedOperationException()))
43+
.test()
44+
.assertFailure(UnsupportedOperationException.class);
45+
}
46+
}

0 commit comments

Comments
 (0)