diff --git a/docs/changelog/124732.yaml b/docs/changelog/124732.yaml
new file mode 100644
index 0000000000000..671c04b478ded
--- /dev/null
+++ b/docs/changelog/124732.yaml
@@ -0,0 +1,6 @@
+pr: 124732
+summary: Prevent rare starvation bug when using scaling `EsThreadPoolExecutor` with empty core pool size.
+area: Infra/Core
+type: bug
+issues:
+ - 124667
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
index 0c76e6d97cfb2..28849a825bf25 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
@@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
}
+ /**
+ * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
+ *
+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
+ * (and at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker is available.
+ *
+ * Once the core pool size is reached, a {@link ThreadPoolExecutor} only adds a new worker if the work queue rejects
+ * a task offer. With a regular unbounded queue, task offers are never rejected, so the worker pool would never
+ * scale beyond the core pool size.
+ *
+ * Scaling {@link EsThreadPoolExecutor}s therefore use a customized unbounded {@link LinkedTransferQueue}, which rejects every task
+ * offer unless it can be immediately transferred to an available idle worker. If no such worker is available, the executor adds
+ * a new worker if capacity remains; otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
+ * rejection handler.
+ */
public static EsThreadPoolExecutor newScaling(
String name,
int min,
@@ -107,10 +122,12 @@ public static EsThreadPoolExecutor newScaling(
ThreadContext contextHolder,
TaskTrackingConfig config
) {
- ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
- EsThreadPoolExecutor executor;
+ LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
+ // Work force-queued via ForceQueuePolicy might starve if no worker is available (when the core pool size is 0);
+ // probing the worker pool prevents this.
+ boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue;
if (config.trackExecutionTime()) {
- executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
+ return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
name,
min,
max,
@@ -119,12 +136,12 @@ public static EsThreadPoolExecutor newScaling(
queue,
TimedRunnable::new,
threadFactory,
- new ForceQueuePolicy(rejectAfterShutdown),
+ new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
contextHolder,
config
);
} else {
- executor = new EsThreadPoolExecutor(
+ return new EsThreadPoolExecutor(
name,
min,
max,
@@ -132,14 +149,27 @@ public static EsThreadPoolExecutor newScaling(
unit,
queue,
threadFactory,
- new ForceQueuePolicy(rejectAfterShutdown),
+ new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
contextHolder
);
}
- queue.executor = executor;
- return executor;
}
+ /**
+ * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
+ *
+ * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
+ * (and at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker is available.
+ *
+ * Once the core pool size is reached, a {@link ThreadPoolExecutor} only adds a new worker if the work queue rejects
+ * a task offer. With a regular unbounded queue, task offers are never rejected, so the worker pool would never
+ * scale beyond the core pool size.
+ *
+ * Scaling {@link EsThreadPoolExecutor}s therefore use a customized unbounded {@link LinkedTransferQueue}, which rejects every task
+ * offer unless it can be immediately transferred to an available idle worker. If no such worker is available, the executor adds
+ * a new worker if capacity remains; otherwise the task is rejected and then appended to the work queue via the {@link ForceQueuePolicy}
+ * rejection handler.
+ */
public static EsThreadPoolExecutor newScaling(
String name,
int min,
@@ -389,32 +419,58 @@ public boolean isSystem() {
*/
private EsExecutors() {}
- static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
+ private static <E> LinkedTransferQueue<E> newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) {
+ if (maxPoolSize == 1 || maxPoolSize == corePoolSize) {
+ // scaling beyond core pool size (or 1) not required, use a regular unbounded LinkedTransferQueue
+ return new LinkedTransferQueue<>();
+ }
+ // scaling beyond core pool size with an unbounded queue requires ExecutorScalingQueue
+ // note, reconfiguration of core / max pool size not supported in EsThreadPoolExecutor
+ return new ExecutorScalingQueue<>();
+ }
- ThreadPoolExecutor executor;
+ /**
+ * Customized {@link LinkedTransferQueue} to allow a {@link ThreadPoolExecutor} to scale beyond its core pool size despite having an
+ * unbounded queue.
+ *
+ * Note, the use of unbounded work queues is a problem in itself. For one, it requires error-prone customizations so that
+ * thread pools can scale up adequately. Worse, unbounded queues prevent backpressure and carry a high risk of causing OOM errors.
+ * <a href="https://github.com/elastic/elasticsearch/issues/18613">Github #18613</a> captures various long-outstanding, but important
+ * improvements to thread pools.
+ *
+ * Once it has reached its core pool size, a {@link ThreadPoolExecutor} only adds more workers if capacity remains and
+ * the task offer is rejected by the work queue. With a regular unbounded queue, that is typically never the case.
+ *
+ * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle worker.
+ * It relies on the {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no additional worker can be
+ * added and the task is rejected by the executor.
+ *
+ * Note, {@link ForceQueuePolicy} cannot guarantee a worker will be available when appending tasks directly to the queue.
+ * For that reason {@link ExecutorScalingQueue} cannot be used with executors that have an empty core pool size and a max pool size
+ * of 1: the only available worker could time out at about the same time as the task is appended; see
+ * <a href="https://github.com/elastic/elasticsearch/issues/124667">Github #124667</a> for more details.
+ *
+ * Note, configuring executors with core = max size in combination with {@code allowCoreThreadTimeOut} could be an alternative to
+ * {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: with {@link ExecutorScalingQueue},
+ * idle workers are reused, if available, by means of {@link ExecutorScalingQueue#tryTransfer(Object)}.
+ * With core = max size, the executor would add a new worker for every task submitted until reaching the core/max pool size,
+ * even if idle workers are available.
+ */
+ static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
ExecutorScalingQueue() {}
@Override
public boolean offer(E e) {
- // first try to transfer to a waiting worker thread
- if (tryTransfer(e) == false) {
- // check if there might be spare capacity in the thread
- // pool executor
- int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
- if (left > 0) {
- // reject queuing the task to force the thread pool
- // executor to add a worker if it can; combined
- // with ForceQueuePolicy, this causes the thread
- // pool to always scale up to max pool size and we
- // only queue when there is no spare capacity
- return false;
- } else {
- return super.offer(e);
- }
- } else {
- return true;
+ if (e == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
+ // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy
+ return super.offer(e);
}
+ // try to transfer to a waiting worker thread
+ // otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
+ // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
+ // so that we only queue when there is no spare capacity
+ return tryTransfer(e);
}
// Overridden to work around a JDK bug introduced in JDK 21.0.2
@@ -456,15 +512,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
*/
private final boolean rejectAfterShutdown;
+ /**
+ * Flag to indicate if the worker pool needs to be probed after force queuing a task to guarantee a worker is available.
+ */
+ private final boolean probeWorkerPool;
+
/**
* @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
*/
- ForceQueuePolicy(boolean rejectAfterShutdown) {
+ ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) {
this.rejectAfterShutdown = rejectAfterShutdown;
+ this.probeWorkerPool = probeWorkerPool;
}
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
+ if (task == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
+ return;
+ }
if (rejectAfterShutdown) {
if (executor.isShutdown()) {
reject(executor, task);
@@ -481,12 +546,19 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
}
}
- private static void put(ThreadPoolExecutor executor, Runnable task) {
+ private void put(ThreadPoolExecutor executor, Runnable task) {
final BlockingQueue<Runnable> queue = executor.getQueue();
- // force queue policy should only be used with a scaling queue
- assert queue instanceof ExecutorScalingQueue;
+ // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
+ assert queue instanceof LinkedTransferQueue;
try {
queue.put(task);
+ if (probeWorkerPool && task == queue.peek()) { // referential equality
+ // If the task is at the head of the queue, we can assume the queue was previously empty. In this case available workers
+ // might have timed out in the meanwhile. To prevent the task from starving, we submit a noop probe to the executor.
+ // Note, this deliberately doesn't check getPoolSize()==0 to avoid potential race conditions,
+ // as the count in the atomic state (used by workerCountOf) is decremented first.
+ executor.execute(EsThreadPoolExecutor.WORKER_PROBE);
+ }
} catch (final InterruptedException e) {
assert false : "a scaling queue never blocks so a put to it can never be interrupted";
throw new AssertionError(e);
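
For context, the scaling mechanism described in the Javadoc above can be illustrated with a minimal, self-contained sketch. The class names ScalingQueueSketch and ForceQueueSketch, the pool sizes, and the task count below are assumptions for illustration, not the Elasticsearch implementation: a LinkedTransferQueue that only accepts offers it can hand to an idle worker, combined with a rejection handler that appends rejected tasks to the queue, makes a plain ThreadPoolExecutor scale up to its max pool size before it starts queueing.

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified sketch of the scaling-queue idea; not the EsExecutors classes.
class ScalingQueueSketch<E> extends LinkedTransferQueue<E> {
    @Override
    public boolean offer(E e) {
        // Accept the offer only if an idle worker is waiting to take the task; otherwise report
        // "queue full" so the ThreadPoolExecutor adds another worker (up to its max pool size).
        return tryTransfer(e);
    }
}

class ForceQueueSketch implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // No worker could be added (max pool size reached): append the task to the queue anyway.
        // LinkedTransferQueue.add() enqueues directly and does not go through the offer() override.
        executor.getQueue().add(task);
    }
}

public class ScalingExecutorDemo {
    public static void main(String[] args) throws InterruptedException {
        // Core pool size 1 sidesteps the starvation race discussed above; with core size 0, a task
        // appended by the handler could starve if the only worker times out at the same moment.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 4, 30, TimeUnit.SECONDS, new ScalingQueueSketch<>(), new ForceQueueSketch()
        );
        for (int i = 0; i < 16; i++) {
            final int n = i;
            executor.execute(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
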
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java
index a4d2777a48b63..ad4616692850e 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java
@@ -29,6 +29,15 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class);
+ // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy
+ // https://github.com/elastic/elasticsearch/issues/124667
+ // note, this is intentionally not a lambda, to avoid it ever being turned into a compile-time constant
+ // that matches similar lambdas coming from other places
+ static final Runnable WORKER_PROBE = new Runnable() {
+ @Override
+ public void run() {}
+ };
+
private final ThreadContext contextHolder;
/**
@@ -66,9 +75,19 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
this.contextHolder = contextHolder;
}
+ @Override
+ public void setCorePoolSize(int corePoolSize) {
+ throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
+ }
+
+ @Override
+ public void setMaximumPoolSize(int maximumPoolSize) {
+ throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
+ }
+
@Override
public void execute(Runnable command) {
- final Runnable wrappedRunnable = wrapRunnable(command);
+ final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE;
try {
super.execute(wrappedRunnable);
} catch (Exception e) {
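
The probe mechanism can be sketched in isolation as well. WorkerProbeSketch and its pool configuration below are illustrative assumptions, not the EsThreadPoolExecutor code: a task appended directly to an executor's queue does not by itself cause a worker to be created, so submitting a dedicated no-op sentinel through execute() afterwards guarantees that a worker exists to drain the queue; because the sentinel is a distinct object compared by reference, other code paths can recognize and discard it.

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerProbeSketch {
    // Dedicated instance (not a lambda) so identity comparison (==) can only ever match this field.
    static final Runnable PROBE = new Runnable() {
        @Override
        public void run() {}
    };

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            0, 1, 100, TimeUnit.MILLISECONDS, new LinkedTransferQueue<>()
        );
        // Appending directly to the queue (as a force-queue rejection handler would) does not
        // create a worker; with an empty pool the task would just sit there.
        executor.getQueue().add(() -> System.out.println("queued task executed"));
        // Submitting the no-op probe goes through execute(), which spawns a worker if the pool is
        // empty; that worker drains the queued task and finally runs the probe itself.
        executor.execute(PROBE);
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
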
diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java
index 1017d41a77444..0fb2f1e471d0b 100644
--- a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java
+++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java
@@ -105,9 +105,14 @@ public ScalingExecutorBuilder(
final EsExecutors.TaskTrackingConfig trackingConfig
) {
super(name, false);
- this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
- this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
- this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
+ this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope);
+ this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, 1, Setting.Property.NodeScope);
+ this.keepAliveSetting = Setting.timeSetting(
+ settingsKey(prefix, "keep_alive"),
+ keepAlive,
+ TimeValue.ZERO,
+ Setting.Property.NodeScope
+ );
this.rejectAfterShutdown = rejectAfterShutdown;
this.trackingConfig = trackingConfig;
}
@@ -172,5 +177,4 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings {
this.keepAlive = keepAlive;
}
}
-
}
diff --git a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java
index 5363722f2f49f..abbbd53dec570 100644
--- a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java
@@ -100,7 +100,7 @@ public void testAsyncAcquire() throws InterruptedException {
final var completionLatch = new CountDownLatch(1);
final var executorService = EsExecutors.newScaling(
"test",
- 0,
+ 1,
between(1, 10),
10,
TimeUnit.SECONDS,
diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java
index 2867c9e007937..e87c0d00c15cd 100644
--- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java
+++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java
@@ -14,6 +14,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@@ -754,4 +755,153 @@ public void onRejection(Exception e) {
executor.execute(shouldBeRejected);
assertTrue(rejected.get());
}
+
+ public void testScalingWithEmptyCore() {
+ testScalingWithEmptyCore(
+ EsExecutors.newScaling(
+ getTestName(),
+ 0,
+ 1,
+ 0,
+ TimeUnit.MILLISECONDS,
+ true,
+ EsExecutors.daemonThreadFactory(getTestName()),
+ threadContext
+ )
+ );
+ }
+
+ public void testScalingWithEmptyCoreAndKeepAlive() {
+ testScalingWithEmptyCore(
+ EsExecutors.newScaling(
+ getTestName(),
+ 0,
+ 1,
+ 1,
+ TimeUnit.MILLISECONDS,
+ true,
+ EsExecutors.daemonThreadFactory(getTestName()),
+ threadContext
+ )
+ );
+ }
+
+ public void testScalingWithEmptyCoreAndLargerMaxSize() {
+ // TODO currently the reproduction of the starvation bug does not work if max pool size > 1
+ // https://github.com/elastic/elasticsearch/issues/124867
+ testScalingWithEmptyCore(
+ EsExecutors.newScaling(
+ getTestName(),
+ 0,
+ between(2, 5),
+ 0,
+ TimeUnit.MILLISECONDS,
+ true,
+ EsExecutors.daemonThreadFactory(getTestName()),
+ threadContext
+ )
+ );
+ }
+
+ public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
+ // TODO currently the reproduction of the starvation bug does not work if max pool size > 1
+ // https://github.com/elastic/elasticsearch/issues/124867
+ testScalingWithEmptyCore(
+ EsExecutors.newScaling(
+ getTestName(),
+ 0,
+ between(2, 5),
+ 1,
+ TimeUnit.MILLISECONDS,
+ true,
+ EsExecutors.daemonThreadFactory(getTestName()),
+ threadContext
+ )
+ );
+ }
+
+ public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
+ // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
+ // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
+ // the executor is created directly here; newScaling doesn't use ExecutorScalingQueue and probing if max pool size = 1.
+ testScalingWithEmptyCore(
+ new EsThreadPoolExecutor(
+ getTestName(),
+ 0,
+ 1,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new EsExecutors.ExecutorScalingQueue<>(),
+ EsExecutors.daemonThreadFactory(getTestName()),
+ new EsExecutors.ForceQueuePolicy(true, true),
+ threadContext
+ )
+ );
+ }
+
+ public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
+ // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
+ // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
+ // the executor is created directly here; newScaling doesn't use ExecutorScalingQueue and probing if max pool size = 1.
+ testScalingWithEmptyCore(
+ new EsThreadPoolExecutor(
+ getTestName(),
+ 0,
+ 1,
+ 1,
+ TimeUnit.MILLISECONDS,
+ new EsExecutors.ExecutorScalingQueue<>(),
+ EsExecutors.daemonThreadFactory(getTestName()),
+ new EsExecutors.ForceQueuePolicy(true, true),
+ threadContext
+ )
+ );
+ }
+
+ private void testScalingWithEmptyCore(EsThreadPoolExecutor executor) {
+ try {
+ class Task extends AbstractRunnable {
+ private int remaining;
+ private final CountDownLatch doneLatch;
+
+ Task(int iterations, CountDownLatch doneLatch) {
+ this.remaining = iterations;
+ this.doneLatch = doneLatch;
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ fail(e);
+ }
+
+ @Override
+ protected void doRun() {
+ if (--remaining == 0) {
+ doneLatch.countDown();
+ } else {
+ logger.trace("--> remaining [{}]", remaining);
+ final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS);
+ new Thread(() -> {
+ if (keepAliveNanos > 0) {
+ final var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-10_000, 10_000);
+ while (System.nanoTime() < targetNanoTime) {
+ Thread.yield();
+ }
+ }
+ executor.execute(Task.this);
+ }).start();
+ }
+ }
+ }
+
+ for (int i = 0; i < 20; i++) {
+ logger.trace("--> attempt [{}]", i);
+ final var doneLatch = new CountDownLatch(1);
+ executor.execute(new Task(between(1, 500), doneLatch));
+ safeAwait(doneLatch, TimeValue.ONE_MINUTE);
+ }
+ } finally {
+ ThreadPool.terminate(executor, 1, TimeUnit.SECONDS);
+ }
+ }
}
diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java
index 1bdde2e15b20b..66b9e69aea906 100644
--- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java
+++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java
@@ -178,6 +178,8 @@
public class SearchServiceSingleNodeTests extends ESSingleNodeTestCase {
+ private static final int SEARCH_POOL_SIZE = 10;
+
@Override
protected boolean resetNodeAfterTest() {
return true;
@@ -263,7 +265,10 @@ public void onQueryPhase(SearchContext context, long tookInNanos) {
@Override
protected Settings nodeSettings() {
- return Settings.builder().put("search.default_search_timeout", "5s").build();
+ return Settings.builder()
+ .put("search.default_search_timeout", "5s")
+ .put("thread_pool.search.size", SEARCH_POOL_SIZE) // customized search pool size, reconfiguring at runtime is unsupported
+ .build();
}
public void testClearOnClose() {
@@ -2088,6 +2093,7 @@ public void onFailure(Exception exc) {
CountDownLatch latch = new CountDownLatch(1);
shardRequest.source().query(new MatchNoneQueryBuilder());
service.executeQueryPhase(shardRequest, task, new ActionListener<>() {
+
@Override
public void onResponse(SearchPhaseResult result) {
try {
@@ -2688,8 +2694,11 @@ public void testEnableSearchWorkerThreads() throws IOException {
public void testSlicingBehaviourForParallelCollection() throws Exception {
IndexService indexService = createIndex("index", Settings.EMPTY);
ThreadPoolExecutor executor = (ThreadPoolExecutor) indexService.getThreadPool().executor(ThreadPool.Names.SEARCH);
- final int configuredMaxPoolSize = 10;
- executor.setMaximumPoolSize(configuredMaxPoolSize); // We set this explicitly to be independent of CPU cores.
+
+ // We configure the executor pool size explicitly in nodeSettings to be independent of CPU cores
+ assert String.valueOf(SEARCH_POOL_SIZE).equals(node().settings().get("thread_pool.search.size"))
+ : "Unexpected thread_pool.search.size";
+
int numDocs = randomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
prepareIndex("index").setId(String.valueOf(i)).setSource("field", "value").get();
@@ -2722,7 +2731,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception {
final int maxPoolSize = executor.getMaximumPoolSize();
assertEquals(
"Sanity check to ensure this isn't the default of 1 when pool size is unset",
- configuredMaxPoolSize,
+ SEARCH_POOL_SIZE,
maxPoolSize
);
@@ -2752,7 +2761,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception {
final int maxPoolSize = executor.getMaximumPoolSize();
assertEquals(
"Sanity check to ensure this isn't the default of 1 when pool size is unset",
- configuredMaxPoolSize,
+ SEARCH_POOL_SIZE,
maxPoolSize
);
@@ -2843,7 +2852,7 @@ public void testSlicingBehaviourForParallelCollection() throws Exception {
final int maxPoolSize = executor.getMaximumPoolSize();
assertEquals(
"Sanity check to ensure this isn't the default of 1 when pool size is unset",
- configuredMaxPoolSize,
+ SEARCH_POOL_SIZE,
maxPoolSize
);
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java
index 21de4872c7b2c..372f61698ed8a 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java
@@ -91,7 +91,7 @@ public static void startHttpServer() throws Exception {
// the EncryptedRepository can require more than one connection open at one time
executorService = EsExecutors.newScaling(
ESMockAPIBasedRepositoryIntegTestCase.class.getName(),
- 0,
+ 1,
2,
60,
TimeUnit.SECONDS,
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
index 0ecd9bcd86dfc..df77d2b939dfe 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
@@ -314,7 +314,7 @@ private MockTransportService(
this.original = transport.getDelegate();
this.testExecutor = EsExecutors.newScaling(
"mock-transport",
- 0,
+ 1,
4,
30,
TimeUnit.SECONDS,