Skip to content

Commit 52dbaef

Browse files
marcphilippleonard84
authored andcommitted
Only steal queue entries for siblings of dynamic children
Co-authored-by: Leonard Brünings <[email protected]>
1 parent cc532f9 commit 52dbaef

File tree

2 files changed

+154
-10
lines changed

2 files changed

+154
-10
lines changed

junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void close() {
108108
}
109109

110110
var entry = enqueue(testTask);
111-
workerThread.addDynamicChild(entry);
111+
workerThread.trackSubmittedChild(entry);
112112
return new WorkStealingFuture(entry);
113113
}
114114

@@ -201,7 +201,7 @@ private class WorkerThread extends Thread {
201201
@Nullable
202202
WorkerLease workerLease;
203203

204-
private Deque<WorkQueue.Entry> dynamicChildren = new ArrayDeque<>();
204+
private final Deque<State> stateStack = new ArrayDeque<>();
205205

206206
WorkerThread(Runnable runnable, String name) {
207207
super(runnable, name);
@@ -480,10 +480,12 @@ private boolean tryExecuteTask(TestTask testTask) {
480480

481481
private void doExecute(TestTask testTask) {
482482
LOGGER.trace(() -> "executing: " + testTask);
483+
stateStack.push(new State());
483484
try {
484485
testTask.execute();
485486
}
486487
finally {
488+
stateStack.pop();
487489
LOGGER.trace(() -> "finished executing: " + testTask);
488490
}
489491
}
@@ -496,18 +498,52 @@ private static CompletableFuture<?> toCombinedFuture(List<WorkQueue.Entry> entri
496498
return CompletableFuture.allOf(futures);
497499
}
498500

499-
private void addDynamicChild(WorkQueue.Entry entry) {
500-
dynamicChildren.add(entry);
501+
private void trackSubmittedChild(WorkQueue.Entry entry) {
502+
stateStack.element().trackSubmittedChild(entry);
501503
}
502504

503-
private void tryToStealWorkFromDynamicChildren() {
504-
for (var entry : dynamicChildren) {
505-
tryToStealWork(entry, BlockingMode.NON_BLOCKING);
505+
private void tryToStealWorkFromSubmittedChildren() {
506+
var currentState = stateStack.element();
507+
var currentSubmittedChildren = currentState.submittedChildren;
508+
if (currentSubmittedChildren == null || currentSubmittedChildren.isEmpty()) {
509+
return;
510+
}
511+
var iterator = currentSubmittedChildren.listIterator(currentSubmittedChildren.size());
512+
while (iterator.hasPrevious()) {
513+
WorkQueue.Entry entry = iterator.previous();
514+
var result = tryToStealWork(entry, BlockingMode.NON_BLOCKING);
515+
if (result.isExecuted()) {
516+
iterator.remove();
517+
}
518+
}
519+
currentState.clearIfEmpty();
520+
}
521+
522+
private static class State {
523+
524+
@Nullable
525+
private List<WorkQueue.Entry> submittedChildren;
526+
527+
private void trackSubmittedChild(WorkQueue.Entry entry) {
528+
if (submittedChildren == null) {
529+
submittedChildren = new ArrayList<>();
530+
}
531+
submittedChildren.add(entry);
532+
}
533+
534+
private void clearIfEmpty() {
535+
if (submittedChildren != null && submittedChildren.isEmpty()) {
536+
submittedChildren = null;
537+
}
506538
}
507539
}
508540

509541
private enum WorkStealResult {
510-
EXECUTED_BY_DIFFERENT_WORKER, RESOURCE_LOCK_UNAVAILABLE, EXECUTED_BY_THIS_WORKER
542+
EXECUTED_BY_DIFFERENT_WORKER, RESOURCE_LOCK_UNAVAILABLE, EXECUTED_BY_THIS_WORKER;
543+
544+
private boolean isExecuted() {
545+
return this != RESOURCE_LOCK_UNAVAILABLE;
546+
}
511547
}
512548

513549
private interface BlockingAction<T> {
@@ -535,11 +571,11 @@ private static class WorkStealingFuture extends BlockingAwareFuture<@Nullable Vo
535571
if (entry.future.isDone()) {
536572
return callable.call();
537573
}
538-
workerThread.tryToStealWorkFromDynamicChildren();
574+
workerThread.tryToStealWorkFromSubmittedChildren();
539575
if (entry.future.isDone()) {
540576
return callable.call();
541577
}
542-
LOGGER.trace(() -> "blocking for child task");
578+
LOGGER.trace(() -> "blocking for child task: " + entry.task);
543579
return workerThread.runBlocking(entry.future::isDone, () -> {
544580
try {
545581
return callable.call();

platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,114 @@ void stealsDynamicChildren() throws Exception {
555555
assertThat(child2.executionThread).isEqualTo(root.executionThread).isNotEqualTo(child1.executionThread);
556556
}
557557

558+
@RepeatedTest(value = 100, failureThreshold = 1)
559+
void stealsNestedDynamicChildren() throws Exception {
560+
service = new ConcurrentHierarchicalTestExecutorService(configuration(2, 2));
561+
562+
var barrier = new CyclicBarrier(2);
563+
564+
var leaf1a = new TestTaskStub(ExecutionMode.CONCURRENT) //
565+
.withName("leaf1a").withLevel(3);
566+
var leaf1b = new TestTaskStub(ExecutionMode.CONCURRENT) //
567+
.withName("leaf1b").withLevel(3);
568+
569+
var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> {
570+
barrier.await();
571+
var futureA = requiredService().submit(leaf1a);
572+
barrier.await();
573+
var futureB = requiredService().submit(leaf1b);
574+
futureA.get();
575+
futureB.get();
576+
barrier.await();
577+
}) //
578+
.withName("child1").withLevel(2);
579+
580+
var leaf2a = new TestTaskStub(ExecutionMode.CONCURRENT) //
581+
.withName("leaf2a").withLevel(3);
582+
var leaf2b = new TestTaskStub(ExecutionMode.CONCURRENT) //
583+
.withName("leaf2b").withLevel(3);
584+
585+
var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> {
586+
barrier.await();
587+
var futureA = requiredService().submit(leaf2a);
588+
barrier.await();
589+
var futureB = requiredService().submit(leaf2b);
590+
futureB.get();
591+
futureA.get();
592+
barrier.await();
593+
}) //
594+
.withName("child2").withLevel(2);
595+
596+
var root = new TestTaskStub(ExecutionMode.SAME_THREAD, () -> {
597+
var future1 = requiredService().submit(child1);
598+
var future2 = requiredService().submit(child2);
599+
future1.get();
600+
future2.get();
601+
}) //
602+
.withName("root").withLevel(1);
603+
604+
service.submit(root).get();
605+
606+
assertThat(Stream.of(root, child1, child2, leaf1a, leaf1b, leaf2a, leaf2b)) //
607+
.allSatisfy(TestTaskStub::assertExecutedSuccessfully);
608+
assertThat(child2.executionThread).isNotEqualTo(child1.executionThread);
609+
assertThat(child1.executionThread).isEqualTo(leaf1a.executionThread).isEqualTo(leaf1b.executionThread);
610+
assertThat(child2.executionThread).isEqualTo(leaf2a.executionThread).isEqualTo(leaf2b.executionThread);
611+
}
612+
613+
@RepeatedTest(value = 100, failureThreshold = 1)
614+
void stealsSiblingDynamicChildrenOnly() throws Exception {
615+
service = new ConcurrentHierarchicalTestExecutorService(configuration(2, 3));
616+
617+
var child1Started = new CountDownLatch(1);
618+
var child2Started = new CountDownLatch(1);
619+
var leaf1ASubmitted = new CountDownLatch(1);
620+
var leaf1AStarted = new CountDownLatch(1);
621+
622+
var leaf1a = new TestTaskStub(ExecutionMode.CONCURRENT, () -> {
623+
leaf1AStarted.countDown();
624+
child2Started.await();
625+
}) //
626+
.withName("leaf1a").withLevel(3);
627+
628+
var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> {
629+
child1Started.countDown();
630+
leaf1ASubmitted.await();
631+
}) //
632+
.withName("child1").withLevel(2);
633+
634+
var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, child2Started::countDown) //
635+
.withName("child2").withLevel(2);
636+
637+
var child3 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> {
638+
var futureA = requiredService().submit(leaf1a);
639+
leaf1ASubmitted.countDown();
640+
leaf1AStarted.await();
641+
futureA.get();
642+
}) //
643+
.withName("child3").withLevel(2);
644+
645+
var root = new TestTaskStub(ExecutionMode.SAME_THREAD, () -> {
646+
var future1 = requiredService().submit(child1);
647+
child1Started.await();
648+
var future2 = requiredService().submit(child2);
649+
var future3 = requiredService().submit(child3);
650+
future1.get();
651+
future2.get();
652+
future3.get();
653+
}) //
654+
.withName("root").withLevel(1);
655+
656+
service.submit(root).get();
657+
658+
assertThat(Stream.of(root, child1, child2, child3, leaf1a)) //
659+
.allSatisfy(TestTaskStub::assertExecutedSuccessfully);
660+
661+
assertThat(child3.executionThread).isNotEqualTo(child1.executionThread).isNotEqualTo(child2.executionThread);
662+
assertThat(child1.executionThread).isNotEqualTo(child2.executionThread);
663+
assertThat(child1.executionThread).isEqualTo(leaf1a.executionThread);
664+
}
665+
558666
private static ExclusiveResource exclusiveResource(LockMode lockMode) {
559667
return new ExclusiveResource("key", lockMode);
560668
}

0 commit comments

Comments
 (0)