Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;
Expand Down Expand Up @@ -275,6 +276,45 @@ void prioritizesTestsOverContainers() throws Exception {
assertThat(child2.startTime).isBeforeOrEqualTo(child1.startTime);
}

@Test
void limitsWorkerThreadsToMaxPoolSize() throws Exception {
service = new ConcurrentHierarchicalTestExecutorService(configuration(3, 3));

CountDownLatch latch = new CountDownLatch(3);
Executable behavior = () -> {
latch.countDown();
latch.await();
};
var leaf1a = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1a").withLevel(3);
var leaf1b = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1b").withLevel(3);
var leaf2a = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf2a").withLevel(3);
var leaf2b = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf2b").withLevel(3);

// When executed, there are 2 worker threads active and 1 available.
// Both invokeAlls race each other trying to start 1 more.
var child1 = new TestTaskStub(ExecutionMode.CONCURRENT,
() -> requiredService().invokeAll(List.of(leaf1a, leaf1b))) //
.withName("child1").withLevel(2);
var child2 = new TestTaskStub(ExecutionMode.CONCURRENT,
() -> requiredService().invokeAll(List.of(leaf2a, leaf2b))) //
.withName("child2").withLevel(2);

var root = new TestTaskStub(ExecutionMode.SAME_THREAD,
() -> requiredService().invokeAll(List.of(child1, child2))) //
.withName("root").withLevel(1);

service.submit(root).get();

assertThat(List.of(root, child1, child2, leaf1a, leaf1b, leaf2a, leaf2b)) //
.allSatisfy(TestTaskStub::assertExecutedSuccessfully);
assertThat(Stream.of(leaf1a, leaf1b, leaf2a, leaf2b).map(TestTaskStub::executionThread).distinct()) //
.hasSize(3);
}

private static ExclusiveResource exclusiveResource() {
return new ExclusiveResource("key", ExclusiveResource.LockMode.READ_WRITE);
}
Expand All @@ -284,7 +324,11 @@ private ConcurrentHierarchicalTestExecutorService requiredService() {
}

private static ParallelExecutionConfiguration configuration(int parallelism) {
return new DefaultParallelExecutionConfiguration(parallelism, parallelism, 256 + parallelism, parallelism, 0,
return configuration(parallelism, 256 + parallelism);
}

private static ParallelExecutionConfiguration configuration(int parallelism, int maxPoolSize) {
return new DefaultParallelExecutionConfiguration(parallelism, parallelism, maxPoolSize, parallelism, 0,
__ -> true);
}

Expand Down