@@ -29,33 +29,35 @@ public final class OperatorParallel<T, R> implements Operator<R, T> {
2929
3030 private final Scheduler scheduler ;
3131 private final Func1 <Observable <T >, Observable <R >> f ;
32+ private final int degreeOfParallelism ;
3233
3334 public OperatorParallel (Func1 <Observable <T >, Observable <R >> f , Scheduler scheduler ) {
3435 this .scheduler = scheduler ;
3536 this .f = f ;
37+ this .degreeOfParallelism = scheduler .degreeOfParallelism ();
3638 }
3739
3840 @ Override
3941 public Subscriber <? super T > call (Subscriber <? super R > op ) {
4042
41- Func1 <Subscriber <? super GroupedObservable <Integer , T >>, Subscriber <? super T >> groupBy =
42- new OperatorGroupBy <Integer , T >(new Func1 <T , Integer >() {
43+ Func1 <Subscriber <? super GroupedObservable <Long , T >>, Subscriber <? super T >> groupBy =
44+ new OperatorGroupBy <Long , T >(new Func1 <T , Long >() {
4345
44- int i = 0 ;
46+ long i = 0 ;
4547
4648 @ Override
47- public Integer call (T t ) {
48- return i ++ % scheduler . degreeOfParallelism () ;
49+ public Long call (T t ) {
50+ return i ++ % degreeOfParallelism ;
4951 }
5052
5153 });
5254
53- Func1 <Subscriber <? super Observable <R >>, Subscriber <? super GroupedObservable <Integer , T >>> map =
54- new OperatorMap <GroupedObservable <Integer , T >, Observable <R >>(
55- new Func1 <GroupedObservable <Integer , T >, Observable <R >>() {
55+ Func1 <Subscriber <? super Observable <R >>, Subscriber <? super GroupedObservable <Long , T >>> map =
56+ new OperatorMap <GroupedObservable <Long , T >, Observable <R >>(
57+ new Func1 <GroupedObservable <Long , T >, Observable <R >>() {
5658
5759 @ Override
58- public Observable <R > call (GroupedObservable <Integer , T > g ) {
60+ public Observable <R > call (GroupedObservable <Long , T > g ) {
5961 // Must use observeOn not subscribeOn because we have a single source behind groupBy.
6062 // The origin is already subscribed to, we are moving each group on to a new thread
6163 // but the origin itself can only be on a single thread.
0 commit comments