Skip to content

Commit a545da0

Browse files
committed
Move invokeAll() code to WorkerThread`
1 parent 1055dbb commit a545da0

File tree

1 file changed

+140
-134
lines changed

1 file changed

+140
-134
lines changed

junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java

Lines changed: 140 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -101,44 +101,7 @@ public void invokeAll(List<? extends TestTask> testTasks) {
101101
Preconditions.condition(workerThread != null && workerThread.executor() == this,
102102
"invokeAll() must be called from a worker thread that belongs to this executor");
103103

104-
if (testTasks.isEmpty()) {
105-
return;
106-
}
107-
108-
if (testTasks.size() == 1) {
109-
executeTask(testTasks.get(0));
110-
return;
111-
}
112-
113-
List<TestTask> isolatedTasks = new ArrayList<>(testTasks.size());
114-
List<TestTask> sameThreadTasks = new ArrayList<>(testTasks.size());
115-
var queueEntries = forkConcurrentChildren(testTasks, isolatedTasks::add, sameThreadTasks::add);
116-
executeAll(sameThreadTasks);
117-
var remainingForkedChildren = stealWork(queueEntries);
118-
waitFor(remainingForkedChildren);
119-
executeAll(isolatedTasks);
120-
}
121-
122-
private static void waitFor(List<WorkQueue.Entry> children) {
123-
if (children.isEmpty()) {
124-
return;
125-
}
126-
var future = toCombinedFuture(children);
127-
try {
128-
if (future.isDone()) {
129-
// no need to release worker lease
130-
future.join();
131-
}
132-
else {
133-
WorkerThread.getOrThrow().runBlocking(() -> {
134-
LOGGER.trace(() -> "blocking for forked children: " + children);
135-
return future.join();
136-
});
137-
}
138-
}
139-
catch (InterruptedException e) {
140-
Thread.currentThread().interrupt();
141-
}
104+
workerThread.invokeAll(testTasks);
142105
}
143106

144107
private WorkQueue.Entry enqueue(TestTask testTask) {
@@ -170,101 +133,6 @@ private void maybeStartWorker() {
170133
}
171134
}
172135

173-
private static CompletableFuture<?> toCombinedFuture(List<WorkQueue.Entry> entries) {
174-
if (entries.size() == 1) {
175-
return entries.get(0).future();
176-
}
177-
var futures = entries.stream().map(WorkQueue.Entry::future).toArray(CompletableFuture<?>[]::new);
178-
return CompletableFuture.allOf(futures);
179-
}
180-
181-
private List<WorkQueue.Entry> stealWork(Collection<WorkQueue.Entry> queueEntries) {
182-
if (queueEntries.isEmpty()) {
183-
return List.of();
184-
}
185-
List<WorkQueue.Entry> concurrentlyExecutingChildren = new ArrayList<>(queueEntries.size());
186-
for (var entry : queueEntries) {
187-
var claimed = workQueue.remove(entry);
188-
if (claimed) {
189-
LOGGER.trace(() -> "stole work: " + entry);
190-
var executed = tryExecute(entry);
191-
if (!executed) {
192-
workQueue.add(entry);
193-
concurrentlyExecutingChildren.add(entry);
194-
}
195-
}
196-
else {
197-
concurrentlyExecutingChildren.add(entry);
198-
}
199-
}
200-
return concurrentlyExecutingChildren;
201-
}
202-
203-
private Collection<WorkQueue.Entry> forkConcurrentChildren(List<? extends TestTask> children,
204-
Consumer<TestTask> isolatedTaskCollector, Consumer<TestTask> sameThreadTaskCollector) {
205-
206-
if (children.isEmpty()) {
207-
return List.of();
208-
}
209-
Queue<WorkQueue.Entry> queueEntries = new PriorityQueue<>(children.size(), reverseOrder());
210-
for (TestTask child : children) {
211-
if (requiresGlobalReadWriteLock(child)) {
212-
isolatedTaskCollector.accept(child);
213-
}
214-
else if (child.getExecutionMode() == SAME_THREAD) {
215-
sameThreadTaskCollector.accept(child);
216-
}
217-
else {
218-
queueEntries.add(enqueue(child));
219-
}
220-
}
221-
return queueEntries;
222-
}
223-
224-
private static boolean requiresGlobalReadWriteLock(TestTask testTask) {
225-
return testTask.getResourceLock().getResources().contains(GLOBAL_READ_WRITE);
226-
}
227-
228-
private void executeAll(List<? extends TestTask> children) {
229-
if (children.isEmpty()) {
230-
return;
231-
}
232-
LOGGER.trace(() -> "running %d SAME_THREAD children".formatted(children.size()));
233-
if (children.size() == 1) {
234-
executeTask(children.get(0));
235-
return;
236-
}
237-
for (var testTask : children) {
238-
executeTask(testTask);
239-
}
240-
}
241-
242-
private static boolean tryExecute(WorkQueue.Entry entry) {
243-
try {
244-
var executed = tryExecuteTask(entry.task);
245-
if (executed) {
246-
entry.future.complete(null);
247-
}
248-
return executed;
249-
}
250-
catch (Throwable t) {
251-
entry.future.completeExceptionally(t);
252-
return true;
253-
}
254-
}
255-
256-
private void executeEntry(WorkQueue.Entry entry) {
257-
try {
258-
executeTask(entry.task);
259-
}
260-
catch (Throwable t) {
261-
entry.future.completeExceptionally(t);
262-
}
263-
finally {
264-
entry.future.complete(null);
265-
}
266-
}
267-
268136
@SuppressWarnings("try")
269137
private void executeTask(TestTask testTask) {
270138
var executed = tryExecuteTask(testTask);
@@ -363,7 +231,7 @@ void processQueueEntries() {
363231
}
364232
LOGGER.trace(() -> "processing: " + entry.task);
365233
this.workerLease = workerLease;
366-
executeEntry(entry);
234+
execute(entry);
367235
}
368236
}
369237
}
@@ -384,6 +252,144 @@ <T> T runBlocking(BlockingAction<T> blockingAction) throws InterruptedException
384252
}
385253
}
386254

255+
void invokeAll(List<? extends TestTask> testTasks) {
256+
257+
if (testTasks.isEmpty()) {
258+
return;
259+
}
260+
261+
if (testTasks.size() == 1) {
262+
executeTask(testTasks.get(0));
263+
return;
264+
}
265+
266+
List<TestTask> isolatedTasks = new ArrayList<>(testTasks.size());
267+
List<TestTask> sameThreadTasks = new ArrayList<>(testTasks.size());
268+
var queueEntries = forkConcurrentChildren(testTasks, isolatedTasks::add, sameThreadTasks::add);
269+
executeAll(sameThreadTasks);
270+
var remainingForkedChildren = stealWork(queueEntries);
271+
waitFor(remainingForkedChildren);
272+
executeAll(isolatedTasks);
273+
}
274+
275+
private Collection<WorkQueue.Entry> forkConcurrentChildren(List<? extends TestTask> children,
276+
Consumer<TestTask> isolatedTaskCollector, Consumer<TestTask> sameThreadTaskCollector) {
277+
278+
if (children.isEmpty()) {
279+
return List.of();
280+
}
281+
282+
Queue<WorkQueue.Entry> queueEntries = new PriorityQueue<>(children.size(), reverseOrder());
283+
for (TestTask child : children) {
284+
if (requiresGlobalReadWriteLock(child)) {
285+
isolatedTaskCollector.accept(child);
286+
}
287+
else if (child.getExecutionMode() == SAME_THREAD) {
288+
sameThreadTaskCollector.accept(child);
289+
}
290+
else {
291+
queueEntries.add(enqueue(child));
292+
}
293+
}
294+
return queueEntries;
295+
}
296+
297+
private static CompletableFuture<?> toCombinedFuture(List<WorkQueue.Entry> entries) {
298+
if (entries.size() == 1) {
299+
return entries.get(0).future();
300+
}
301+
var futures = entries.stream().map(WorkQueue.Entry::future).toArray(CompletableFuture<?>[]::new);
302+
return CompletableFuture.allOf(futures);
303+
}
304+
305+
private List<WorkQueue.Entry> stealWork(Collection<WorkQueue.Entry> queueEntries) {
306+
if (queueEntries.isEmpty()) {
307+
return List.of();
308+
}
309+
List<WorkQueue.Entry> concurrentlyExecutingChildren = new ArrayList<>(queueEntries.size());
310+
for (var entry : queueEntries) {
311+
var claimed = workQueue.remove(entry);
312+
if (claimed) {
313+
LOGGER.trace(() -> "stole work: " + entry);
314+
var executed = tryExecute(entry);
315+
if (!executed) {
316+
workQueue.add(entry);
317+
concurrentlyExecutingChildren.add(entry);
318+
}
319+
}
320+
else {
321+
concurrentlyExecutingChildren.add(entry);
322+
}
323+
}
324+
return concurrentlyExecutingChildren;
325+
}
326+
327+
private void waitFor(List<WorkQueue.Entry> children) {
328+
if (children.isEmpty()) {
329+
return;
330+
}
331+
var future = toCombinedFuture(children);
332+
try {
333+
if (future.isDone()) {
334+
// no need to release worker lease
335+
future.join();
336+
}
337+
else {
338+
runBlocking(() -> {
339+
LOGGER.trace(() -> "blocking for forked children: " + children);
340+
return future.join();
341+
});
342+
}
343+
}
344+
catch (InterruptedException e) {
345+
currentThread().interrupt();
346+
}
347+
}
348+
349+
private static boolean requiresGlobalReadWriteLock(TestTask testTask) {
350+
return testTask.getResourceLock().getResources().contains(GLOBAL_READ_WRITE);
351+
}
352+
353+
private void executeAll(List<? extends TestTask> children) {
354+
if (children.isEmpty()) {
355+
return;
356+
}
357+
LOGGER.trace(() -> "running %d SAME_THREAD children".formatted(children.size()));
358+
if (children.size() == 1) {
359+
executeTask(children.get(0));
360+
return;
361+
}
362+
for (var testTask : children) {
363+
executeTask(testTask);
364+
}
365+
}
366+
367+
private static boolean tryExecute(WorkQueue.Entry entry) {
368+
try {
369+
var executed = tryExecuteTask(entry.task);
370+
if (executed) {
371+
entry.future.complete(null);
372+
}
373+
return executed;
374+
}
375+
catch (Throwable t) {
376+
entry.future.completeExceptionally(t);
377+
return true;
378+
}
379+
}
380+
381+
private void execute(WorkQueue.Entry entry) {
382+
try {
383+
executeTask(entry.task);
384+
}
385+
catch (Throwable t) {
386+
entry.future.completeExceptionally(t);
387+
}
388+
finally {
389+
entry.future.complete(null);
390+
}
391+
}
392+
387393
interface BlockingAction<T> {
388394
T run() throws InterruptedException;
389395
}

0 commit comments

Comments
 (0)