Skip to content

Commit 42b7e4b

Browse files
Merge branch 'OperatorParallelMerge' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents d868724 + cd0a844 commit 42b7e4b

File tree

3 files changed

+10
-9
lines changed

3 files changed

+10
-9
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationDelay;
52-
import rx.operators.OperationParallelMerge;
5352
import rx.operators.OperationSequenceEqual;
5453
import rx.operators.OperationSkip;
5554
import rx.operators.OperationSkipUntil;
@@ -101,6 +100,7 @@
101100
import rx.operators.OperatorOnErrorReturn;
102101
import rx.operators.OperatorOnExceptionResumeNextViaObservable;
103102
import rx.operators.OperatorParallel;
103+
import rx.operators.OperatorParallelMerge;
104104
import rx.operators.OperatorPivot;
105105
import rx.operators.OperatorRepeat;
106106
import rx.operators.OperatorReplay;
@@ -2312,7 +2312,7 @@ public final static <T> Observable<T> never() {
23122312
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-parallelmerge">RxJava Wiki: parallelMerge()</a>
23132313
*/
23142314
public final static <T> Observable<Observable<T>> parallelMerge(Observable<Observable<T>> source, int parallelObservables) {
2315-
return OperationParallelMerge.parallelMerge(source, parallelObservables);
2315+
return OperatorParallelMerge.parallelMerge(source, parallelObservables);
23162316
}
23172317

23182318
/**
@@ -2335,7 +2335,7 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
23352335
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-parallelmerge">RxJava Wiki: parallelMerge()</a>
23362336
*/
23372337
public final static <T> Observable<Observable<T>> parallelMerge(Observable<Observable<T>> source, int parallelObservables, Scheduler scheduler) {
2338-
return OperationParallelMerge.parallelMerge(source, parallelObservables, scheduler);
2338+
return OperatorParallelMerge.parallelMerge(source, parallelObservables, scheduler);
23392339
}
23402340

23412341
/**

rxjava-core/src/main/java/rx/operators/OperationParallelMerge.java renamed to rxjava-core/src/main/java/rx/operators/OperatorParallelMerge.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
import rx.observables.GroupedObservable;
2424
import rx.schedulers.Schedulers;
2525

26-
public class OperationParallelMerge {
26+
public final class OperatorParallelMerge {
27+
private OperatorParallelMerge() { throw new IllegalStateException("No instances!"); }
2728

2829
public static <T> Observable<Observable<T>> parallelMerge(final Observable<Observable<T>> source, final int parallelObservables) {
2930
return parallelMerge(source, parallelObservables, Schedulers.immediate());

rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorParallelMergeTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import rx.schedulers.Schedulers;
3131
import rx.subjects.PublishSubject;
3232

33-
public class OperationParallelMergeTest {
33+
public class OperatorParallelMergeTest {
3434

3535
@Test
3636
public void testParallelMerge() {
@@ -41,8 +41,8 @@ public void testParallelMerge() {
4141

4242
Observable<Observable<String>> fourStreams = Observable.<Observable<String>> from(p1, p2, p3, p4);
4343

44-
Observable<Observable<String>> twoStreams = OperationParallelMerge.parallelMerge(fourStreams, 2);
45-
Observable<Observable<String>> threeStreams = OperationParallelMerge.parallelMerge(fourStreams, 3);
44+
Observable<Observable<String>> twoStreams = Observable.parallelMerge(fourStreams, 2);
45+
Observable<Observable<String>> threeStreams = Observable.parallelMerge(fourStreams, 3);
4646

4747
List<? super Observable<String>> fourList = fourStreams.toList().toBlockingObservable().last();
4848
List<? super Observable<String>> threeList = threeStreams.toList().toBlockingObservable().last();
@@ -62,7 +62,7 @@ public void testNumberOfThreads() {
6262
final ConcurrentHashMap<Long, Long> threads = new ConcurrentHashMap<Long, Long>();
6363
// parallelMerge into 3 streams and observeOn for each
6464
// we expect 3 threads in the output
65-
OperationParallelMerge.parallelMerge(getStreams(), 3)
65+
Observable.parallelMerge(getStreams(), 3)
6666
.flatMap(new Func1<Observable<String>, Observable<String>>() {
6767

6868
@Override
@@ -89,7 +89,7 @@ public void testNumberOfThreadsOnScheduledMerge() {
8989

9090
// now we parallelMerge into 3 streams and observeOn for each
9191
// we expect 3 threads in the output
92-
Observable.merge(OperationParallelMerge.parallelMerge(getStreams(), 3, Schedulers.newThread()))
92+
Observable.merge(Observable.parallelMerge(getStreams(), 3, Schedulers.newThread()))
9393
.toBlockingObservable().forEach(new Action1<String>() {
9494

9595
@Override

0 commit comments

Comments
 (0)