Skip to content

Commit d2fe631

Browse files
authored
2.x: API to get distinct Workers from some Schedulers (#5741)
1 parent c9af67b commit d2fe631

File tree

4 files changed

+272
-19
lines changed

4 files changed

+272
-19
lines changed

src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.reactivex.exceptions.MissingBackpressureException;
2323
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2424
import io.reactivex.internal.queue.SpscArrayQueue;
25+
import io.reactivex.internal.schedulers.SchedulerMultiWorkerSupport;
26+
import io.reactivex.internal.schedulers.SchedulerMultiWorkerSupport.WorkerCallback;
2527
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2628
import io.reactivex.internal.util.BackpressureHelper;
2729
import io.reactivex.parallel.ParallelFlowable;
@@ -47,34 +49,58 @@ public ParallelRunOn(ParallelFlowable<? extends T> parent,
4749
}
4850

4951
@Override
50-
public void subscribe(Subscriber<? super T>[] subscribers) {
52+
public void subscribe(final Subscriber<? super T>[] subscribers) {
5153
if (!validate(subscribers)) {
5254
return;
5355
}
5456

5557
int n = subscribers.length;
5658

5759
@SuppressWarnings("unchecked")
58-
Subscriber<T>[] parents = new Subscriber[n];
60+
final Subscriber<T>[] parents = new Subscriber[n];
61+
62+
if (scheduler instanceof SchedulerMultiWorkerSupport) {
63+
SchedulerMultiWorkerSupport multiworker = (SchedulerMultiWorkerSupport) scheduler;
64+
multiworker.createWorkers(n, new MultiWorkerCallback(subscribers, parents));
65+
} else {
66+
for (int i = 0; i < n; i++) {
67+
createSubscriber(i, subscribers, parents, scheduler.createWorker());
68+
}
69+
}
70+
source.subscribe(parents);
71+
}
5972

60-
int prefetch = this.prefetch;
73+
void createSubscriber(int i, Subscriber<? super T>[] subscribers,
74+
Subscriber<T>[] parents, Scheduler.Worker worker) {
6175

62-
for (int i = 0; i < n; i++) {
63-
Subscriber<? super T> a = subscribers[i];
76+
Subscriber<? super T> a = subscribers[i];
6477

65-
Worker w = scheduler.createWorker();
66-
SpscArrayQueue<T> q = new SpscArrayQueue<T>(prefetch);
78+
SpscArrayQueue<T> q = new SpscArrayQueue<T>(prefetch);
6779

68-
if (a instanceof ConditionalSubscriber) {
69-
parents[i] = new RunOnConditionalSubscriber<T>((ConditionalSubscriber<? super T>)a, prefetch, q, w);
70-
} else {
71-
parents[i] = new RunOnSubscriber<T>(a, prefetch, q, w);
72-
}
80+
if (a instanceof ConditionalSubscriber) {
81+
parents[i] = new RunOnConditionalSubscriber<T>((ConditionalSubscriber<? super T>)a, prefetch, q, worker);
82+
} else {
83+
parents[i] = new RunOnSubscriber<T>(a, prefetch, q, worker);
7384
}
74-
75-
source.subscribe(parents);
7685
}
7786

87+
final class MultiWorkerCallback implements WorkerCallback {
88+
89+
final Subscriber<? super T>[] subscribers;
90+
91+
final Subscriber<T>[] parents;
92+
93+
MultiWorkerCallback(Subscriber<? super T>[] subscribers,
94+
Subscriber<T>[] parents) {
95+
this.subscribers = subscribers;
96+
this.parents = parents;
97+
}
98+
99+
@Override
100+
public void onWorker(int i, Worker w) {
101+
createSubscriber(i, subscribers, parents, w);
102+
}
103+
}
78104

79105
@Override
80106
public int parallelism() {

src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,20 @@
1515
*/
1616
package io.reactivex.internal.schedulers;
1717

18+
import java.util.concurrent.*;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
1821
import io.reactivex.Scheduler;
1922
import io.reactivex.annotations.NonNull;
2023
import io.reactivex.disposables.*;
2124
import io.reactivex.internal.disposables.*;
22-
23-
import java.util.concurrent.*;
24-
import java.util.concurrent.atomic.AtomicReference;
25+
import io.reactivex.internal.functions.ObjectHelper;
2526

2627
/**
2728
* Holds a fixed pool of worker threads and assigns them
2829
* to requested Scheduler.Workers in a round-robin fashion.
2930
*/
30-
public final class ComputationScheduler extends Scheduler {
31+
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
3132
/** This will indicate no pool is active. */
3233
static final FixedSchedulerPool NONE;
3334
/** Manages a fixed number of workers. */
@@ -67,7 +68,7 @@ static int cap(int cpuCount, int paramThreads) {
6768
return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
6869
}
6970

70-
static final class FixedSchedulerPool {
71+
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
7172
final int cores;
7273

7374
final PoolWorker[] eventLoops;
@@ -96,6 +97,25 @@ public void shutdown() {
9697
w.dispose();
9798
}
9899
}
100+
101+
@Override
102+
public void createWorkers(int number, WorkerCallback callback) {
103+
int c = cores;
104+
if (c == 0) {
105+
for (int i = 0; i < number; i++) {
106+
callback.onWorker(i, SHUTDOWN_WORKER);
107+
}
108+
} else {
109+
int index = (int)n % c;
110+
for (int i = 0; i < number; i++) {
111+
callback.onWorker(i, new EventLoopWorker(eventLoops[index]));
112+
if (++index == c) {
113+
index = 0;
114+
}
115+
}
116+
n = index;
117+
}
118+
}
99119
}
100120

101121
/**
@@ -125,6 +145,12 @@ public Worker createWorker() {
125145
return new EventLoopWorker(pool.get().getEventLoop());
126146
}
127147

148+
@Override
149+
public void createWorkers(int number, WorkerCallback callback) {
150+
ObjectHelper.verifyPositive(number, "number > 0 required");
151+
pool.get().createWorkers(number, callback);
152+
}
153+
128154
@NonNull
129155
@Override
130156
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.schedulers;
15+
16+
import io.reactivex.Scheduler;
17+
import io.reactivex.annotations.*;
18+
19+
/**
20+
* Allows retrieving multiple workers from the implementing
21+
* {@link io.reactivex.Scheduler} in a way that when asking for
22+
* at most the parallelism level of the Scheduler, those
23+
* {@link io.reactivex.Scheduler.Worker} instances will be running
24+
* with different backing threads.
25+
*
26+
* @since 2.1.7 - experimental
27+
*/
28+
@Experimental
29+
public interface SchedulerMultiWorkerSupport {
30+
31+
/**
32+
* Creates the given number of {@link io.reactivex.Scheduler.Worker} instances
33+
* that are possibly backed by distinct threads
34+
* and calls the specified {@code Consumer} with them.
35+
* @param number the number of workers to create, positive
36+
* @param callback the callback to send worker instances to
37+
*/
38+
void createWorkers(int number, @NonNull WorkerCallback callback);
39+
40+
/**
41+
* The callback interface for the {@link SchedulerMultiWorkerSupport#createWorkers(int, WorkerCallback)}
42+
* method.
43+
*/
44+
interface WorkerCallback {
45+
/**
46+
* Called with the Worker index and instance.
47+
* @param index the worker index, zero-based
48+
* @param worker the worker instance
49+
*/
50+
void onWorker(int index, @NonNull Scheduler.Worker worker);
51+
}
52+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.schedulers;
15+
16+
import static org.junit.Assert.*;
17+
18+
import java.util.*;
19+
import java.util.concurrent.*;
20+
21+
import org.junit.Test;
22+
23+
import io.reactivex.Scheduler.Worker;
24+
import io.reactivex.TestHelper;
25+
import io.reactivex.disposables.CompositeDisposable;
26+
import io.reactivex.internal.schedulers.SchedulerMultiWorkerSupport.WorkerCallback;
27+
import io.reactivex.schedulers.Schedulers;
28+
29+
public class SchedulerMultiWorkerSupportTest {
30+
31+
final int max = ComputationScheduler.MAX_THREADS;
32+
33+
@Test
34+
public void moreThanMaxWorkers() {
35+
final List<Worker> list = new ArrayList<Worker>();
36+
37+
SchedulerMultiWorkerSupport mws = (SchedulerMultiWorkerSupport)Schedulers.computation();
38+
39+
mws.createWorkers(max * 2, new WorkerCallback() {
40+
@Override
41+
public void onWorker(int i, Worker w) {
42+
list.add(w);
43+
}
44+
});
45+
46+
assertEquals(max * 2, list.size());
47+
}
48+
49+
@Test
50+
public void getShutdownWorkers() {
51+
final List<Worker> list = new ArrayList<Worker>();
52+
53+
ComputationScheduler.NONE.createWorkers(max * 2, new WorkerCallback() {
54+
@Override
55+
public void onWorker(int i, Worker w) {
56+
list.add(w);
57+
}
58+
});
59+
60+
assertEquals(max * 2, list.size());
61+
62+
for (Worker w : list) {
63+
assertEquals(ComputationScheduler.SHUTDOWN_WORKER, w);
64+
}
65+
}
66+
67+
@Test
68+
public void distinctThreads() throws Exception {
69+
for (int i = 0; i < 1000; i++) {
70+
71+
final CompositeDisposable composite = new CompositeDisposable();
72+
73+
try {
74+
final CountDownLatch cdl = new CountDownLatch(max * 2);
75+
76+
final Set<String> threads1 = Collections.synchronizedSet(new HashSet<String>());
77+
78+
final Set<String> threads2 = Collections.synchronizedSet(new HashSet<String>());
79+
80+
Runnable parallel1 = new Runnable() {
81+
@Override
82+
public void run() {
83+
final List<Worker> list1 = new ArrayList<Worker>();
84+
85+
SchedulerMultiWorkerSupport mws = (SchedulerMultiWorkerSupport)Schedulers.computation();
86+
87+
mws.createWorkers(max, new WorkerCallback() {
88+
@Override
89+
public void onWorker(int i, Worker w) {
90+
list1.add(w);
91+
composite.add(w);
92+
}
93+
});
94+
95+
Runnable run = new Runnable() {
96+
@Override
97+
public void run() {
98+
threads1.add(Thread.currentThread().getName());
99+
cdl.countDown();
100+
}
101+
};
102+
103+
for (Worker w : list1) {
104+
w.schedule(run);
105+
}
106+
}
107+
};
108+
109+
Runnable parallel2 = new Runnable() {
110+
@Override
111+
public void run() {
112+
final List<Worker> list2 = new ArrayList<Worker>();
113+
114+
SchedulerMultiWorkerSupport mws = (SchedulerMultiWorkerSupport)Schedulers.computation();
115+
116+
mws.createWorkers(max, new WorkerCallback() {
117+
@Override
118+
public void onWorker(int i, Worker w) {
119+
list2.add(w);
120+
composite.add(w);
121+
}
122+
});
123+
124+
Runnable run = new Runnable() {
125+
@Override
126+
public void run() {
127+
threads2.add(Thread.currentThread().getName());
128+
cdl.countDown();
129+
}
130+
};
131+
132+
for (Worker w : list2) {
133+
w.schedule(run);
134+
}
135+
}
136+
};
137+
138+
TestHelper.race(parallel1, parallel2);
139+
140+
assertTrue(cdl.await(5, TimeUnit.SECONDS));
141+
142+
assertEquals(threads1.toString(), max, threads1.size());
143+
assertEquals(threads2.toString(), max, threads2.size());
144+
} finally {
145+
composite.dispose();
146+
}
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)