Skip to content

Commit 53c30d3

Browse files
Parallel Operator
This operator came out of discussions and work with @headinthebox to allow explicit and composable declaration of blocks of work that can be scheduled for parallel execution. An Observable event stream will be sharded using groupBy using a value from Scheduler. degreeOfParallelism() (defaulting to number of CPU cores) and perform the defined work in parallel. Instead of having various parallel operators like parallelMap, parallelFilter parallelScan etc this can work generically for any operators or sequence of operators.
1 parent 1ea15ba commit 53c30d3

File tree

4 files changed

+162
-7
lines changed

4 files changed

+162
-7
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package rx.lang.groovy
2+
3+
import org.junit.Test
4+
5+
import rx.Observable
6+
import rx.Scheduler
7+
import rx.concurrency.Schedulers
8+
import rx.util.functions.Func1
9+
10+
class TestParallel {
11+
12+
@Test
13+
public void testParallelOperator() {
14+
Observable.range(0, 100)
15+
.parallel({
16+
it.map({ return it; })
17+
})
18+
.toBlockingObservable()
19+
.forEach({ println("T: " + it + " Thread: " + Thread.currentThread()); });
20+
}
21+
}

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@
3434
import rx.operators.OperationCache;
3535
import rx.operators.OperationCombineLatest;
3636
import rx.operators.OperationConcat;
37+
import rx.operators.OperationDebounce;
3738
import rx.operators.OperationDefer;
3839
import rx.operators.OperationDematerialize;
39-
import rx.operators.OperationDistinctUntilChanged;
4040
import rx.operators.OperationDistinct;
41+
import rx.operators.OperationDistinctUntilChanged;
4142
import rx.operators.OperationFilter;
4243
import rx.operators.OperationFinally;
4344
import rx.operators.OperationFirstOrDefault;
@@ -53,6 +54,7 @@
5354
import rx.operators.OperationOnErrorResumeNextViaObservable;
5455
import rx.operators.OperationOnErrorReturn;
5556
import rx.operators.OperationOnExceptionResumeNextViaObservable;
57+
import rx.operators.OperationParallel;
5658
import rx.operators.OperationRetry;
5759
import rx.operators.OperationSample;
5860
import rx.operators.OperationScan;
@@ -67,7 +69,6 @@
6769
import rx.operators.OperationTakeUntil;
6870
import rx.operators.OperationTakeWhile;
6971
import rx.operators.OperationThrottleFirst;
70-
import rx.operators.OperationDebounce;
7172
import rx.operators.OperationTimestamp;
7273
import rx.operators.OperationToObservableFuture;
7374
import rx.operators.OperationToObservableIterable;
@@ -1773,15 +1774,13 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
17731774
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
17741775
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
17751776
*
1776-
* @param observable
1777-
* the source Observable
17781777
* @param <T>
17791778
* the type of item emitted by the source Observable
17801779
* @return an Observable that is a chronologically well-behaved version of the source
17811780
* Observable, and that synchronously notifies its {@link Observer}s
17821781
*/
1783-
public static <T> Observable<T> synchronize(Observable<? extends T> observable) {
1784-
return create(OperationSynchronize.synchronize(observable));
1782+
public Observable<T> synchronize() {
1783+
return create(OperationSynchronize.synchronize(this));
17851784
}
17861785

17871786

@@ -3484,6 +3483,31 @@ public Observable<T> cache() {
34843483
return create(OperationCache.cache(this));
34853484
}
34863485

3486+
/**
3487+
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Schedulers#threadPoolForComputation()} {@link Scheduler} and return an {@code Observable<R>} with the output.
3488+
*
3489+
* @param f
3490+
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
3491+
* @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler}
3492+
*/
3493+
public <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
3494+
return OperationParallel.parallel(this, f);
3495+
}
3496+
3497+
/**
3498+
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Scheduler} and return an {@code Observable<R>} with the output.
3499+
*
3500+
* @param f
3501+
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
3502+
* @param s
3503+
* a {@link Scheduler} to perform the work on.
3504+
* @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler}
3505+
*/
3506+
3507+
public <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
3508+
return OperationParallel.parallel(this, f, s);
3509+
}
3510+
34873511
/**
34883512
* Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting
34893513
* items to those {@link Observer}s that have subscribed to it.

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,23 @@ public Subscription call(Scheduler scheduler, Void state) {
212212
}
213213

214214
/**
215-
* Returns the scheduler's notion of current absolute time in milliseconds.
215+
* @return the scheduler's notion of current absolute time in milliseconds.
216216
*/
217217
public long now() {
218218
return System.currentTimeMillis();
219219
}
220220

221+
/**
222+
* Parallelism available to a Scheduler.
223+
* <p>
224+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
225+
*
226+
* @return the scheduler's available degree of parallelism.
227+
*/
228+
public int degreeOfParallelism() {
229+
return Runtime.getRuntime().availableProcessors();
230+
}
231+
221232
public static class UnitTest {
222233
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
223234
@Test
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import org.junit.Test;
23+
24+
import rx.Observable;
25+
import rx.Scheduler;
26+
import rx.concurrency.Schedulers;
27+
import rx.observables.GroupedObservable;
28+
import rx.util.functions.Action1;
29+
import rx.util.functions.Func0;
30+
import rx.util.functions.Func1;
31+
32+
/**
33+
* Identifies unit of work that can be executed in parallel on a given Scheduler.
34+
*/
35+
public final class OperationParallel<T> {
36+
37+
public static <T, R> Observable<R> parallel(Observable<T> source, Func1<Observable<T>, Observable<R>> f) {
38+
return parallel(source, f, Schedulers.threadPoolForComputation());
39+
}
40+
41+
public static <T, R> Observable<R> parallel(final Observable<T> source, final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
42+
return Observable.defer(new Func0<Observable<R>>() {
43+
44+
@Override
45+
public Observable<R> call() {
46+
final AtomicInteger i = new AtomicInteger(0);
47+
return source.groupBy(new Func1<T, Integer>() {
48+
49+
@Override
50+
public Integer call(T t) {
51+
return i.incrementAndGet() % s.degreeOfParallelism();
52+
}
53+
54+
}).flatMap(new Func1<GroupedObservable<Integer, T>, Observable<R>>() {
55+
56+
@Override
57+
public Observable<R> call(GroupedObservable<Integer, T> group) {
58+
return f.call(group.observeOn(s));
59+
}
60+
}).synchronize();
61+
}
62+
});
63+
}
64+
65+
public static class UnitTest {
66+
67+
@Test
68+
public void testParallel() {
69+
int NUM = 1000;
70+
final AtomicInteger count = new AtomicInteger();
71+
Observable.range(1, NUM).parallel(
72+
new Func1<Observable<Integer>, Observable<Integer[]>>() {
73+
74+
@Override
75+
public Observable<Integer[]> call(Observable<Integer> o) {
76+
return o.map(new Func1<Integer, Integer[]>() {
77+
78+
@Override
79+
public Integer[] call(Integer t) {
80+
return new Integer[] { t, t * 99 };
81+
}
82+
83+
});
84+
}
85+
}).toBlockingObservable().forEach(new Action1<Integer[]>() {
86+
87+
@Override
88+
public void call(Integer[] v) {
89+
count.incrementAndGet();
90+
System.out.println("V: " + v[0] + " R: " + v[1] + " Thread: " + Thread.currentThread());
91+
}
92+
93+
});
94+
95+
// just making sure we finish and get the number we expect
96+
assertEquals(NUM, count.get());
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)