Skip to content

Commit b0bc478

Browse files
ggikkoakarnokd
authored andcommitted
2.x: coverage, add SingleToFlowableTest (#5673)
1 parent 7f2ceb4 commit b0bc478

File tree

2 files changed

+93
-0
lines changed

2 files changed

+93
-0
lines changed

src/test/java/io/reactivex/TestHelper.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,6 +1230,60 @@ protected void subscribeActual(SingleObserver<? super T> observer) {
12301230
}
12311231
}
12321232

1233+
/**
1234+
* Check if the given transformed reactive type reports multiple onSubscribe calls to
1235+
* RxJavaPlugins.
1236+
* @param <T> the input value type
1237+
* @param <R> the output value type
1238+
* @param transform the transform to drive an operator
1239+
*/
1240+
public static <T, R> void checkDoubleOnSubscribeSingleToFlowable(Function<Single<T>, ? extends Publisher<R>> transform) {
1241+
List<Throwable> errors = trackPluginErrors();
1242+
try {
1243+
final Boolean[] b = { null, null };
1244+
final CountDownLatch cdl = new CountDownLatch(1);
1245+
1246+
Single<T> source = new Single<T>() {
1247+
@Override
1248+
protected void subscribeActual(SingleObserver<? super T> observer) {
1249+
try {
1250+
Disposable d1 = Disposables.empty();
1251+
1252+
observer.onSubscribe(d1);
1253+
1254+
Disposable d2 = Disposables.empty();
1255+
1256+
observer.onSubscribe(d2);
1257+
1258+
b[0] = d1.isDisposed();
1259+
b[1] = d2.isDisposed();
1260+
} finally {
1261+
cdl.countDown();
1262+
}
1263+
}
1264+
};
1265+
1266+
Publisher<R> out = transform.apply(source);
1267+
1268+
out.subscribe(NoOpConsumer.INSTANCE);
1269+
1270+
try {
1271+
assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS));
1272+
} catch (InterruptedException ex) {
1273+
throw ExceptionHelper.wrapOrThrow(ex);
1274+
}
1275+
1276+
assertEquals("First disposed?", false, b[0]);
1277+
assertEquals("Second not disposed?", true, b[1]);
1278+
1279+
assertError(errors, 0, IllegalStateException.class, "Disposable already set!");
1280+
} catch (Throwable ex) {
1281+
throw ExceptionHelper.wrapOrThrow(ex);
1282+
} finally {
1283+
RxJavaPlugins.reset();
1284+
}
1285+
}
1286+
12331287
/**
12341288
* Check if the given transformed reactive type reports multiple onSubscribe calls to
12351289
* RxJavaPlugins.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.Single;
17+
import io.reactivex.TestHelper;
18+
import io.reactivex.functions.Function;
19+
import io.reactivex.subjects.PublishSubject;
20+
import org.junit.Test;
21+
import org.reactivestreams.Publisher;
22+
23+
public class SingleToFlowableTest {
24+
25+
@Test
26+
public void dispose() {
27+
TestHelper.checkDisposed(PublishSubject.create().singleOrError().toFlowable());
28+
}
29+
30+
@Test
31+
public void doubleOnSubscribe() {
32+
TestHelper.checkDoubleOnSubscribeSingleToFlowable(new Function<Single<Object>, Publisher<Object>>() {
33+
@Override
34+
public Publisher<Object> apply(Single<Object> s) throws Exception {
35+
return s.toFlowable();
36+
}
37+
});
38+
}
39+
}

0 commit comments

Comments
 (0)