Skip to content

Commit 177d468

Browse files
RomanWuattierakarnokd
authored andcommitted
Have unit tests extends RxJavaTest - 2 (#6594)
This commit updates the unit tests of the following operators: * flowable * maybe * observable * disposable * observer * parallel * processors * schedulers and RxJava plugin. Related: #6583
1 parent 663e5a2 commit 177d468

File tree

92 files changed

+171
-1853
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+171
-1853
lines changed

src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.*;
2020
import java.util.concurrent.atomic.*;
2121

22+
import io.reactivex.RxJavaTest;
2223
import org.junit.*;
2324
import org.junit.rules.TestName;
2425
import org.reactivestreams.*;
@@ -31,7 +32,7 @@
3132
import io.reactivex.schedulers.Schedulers;
3233
import io.reactivex.subscribers.*;
3334

34-
public class FlowableBackpressureTests {
35+
public class FlowableBackpressureTests extends RxJavaTest {
3536

3637
static final class FirehoseNoBackpressure extends AtomicBoolean implements Subscription {
3738

@@ -235,36 +236,6 @@ public Publisher<Integer> apply(Integer i) {
235236
assertTrue(c.get() < Flowable.bufferSize());
236237
}
237238

238-
@Test
239-
@Ignore("The test is non-deterministic and can't be made deterministic")
240-
public void flatMapAsync() {
241-
int num = (int) (Flowable.bufferSize() * 2.1);
242-
AtomicInteger c = new AtomicInteger();
243-
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
244-
245-
incrementingIntegers(c)
246-
.subscribeOn(Schedulers.computation())
247-
.flatMap(new Function<Integer, Publisher<Integer>>() {
248-
@Override
249-
public Publisher<Integer> apply(Integer i) {
250-
return incrementingIntegers(new AtomicInteger())
251-
.take(10)
252-
.subscribeOn(Schedulers.computation());
253-
}
254-
}
255-
)
256-
.take(num).subscribe(ts);
257-
258-
ts.awaitDone(5, TimeUnit.SECONDS);
259-
ts.assertNoErrors();
260-
System.out.println("testFlatMapAsync => Received: " + ts.values().size() + " Emitted: " + c.get() + " Size: " + Flowable.bufferSize());
261-
assertEquals(num, ts.values().size());
262-
// even though we only need 10, it will request at least Flowable.bufferSize(), and then as it drains keep requesting more
263-
// and then it will be non-deterministic when the take() causes the unsubscribe as it is scheduled on 10 different schedulers (threads)
264-
// normally this number is ~250 but can get up to ~1200 when Flowable.bufferSize() == 1024
265-
assertTrue(c.get() <= Flowable.bufferSize() * 2);
266-
}
267-
268239
@Test
269240
public void zipSync() {
270241
int num = (int) (Flowable.bufferSize() * 4.1);
@@ -479,7 +450,7 @@ public void onNext(Integer t) {
479450
assertEquals(20, batches.get());
480451
}
481452

482-
@Test(timeout = 2000)
453+
@Test
483454
public void firehoseFailsAsExpected() {
484455
AtomicInteger c = new AtomicInteger();
485456
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
@@ -515,7 +486,7 @@ public void firehoseFailsAsExpectedLoop() {
515486
}
516487
}
517488

518-
@Test(timeout = 10000)
489+
@Test
519490
public void onBackpressureDrop() {
520491
long t = System.currentTimeMillis();
521492
for (int i = 0; i < 100; i++) {
@@ -544,7 +515,7 @@ public void onBackpressureDrop() {
544515
}
545516
}
546517

547-
@Test(timeout = 20000)
518+
@Test
548519
public void onBackpressureDropWithAction() {
549520
for (int i = 0; i < 100; i++) {
550521
final AtomicInteger emitCount = new AtomicInteger();
@@ -586,7 +557,7 @@ public void accept(Integer v) {
586557
}
587558
}
588559

589-
@Test(timeout = 10000)
560+
@Test
590561
public void onBackpressureDropSynchronous() {
591562
for (int i = 0; i < 100; i++) {
592563
int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
@@ -608,7 +579,7 @@ public void onBackpressureDropSynchronous() {
608579
}
609580
}
610581

611-
@Test(timeout = 10000)
582+
@Test
612583
public void onBackpressureDropSynchronousWithAction() {
613584
for (int i = 0; i < 100; i++) {
614585
final AtomicInteger dropCount = new AtomicInteger();
@@ -639,7 +610,7 @@ public void accept(Integer j) {
639610
}
640611
}
641612

642-
@Test(timeout = 2000)
613+
@Test
643614
public void onBackpressureBuffer() {
644615
int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
645616
AtomicInteger c = new AtomicInteger();

src/test/java/io/reactivex/flowable/FlowableCollectTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.plugins.RxJavaPlugins;
2929
import io.reactivex.testsupport.TestHelper;
3030

31-
public final class FlowableCollectTest {
31+
public final class FlowableCollectTest extends RxJavaTest {
3232

3333
@Test
3434
public void collectToListFlowable() {

src/test/java/io/reactivex/flowable/FlowableCombineLatestTests.java

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,14 @@
1515
*/
1616
package io.reactivex.flowable;
1717

18-
import static io.reactivex.Flowable.combineLatest;
19-
import static org.junit.Assert.assertNull;
20-
18+
import io.reactivex.RxJavaTest;
2119
import org.junit.*;
2220

2321
import io.reactivex.Flowable;
2422
import io.reactivex.flowable.FlowableCovarianceTest.*;
2523
import io.reactivex.functions.*;
26-
import io.reactivex.processors.BehaviorProcessor;
2724

28-
public class FlowableCombineLatestTests {
25+
public class FlowableCombineLatestTests extends RxJavaTest {
2926
/**
3027
* This won't compile if super/extends isn't done correctly on generics.
3128
*/
@@ -63,25 +60,4 @@ public void accept(ExtendedResult t1) {
6360
System.out.println("Result: " + t1);
6461
}
6562
};
66-
67-
@Ignore("No longer allowed")
68-
@Test
69-
public void nullEmitting() throws Exception {
70-
// FIXME this is no longer allowed
71-
Flowable<Boolean> nullObservable = BehaviorProcessor.createDefault((Boolean) null);
72-
Flowable<Boolean> nonNullObservable = BehaviorProcessor.createDefault(true);
73-
Flowable<Boolean> combined =
74-
combineLatest(nullObservable, nonNullObservable, new BiFunction<Boolean, Boolean, Boolean>() {
75-
@Override
76-
public Boolean apply(Boolean bool1, Boolean bool2) {
77-
return bool1 == null ? null : bool2;
78-
}
79-
});
80-
combined.subscribe(new Consumer<Boolean>() {
81-
@Override
82-
public void accept(Boolean aBoolean) {
83-
assertNull(aBoolean);
84-
}
85-
});
86-
}
8763
}

src/test/java/io/reactivex/flowable/FlowableConcatTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616

1717
import java.util.*;
1818

19+
import io.reactivex.RxJavaTest;
1920
import org.junit.Test;
2021
import org.reactivestreams.*;
2122

2223
import io.reactivex.Flowable;
2324
import io.reactivex.flowable.FlowableCovarianceTest.*;
2425

25-
public class FlowableConcatTests {
26+
public class FlowableConcatTests extends RxJavaTest {
2627

2728
@Test
2829
public void concatSimple() {

src/test/java/io/reactivex/flowable/FlowableConversionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.schedulers.Schedulers;
2929
import io.reactivex.subscribers.DefaultSubscriber;
3030

31-
public class FlowableConversionTest {
31+
public class FlowableConversionTest extends RxJavaTest {
3232

3333
public static class Cylon { }
3434

src/test/java/io/reactivex/flowable/FlowableCovarianceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*
3434
* See https://github.com/Netflix/RxJava/pull/331
3535
*/
36-
public class FlowableCovarianceTest {
36+
public class FlowableCovarianceTest extends RxJavaTest {
3737

3838
/**
3939
* This won't compile if super/extends isn't done correctly on generics.

src/test/java/io/reactivex/flowable/FlowableDoAfterNextTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
import java.util.concurrent.atomic.AtomicInteger;
2121

22+
import io.reactivex.RxJavaTest;
2223
import org.junit.Test;
2324

2425
import io.reactivex.functions.Consumer;
2526

26-
public class FlowableDoAfterNextTest {
27+
public class FlowableDoAfterNextTest extends RxJavaTest {
2728

2829
@Test
2930
public void ifFunctionThrowsThatNoMoreEventsAreProcessed() {

src/test/java/io/reactivex/flowable/FlowableDoOnTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.exceptions.TestException;
2424
import io.reactivex.functions.*;
2525

26-
public class FlowableDoOnTest {
26+
public class FlowableDoOnTest extends RxJavaTest {
2727

2828
@Test
2929
public void doOnEach() {

src/test/java/io/reactivex/flowable/FlowableErrorHandlingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
import java.util.concurrent.*;
1919
import java.util.concurrent.atomic.AtomicReference;
2020

21+
import io.reactivex.RxJavaTest;
2122
import org.junit.Test;
2223
import org.reactivestreams.Subscriber;
2324

2425
import io.reactivex.Flowable;
2526
import io.reactivex.schedulers.Schedulers;
2627
import io.reactivex.subscribers.DefaultSubscriber;
2728

28-
public class FlowableErrorHandlingTests {
29+
public class FlowableErrorHandlingTests extends RxJavaTest {
2930

3031
/**
3132
* Test that an error from a user provided Observer.onNext

src/test/java/io/reactivex/flowable/FlowableEventStreamTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616

1717
package io.reactivex.flowable;
1818

19+
import io.reactivex.RxJavaTest;
1920
import org.junit.Test;
2021

2122
import io.reactivex.testsupport.TestHelper;
2223

23-
public class FlowableEventStreamTest {
24+
public class FlowableEventStreamTest extends RxJavaTest {
2425
@Test
2526
public void constructorShouldBePrivate() {
2627
TestHelper.checkUtilityClass(FlowableEventStream.class);

0 commit comments

Comments
 (0)