Skip to content

Commit 9b48074

Browse files
committed
Extract RejectedExecutionHandler to avoid control flow by exception
1 parent aa35129 commit 9b48074

File tree

1 file changed

+45
-21
lines changed

1 file changed

+45
-21
lines changed

junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.Future;
3333
import java.util.concurrent.PriorityBlockingQueue;
3434
import java.util.concurrent.RejectedExecutionException;
35+
import java.util.concurrent.RejectedExecutionHandler;
3536
import java.util.concurrent.Semaphore;
3637
import java.util.concurrent.SynchronousQueue;
3738
import java.util.concurrent.ThreadFactory;
@@ -47,6 +48,7 @@
4748
import org.junit.platform.commons.logging.LoggerFactory;
4849
import org.junit.platform.commons.util.ClassLoaderUtils;
4950
import org.junit.platform.commons.util.Preconditions;
51+
import org.junit.platform.commons.util.ToStringBuilder;
5052
import org.junit.platform.engine.ConfigurationParameters;
5153

5254
/**
@@ -72,10 +74,12 @@ public ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration
7274

7375
ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration, ClassLoader classLoader) {
7476
ThreadFactory threadFactory = new WorkerThreadFactory(classLoader);
75-
threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(),
76-
configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory);
7777
parallelism = configuration.getParallelism();
7878
workerLeaseManager = new WorkerLeaseManager(parallelism, this::maybeStartWorker);
79+
var rejectedExecutionHandler = new LeaseAwareRejectedExecutionHandler(workerLeaseManager);
80+
threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(),
81+
configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory,
82+
rejectedExecutionHandler);
7983
LOGGER.trace(() -> "initialized thread pool for parallelism of " + configuration.getParallelism());
8084
}
8185

@@ -142,26 +146,25 @@ private void maybeStartWorker(BooleanSupplier doneCondition) {
142146
if (workerLease == null) {
143147
return;
144148
}
145-
try {
146-
threadPool.execute(() -> {
147-
LOGGER.trace(() -> "starting worker");
148-
try {
149-
WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition);
150-
}
151-
finally {
152-
workerLease.release(false);
153-
LOGGER.trace(() -> "stopping worker");
154-
}
155-
maybeStartWorker(doneCondition);
156-
});
157-
}
158-
catch (RejectedExecutionException e) {
159-
workerLease.release(false);
160-
if (threadPool.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) {
161-
return;
149+
threadPool.execute(new RunLeaseAwareWorker(workerLease,
150+
() -> WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition),
151+
() -> this.maybeStartWorker(doneCondition)));
152+
}
153+
154+
private record RunLeaseAwareWorker(WorkerLease workerLease, Runnable worker, Runnable onWorkerFinished)
155+
implements Runnable {
156+
157+
@Override
158+
public void run() {
159+
LOGGER.trace(() -> "starting worker");
160+
try {
161+
worker.run();
162162
}
163-
LOGGER.error(e, () -> "failed to submit worker to thread pool");
164-
throw e;
163+
finally {
164+
workerLease.release(false);
165+
LOGGER.trace(() -> "stopping worker");
166+
}
167+
onWorkerFinished.run();
165168
}
166169
}
167170

@@ -671,6 +674,12 @@ void reacquire() throws InterruptedException {
671674
LOGGER.trace(() -> "reacquired worker lease (available: %d)".formatted(semaphore.availablePermits()));
672675
}
673676
}
677+
678+
@Override
679+
public String toString() {
680+
return new ToStringBuilder(this).append("parallelism", parallelism).append("semaphore",
681+
semaphore).toString();
682+
}
674683
}
675684

676685
static class WorkerLease implements AutoCloseable {
@@ -707,4 +716,19 @@ void reacquire() throws InterruptedException {
707716
reacquisitionToken = null;
708717
}
709718
}
719+
720+
private record LeaseAwareRejectedExecutionHandler(WorkerLeaseManager workerLeaseManager)
721+
implements RejectedExecutionHandler {
722+
@Override
723+
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
724+
if (!(r instanceof RunLeaseAwareWorker worker)) {
725+
return;
726+
}
727+
worker.workerLease.release(false);
728+
if (executor.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) {
729+
return;
730+
}
731+
throw new RejectedExecutionException("Task with " + workerLeaseManager + " rejected from " + executor);
732+
}
733+
}
710734
}

0 commit comments

Comments
 (0)