Skip to content

Commit 36dceba

Browse files
Change Parallel to use Long instead of Int
1 parent 84372e1 commit 36dceba

File tree

2 files changed

+77
-9
lines changed

2 files changed

+77
-9
lines changed

rxjava-core/src/main/java/rx/operators/OperatorParallel.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,33 +29,35 @@ public final class OperatorParallel<T, R> implements Operator<R, T> {
2929

3030
private final Scheduler scheduler;
3131
private final Func1<Observable<T>, Observable<R>> f;
32+
private final int degreeOfParallelism;
3233

3334
public OperatorParallel(Func1<Observable<T>, Observable<R>> f, Scheduler scheduler) {
3435
this.scheduler = scheduler;
3536
this.f = f;
37+
this.degreeOfParallelism = scheduler.degreeOfParallelism();
3638
}
3739

3840
@Override
3941
public Subscriber<? super T> call(Subscriber<? super R> op) {
4042

41-
Func1<Subscriber<? super GroupedObservable<Integer, T>>, Subscriber<? super T>> groupBy =
42-
new OperatorGroupBy<Integer, T>(new Func1<T, Integer>() {
43+
Func1<Subscriber<? super GroupedObservable<Long, T>>, Subscriber<? super T>> groupBy =
44+
new OperatorGroupBy<Long, T>(new Func1<T, Long>() {
4345

44-
int i = 0;
46+
long i = 0;
4547

4648
@Override
47-
public Integer call(T t) {
48-
return i++ % scheduler.degreeOfParallelism();
49+
public Long call(T t) {
50+
return i++ % degreeOfParallelism;
4951
}
5052

5153
});
5254

53-
Func1<Subscriber<? super Observable<R>>, Subscriber<? super GroupedObservable<Integer, T>>> map =
54-
new OperatorMap<GroupedObservable<Integer, T>, Observable<R>>(
55-
new Func1<GroupedObservable<Integer, T>, Observable<R>>() {
55+
Func1<Subscriber<? super Observable<R>>, Subscriber<? super GroupedObservable<Long, T>>> map =
56+
new OperatorMap<GroupedObservable<Long, T>, Observable<R>>(
57+
new Func1<GroupedObservable<Long, T>, Observable<R>>() {
5658

5759
@Override
58-
public Observable<R> call(GroupedObservable<Integer, T> g) {
60+
public Observable<R> call(GroupedObservable<Long, T> g) {
5961
// Must use observeOn not subscribeOn because we have a single source behind groupBy.
6062
// The origin is already subscribed to, we are moving each group on to a new thread
6163
// but the origin itself can only be on a single thread.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.functions.Action0;
5+
import rx.functions.Func1;
6+
import rx.perf.AbstractPerformanceTester;
7+
import rx.perf.IntegerSumObserver;
8+
9+
public class OperatorParallelPerformance extends AbstractPerformanceTester {
10+
11+
private final static int REPS = 10000000;
12+
13+
OperatorParallelPerformance() {
14+
super(REPS);
15+
}
16+
17+
public static void main(String args[]) {
18+
19+
final OperatorParallelPerformance spt = new OperatorParallelPerformance();
20+
try {
21+
spt.runTest(new Action0() {
22+
23+
@Override
24+
public void call() {
25+
spt.parallelSum();
26+
}
27+
});
28+
} catch (Exception e) {
29+
e.printStackTrace();
30+
}
31+
32+
}
33+
34+
/**
35+
*
36+
* Run: 10 - 11,220,888 ops/sec
37+
* Run: 11 - 12,372,424 ops/sec
38+
* Run: 12 - 11,028,921 ops/sec
39+
* Run: 13 - 11,813,711 ops/sec
40+
* Run: 14 - 12,098,364 ops/sec
41+
*
42+
*/
43+
public long parallelSum() {
44+
45+
Observable<Integer> s = Observable.range(1, REPS).parallel(new Func1<Observable<Integer>, Observable<Integer>>() {
46+
47+
@Override
48+
public Observable<Integer> call(Observable<Integer> l) {
49+
return l.map(new Func1<Integer, Integer>() {
50+
51+
@Override
52+
public Integer call(Integer i) {
53+
return i + 1;
54+
}
55+
56+
});
57+
}
58+
59+
});
60+
IntegerSumObserver o = new IntegerSumObserver();
61+
62+
s.subscribe(o);
63+
return o.sum;
64+
}
65+
66+
}

0 commit comments

Comments
 (0)