Skip to content

Commit fb8d9b4

Browse files
mpkorstanjemarcphilipp
authored andcommitted
Use ConcurrentSkipListSet with absolute ordering to back work queue
1 parent d46cc94 commit fb8d9b4

File tree

1 file changed

+25
-34
lines changed

1 file changed

+25
-34
lines changed

junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@
2323
import java.util.ArrayList;
2424
import java.util.Collection;
2525
import java.util.EnumMap;
26+
import java.util.Iterator;
2627
import java.util.List;
2728
import java.util.Map;
28-
import java.util.Queue;
29+
import java.util.Set;
2930
import java.util.concurrent.Callable;
3031
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.ConcurrentSkipListSet;
3133
import java.util.concurrent.ExecutorService;
3234
import java.util.concurrent.Future;
33-
import java.util.concurrent.PriorityBlockingQueue;
3435
import java.util.concurrent.RejectedExecutionException;
3536
import java.util.concurrent.RejectedExecutionHandler;
3637
import java.util.concurrent.Semaphore;
@@ -225,17 +226,16 @@ void processQueueEntries(WorkerLease workerLease, BooleanSupplier doneCondition)
225226
LOGGER.trace(() -> "yielding resource lock");
226227
break;
227228
}
228-
var queueEntries = workQueue.peekAll();
229-
if (queueEntries.isEmpty()) {
229+
if (workQueue.isEmpty()) {
230230
LOGGER.trace(() -> "no queue entries available");
231231
break;
232232
}
233-
processQueueEntries(queueEntries);
233+
processQueueEntries();
234234
}
235235
}
236236

237-
private void processQueueEntries(List<WorkQueue.Entry> queueEntries) {
238-
var queueEntriesByResult = tryToStealWorkWithoutBlocking(queueEntries);
237+
private void processQueueEntries() {
238+
var queueEntriesByResult = tryToStealWorkWithoutBlocking(workQueue);
239239
var queueModified = queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_THIS_WORKER) //
240240
|| queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER);
241241
if (queueModified) {
@@ -288,7 +288,6 @@ void invokeAll(List<? extends TestTask> testTasks) {
288288
private List<WorkQueue.Entry> forkConcurrentChildren(List<? extends TestTask> children,
289289
Consumer<TestTask> isolatedTaskCollector, List<TestTask> sameThreadTasks) {
290290

291-
int index = 0;
292291
List<WorkQueue.Entry> queueEntries = new ArrayList<>(children.size());
293292
for (TestTask child : children) {
294293
if (requiresGlobalReadWriteLock(child)) {
@@ -298,7 +297,7 @@ else if (child.getExecutionMode() == SAME_THREAD) {
298297
sameThreadTasks.add(child);
299298
}
300299
else {
301-
queueEntries.add(WorkQueue.Entry.createWithIndex(child, index++));
300+
queueEntries.add(workQueue.createEntry(child));
302301
}
303302
}
304303

@@ -316,12 +315,10 @@ else if (child.getExecutionMode() == SAME_THREAD) {
316315
}
317316

318317
private Map<WorkStealResult, List<WorkQueue.Entry>> tryToStealWorkWithoutBlocking(
319-
List<WorkQueue.Entry> queueEntries) {
318+
Iterable<WorkQueue.Entry> queueEntries) {
320319

321320
Map<WorkStealResult, List<WorkQueue.Entry>> queueEntriesByResult = new EnumMap<>(WorkStealResult.class);
322-
if (!queueEntries.isEmpty()) {
323-
tryToStealWork(queueEntries, BlockingMode.NON_BLOCKING, queueEntriesByResult);
324-
}
321+
tryToStealWork(queueEntries, BlockingMode.NON_BLOCKING, queueEntriesByResult);
325322
return queueEntriesByResult;
326323
}
327324

@@ -333,7 +330,7 @@ private void tryToStealWorkWithBlocking(Map<WorkStealResult, List<WorkQueue.Entr
333330
tryToStealWork(entriesRequiringResourceLocks, BlockingMode.BLOCKING, queueEntriesByResult);
334331
}
335332

336-
private void tryToStealWork(List<WorkQueue.Entry> entries, BlockingMode blocking,
333+
private void tryToStealWork(Iterable<WorkQueue.Entry> entries, BlockingMode blocking,
337334
Map<WorkStealResult, List<WorkQueue.Entry>> queueEntriesByResult) {
338335
for (var entry : entries) {
339336
var state = tryToStealWork(entry, blocking);
@@ -539,16 +536,21 @@ private enum BlockingMode {
539536
NON_BLOCKING, BLOCKING
540537
}
541538

542-
private static class WorkQueue {
543-
544-
private final Queue<Entry> queue = new PriorityBlockingQueue<>();
539+
private static class WorkQueue implements Iterable<WorkQueue.Entry> {
540+
private final AtomicInteger index = new AtomicInteger();
541+
private final Set<Entry> queue = new ConcurrentSkipListSet<>();
545542

546543
Entry add(TestTask task) {
547-
Entry entry = Entry.create(task);
544+
Entry entry = createEntry(task);
548545
LOGGER.trace(() -> "forking: " + entry.task);
549546
return doAdd(entry);
550547
}
551548

549+
Entry createEntry(TestTask task) {
550+
int level = task.getTestDescriptor().getUniqueId().getSegments().size();
551+
return new Entry(task, new CompletableFuture<>(), level, index.getAndIncrement());
552+
}
553+
552554
void addAll(Collection<Entry> entries) {
553555
entries.forEach(this::doAdd);
554556
}
@@ -566,13 +568,6 @@ private Entry doAdd(Entry entry) {
566568
return entry;
567569
}
568570

569-
private List<WorkQueue.Entry> peekAll() {
570-
List<Entry> entries = new ArrayList<>(queue);
571-
// Iteration order isn't the same as queue order.
572-
entries.sort(naturalOrder());
573-
return entries;
574-
}
575-
576571
boolean remove(Entry entry) {
577572
return queue.remove(entry);
578573
}
@@ -581,18 +576,14 @@ boolean isEmpty() {
581576
return queue.isEmpty();
582577
}
583578

579+
@Override
580+
public Iterator<Entry> iterator() {
581+
return queue.iterator();
582+
}
583+
584584
private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level, int index)
585585
implements Comparable<Entry> {
586586

587-
static Entry create(TestTask task) {
588-
return createWithIndex(task, 0);
589-
}
590-
591-
static Entry createWithIndex(TestTask task, int index) {
592-
int level = task.getTestDescriptor().getUniqueId().getSegments().size();
593-
return new Entry(task, new CompletableFuture<>(), level, index);
594-
}
595-
596587
@SuppressWarnings("FutureReturnValueIgnored")
597588
Entry {
598589
future.whenComplete((__, t) -> {

0 commit comments

Comments
 (0)