Skip to content

Commit 06f5d83

Browse files
Bind implementation of Parallel
1 parent 02ccc4d commit 06f5d83

File tree

4 files changed

+66
-63
lines changed

4 files changed

+66
-63
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
import rx.operators.OperationOnErrorResumeNextViaObservable;
7272
import rx.operators.OperationOnErrorReturn;
7373
import rx.operators.OperationOnExceptionResumeNextViaObservable;
74-
import rx.operators.OperationParallel;
74+
import rx.operators.OperatorParallel;
7575
import rx.operators.OperationParallelMerge;
7676
import rx.operators.OperationRepeat;
7777
import rx.operators.OperationReplay;
@@ -5550,7 +5550,7 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
55505550
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#parallel">RxJava Wiki: parallel()</a>
55515551
*/
55525552
public final <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
5553-
return OperationParallel.parallel(this, f);
5553+
return bind(new OperatorParallel<T, R>(f, Schedulers.computation()));
55545554
}
55555555

55565556
/**
@@ -5569,7 +5569,7 @@ public final <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
55695569
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#parallel">RxJava Wiki: parallel()</a>
55705570
*/
55715571
public final <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
5572-
return OperationParallel.parallel(this, f, s);
5572+
return bind(new OperatorParallel<T, R>(f, s));
55735573
}
55745574

55755575
/**

rxjava-core/src/main/java/rx/operators/OperationParallel.java

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Operator;
20+
import rx.Scheduler;
21+
import rx.observables.GroupedObservable;
22+
import rx.util.functions.Func1;
23+
24+
/**
25+
* Identifies unit of work that can be executed in parallel on a given Scheduler.
26+
*/
27+
public final class OperatorParallel<T, R> implements Func1<Operator<? super R>, Operator<? super T>> {
28+
29+
private final Scheduler scheduler;
30+
private final Func1<Observable<T>, Observable<R>> f;
31+
32+
public OperatorParallel(Func1<Observable<T>, Observable<R>> f, Scheduler scheduler) {
33+
this.scheduler = scheduler;
34+
this.f = f;
35+
}
36+
37+
@Override
38+
public Operator<? super T> call(Operator<? super R> op) {
39+
40+
Func1<Operator<? super GroupedObservable<Integer, T>>, Operator<? super T>> groupBy = new OperatorGroupBy<Integer, T>(new Func1<T, Integer>() {
41+
42+
int i = 0;
43+
44+
@Override
45+
public Integer call(T t) {
46+
return i++ % scheduler.degreeOfParallelism();
47+
}
48+
49+
});
50+
51+
Func1<Operator<? super Observable<R>>, Operator<? super GroupedObservable<Integer, T>>> map = new OperatorMap<GroupedObservable<Integer, T>, Observable<R>>(new Func1<GroupedObservable<Integer, T>, Observable<R>>() {
52+
53+
@Override
54+
public Observable<R> call(GroupedObservable<Integer, T> g) {
55+
return f.call(g.observeOn(scheduler));
56+
}
57+
});
58+
59+
// bind together operators
60+
return groupBy.call(map.call(new OperatorMerge().call(op)));
61+
}
62+
}

rxjava-core/src/test/java/rx/operators/OperationParallelTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorParallelTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import rx.util.functions.Action1;
2626
import rx.util.functions.Func1;
2727

28-
public class OperationParallelTest {
28+
public class OperatorParallelTest {
2929

3030
@Test
3131
public void testParallel() {

0 commit comments

Comments
 (0)