Skip to content

Commit d46cc94

Browse files
mpkorstanjemarcphilipp
authored andcommitted
Extract RejectedExecutionHandler to avoid control flow by exception
1 parent 79f9eb2 commit d46cc94

File tree

1 file changed

+45
-21
lines changed

1 file changed

+45
-21
lines changed

junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.Future;
3333
import java.util.concurrent.PriorityBlockingQueue;
3434
import java.util.concurrent.RejectedExecutionException;
35+
import java.util.concurrent.RejectedExecutionHandler;
3536
import java.util.concurrent.Semaphore;
3637
import java.util.concurrent.SynchronousQueue;
3738
import java.util.concurrent.ThreadFactory;
@@ -47,6 +48,7 @@
4748
import org.junit.platform.commons.logging.LoggerFactory;
4849
import org.junit.platform.commons.util.ClassLoaderUtils;
4950
import org.junit.platform.commons.util.Preconditions;
51+
import org.junit.platform.commons.util.ToStringBuilder;
5052
import org.junit.platform.engine.ConfigurationParameters;
5153

5254
/**
@@ -72,10 +74,12 @@ public ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration
7274

7375
ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration, ClassLoader classLoader) {
7476
ThreadFactory threadFactory = new WorkerThreadFactory(classLoader);
75-
threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(),
76-
configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory);
7777
parallelism = configuration.getParallelism();
7878
workerLeaseManager = new WorkerLeaseManager(parallelism, this::maybeStartWorker);
79+
var rejectedExecutionHandler = new LeaseAwareRejectedExecutionHandler(workerLeaseManager);
80+
threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(),
81+
configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory,
82+
rejectedExecutionHandler);
7983
LOGGER.trace(() -> "initialized thread pool for parallelism of " + configuration.getParallelism());
8084
}
8185

@@ -142,26 +146,25 @@ private void maybeStartWorker(BooleanSupplier doneCondition) {
142146
if (workerLease == null) {
143147
return;
144148
}
145-
try {
146-
threadPool.execute(() -> {
147-
LOGGER.trace(() -> "starting worker");
148-
try {
149-
WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition);
150-
}
151-
finally {
152-
workerLease.release(false);
153-
LOGGER.trace(() -> "stopping worker");
154-
}
155-
maybeStartWorker(doneCondition);
156-
});
157-
}
158-
catch (RejectedExecutionException e) {
159-
workerLease.release(false);
160-
if (threadPool.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) {
161-
return;
149+
threadPool.execute(new RunLeaseAwareWorker(workerLease,
150+
() -> WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition),
151+
() -> this.maybeStartWorker(doneCondition)));
152+
}
153+
154+
private record RunLeaseAwareWorker(WorkerLease workerLease, Runnable worker, Runnable onWorkerFinished)
155+
implements Runnable {
156+
157+
@Override
158+
public void run() {
159+
LOGGER.trace(() -> "starting worker");
160+
try {
161+
worker.run();
162162
}
163-
LOGGER.error(e, () -> "failed to submit worker to thread pool");
164-
throw e;
163+
finally {
164+
workerLease.release(false);
165+
LOGGER.trace(() -> "stopping worker");
166+
}
167+
onWorkerFinished.run();
165168
}
166169
}
167170

@@ -669,6 +672,12 @@ void reacquire() throws InterruptedException {
669672
LOGGER.trace(() -> "reacquired worker lease (available: %d)".formatted(semaphore.availablePermits()));
670673
}
671674
}
675+
676+
@Override
677+
public String toString() {
678+
return new ToStringBuilder(this).append("parallelism", parallelism).append("semaphore",
679+
semaphore).toString();
680+
}
672681
}
673682

674683
static class WorkerLease implements AutoCloseable {
@@ -705,4 +714,19 @@ void reacquire() throws InterruptedException {
705714
reacquisitionToken = null;
706715
}
707716
}
717+
718+
private record LeaseAwareRejectedExecutionHandler(WorkerLeaseManager workerLeaseManager)
719+
implements RejectedExecutionHandler {
720+
@Override
721+
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
722+
if (!(r instanceof RunLeaseAwareWorker worker)) {
723+
return;
724+
}
725+
worker.workerLease.release(false);
726+
if (executor.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) {
727+
return;
728+
}
729+
throw new RejectedExecutionException("Task with " + workerLeaseManager + " rejected from " + executor);
730+
}
731+
}
708732
}

0 commit comments

Comments
 (0)