Skip to content

Commit d24bfbd

Browse files
authored
2.x: Add TCK for MulticastProcessor & {0..1}.flatMapPublisher (#6022)
1 parent 07e126f commit d24bfbd

6 files changed

+297
-0
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.*;
20+
21+
@Test
22+
public class CompletableAndThenPublisherTckTest extends BaseTck<Integer> {
23+
24+
@Override
25+
public Publisher<Integer> createPublisher(final long elements) {
26+
return
27+
Completable.complete().hide().andThen(Flowable.range(0, (int)elements))
28+
;
29+
}
30+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.*;
20+
import io.reactivex.functions.Function;
21+
22+
@Test
23+
public class MaybeFlatMapPublisherTckTest extends BaseTck<Integer> {
24+
25+
@Override
26+
public Publisher<Integer> createPublisher(final long elements) {
27+
return
28+
Maybe.just(1).hide().flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
29+
@Override
30+
public Publisher<Integer> apply(Integer v)
31+
throws Exception {
32+
return Flowable.range(0, (int)elements);
33+
}
34+
})
35+
;
36+
}
37+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.processors.MulticastProcessor;
20+
import io.reactivex.schedulers.Schedulers;
21+
22+
@Test
23+
public class MulticastProcessorAsPublisherTckTest extends BaseTck<Integer> {
24+
25+
public MulticastProcessorAsPublisherTckTest() {
26+
super(100);
27+
}
28+
29+
@Override
30+
public Publisher<Integer> createPublisher(final long elements) {
31+
final MulticastProcessor<Integer> mp = MulticastProcessor.create();
32+
mp.start();
33+
34+
Schedulers.io().scheduleDirect(new Runnable() {
35+
@Override
36+
public void run() {
37+
long start = System.currentTimeMillis();
38+
while (!mp.hasSubscribers()) {
39+
try {
40+
Thread.sleep(1);
41+
} catch (InterruptedException ex) {
42+
return;
43+
}
44+
45+
if (System.currentTimeMillis() - start > 200) {
46+
return;
47+
}
48+
}
49+
50+
for (int i = 0; i < elements; i++) {
51+
while (!mp.offer(i)) {
52+
Thread.yield();
53+
if (System.currentTimeMillis() - start > 1000) {
54+
return;
55+
}
56+
}
57+
}
58+
mp.onComplete();
59+
}
60+
});
61+
return mp;
62+
}
63+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import java.util.concurrent.*;
17+
18+
import org.reactivestreams.*;
19+
import org.reactivestreams.tck.*;
20+
import org.testng.annotations.Test;
21+
22+
import io.reactivex.exceptions.TestException;
23+
import io.reactivex.processors.*;
24+
25+
@Test
26+
public class MulticastProcessorRefCountedTckTest extends IdentityProcessorVerification<Integer> {
27+
28+
public MulticastProcessorRefCountedTckTest() {
29+
super(new TestEnvironment(50));
30+
}
31+
32+
@Override
33+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
34+
MulticastProcessor<Integer> mp = MulticastProcessor.create(true);
35+
return mp;
36+
}
37+
38+
@Override
39+
public Publisher<Integer> createFailedPublisher() {
40+
MulticastProcessor<Integer> mp = MulticastProcessor.create();
41+
mp.start();
42+
mp.onError(new TestException());
43+
return mp;
44+
}
45+
46+
@Override
47+
public ExecutorService publisherExecutorService() {
48+
return Executors.newCachedThreadPool();
49+
}
50+
51+
@Override
52+
public Integer createElement(int element) {
53+
return element;
54+
}
55+
56+
@Override
57+
public long maxSupportedSubscribers() {
58+
return 1;
59+
}
60+
61+
@Override
62+
public long maxElementsFromPublisher() {
63+
return 1024;
64+
}
65+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import java.util.concurrent.*;
17+
18+
import org.reactivestreams.*;
19+
import org.reactivestreams.tck.*;
20+
import org.testng.annotations.Test;
21+
22+
import io.reactivex.exceptions.TestException;
23+
import io.reactivex.processors.*;
24+
25+
@Test
26+
public class MulticastProcessorTckTest extends IdentityProcessorVerification<Integer> {
27+
28+
public MulticastProcessorTckTest() {
29+
super(new TestEnvironment(50));
30+
}
31+
32+
@Override
33+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
34+
MulticastProcessor<Integer> mp = MulticastProcessor.create();
35+
return new RefCountProcessor<Integer>(mp);
36+
}
37+
38+
@Override
39+
public Publisher<Integer> createFailedPublisher() {
40+
MulticastProcessor<Integer> mp = MulticastProcessor.create();
41+
mp.start();
42+
mp.onError(new TestException());
43+
return mp;
44+
}
45+
46+
@Override
47+
public ExecutorService publisherExecutorService() {
48+
return Executors.newCachedThreadPool();
49+
}
50+
51+
@Override
52+
public Integer createElement(int element) {
53+
return element;
54+
}
55+
56+
@Override
57+
public long maxSupportedSubscribers() {
58+
return 1;
59+
}
60+
61+
@Override
62+
public long maxElementsFromPublisher() {
63+
return 1024;
64+
}
65+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.*;
20+
import io.reactivex.functions.Function;
21+
22+
@Test
23+
public class SingleFlatMapFlowableTckTest extends BaseTck<Integer> {
24+
25+
@Override
26+
public Publisher<Integer> createPublisher(final long elements) {
27+
return
28+
Single.just(1).hide().flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
29+
@Override
30+
public Publisher<Integer> apply(Integer v)
31+
throws Exception {
32+
return Flowable.range(0, (int)elements);
33+
}
34+
})
35+
;
36+
}
37+
}

0 commit comments

Comments
 (0)