From a0d7b5a427e55f9cefc9b583e2f04df6ef19c48c Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Wed, 9 Apr 2025 21:02:36 +0100 Subject: [PATCH 1/2] Online prewarming service interface docs and usage in SearchService This adds the interface for search online prewarming with a default NOOP implementation. This also hooks the interface in the SearchService after we fork the query phase to the search thread pool. --- .../search/OnlinePrewarmingService.java | 32 ++++++++++++ .../OnlinePrewarmingServiceProvider.java | 20 ++++++++ .../common/settings/ClusterSettings.java | 1 + .../node/NodeServiceProvider.java | 9 +++- .../elasticsearch/search/SearchService.java | 50 +++++++++++++++++-- .../snapshots/SnapshotResiliencyTests.java | 4 +- .../search/MockSearchService.java | 4 +- 7 files changed, 113 insertions(+), 7 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java create mode 100644 server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingServiceProvider.java diff --git a/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java b/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java new file mode 100644 index 0000000000000..570a67388b604 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.index.shard.IndexShard; + +/** + * Interface for prewarming the segments of a shard, tailored for consumption at + * higher volumes than alternative warming strategies (i.e. offline / recovery warming) + * that are more speculative. + */ +public interface OnlinePrewarmingService { + OnlinePrewarmingService NOOP = (indexShard, skipPrewarmingCondition) -> {}; + + /** + * Prewarms resources (typically segments) for the given index shard. + * + * @param indexShard the index shard for which resources should be prewarmed + * @param skipPrewarming a flag indicating whether prewarming should be skipped. + * Callers should decide if certain prewarming calls + * should be skipped and indicate this decision via this + * flag. + */ + void prewarm(IndexShard indexShard, boolean skipPrewarming); +} diff --git a/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingServiceProvider.java b/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingServiceProvider.java new file mode 100644 index 0000000000000..342e1a6806b2f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingServiceProvider.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; + +public interface OnlinePrewarmingServiceProvider { + OnlinePrewarmingServiceProvider DEFAULT = (settings, threadPool, clusterService) -> OnlinePrewarmingService.NOOP; + + OnlinePrewarmingService create(Settings settings, ThreadPool threadPool, ClusterService clusterService); +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 3bd58c400e1b2..4bbf3107411b3 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -485,6 +485,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.CCS_VERSION_CHECK_SETTING, SearchService.CCS_COLLECT_TELEMETRY, SearchService.BATCHED_QUERY_PHASE, + SearchService.PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE, MultiBucketConsumerService.MAX_BUCKET_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index b9e58863cad6c..cd9df0162c3d4 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -9,6 +9,8 @@ package org.elasticsearch.node; +import org.elasticsearch.action.search.OnlinePrewarmingService; +import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService; @@ -123,6 +125,10 @@ SearchService newSearchService( ExecutorSelector executorSelector, Tracer tracer ) { + OnlinePrewarmingService onlinePrewarmingService = pluginsService.loadSingletonServiceProvider( + OnlinePrewarmingServiceProvider.class, + () -> OnlinePrewarmingServiceProvider.DEFAULT + ).create(clusterService.getSettings(), threadPool, clusterService); return new SearchService( clusterService, indicesService, @@ -132,7 +138,8 @@ SearchService newSearchService( fetchPhase, circuitBreakerService, executorSelector, - tracer + tracer, + onlinePrewarmingService ); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f4e1bbd421d62..25f7aa6e177a3 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ResolvedIndices; import org.elasticsearch.action.search.CanMatchNodeRequest; import org.elasticsearch.action.search.CanMatchNodeResponse; +import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.TransportActions; @@ -147,6 +148,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -283,6 +285,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + // This setting ensures that we skip online prewarming tasks if the queuing in the search thread pool + // reaches the configured factor X number of max threads in the search thread pool, such that + // the system has a chance to catch up and prewarming doesn't take over the network bandwidth + public static final Setting PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE = Setting.intSetting( + "search.online_prewarming_threshold_poolsize_factor", + 10, + 0, // 0 would mean we only execute online prewarming if there's no queuing in the search tp + Setting.Property.NodeScope + ); + private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled(); /** @@ -317,6 +329,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final FetchPhase fetchPhase; private final CircuitBreaker circuitBreaker; + private final OnlinePrewarmingService onlinePrewarmingService; + private final int prewarmingMaxPoolFactorThreshold; private volatile Executor searchExecutor; private volatile boolean enableQueryPhaseParallelCollection; @@ -362,7 +376,8 @@ public SearchService( FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector, - Tracer tracer + Tracer tracer, + OnlinePrewarmingService onlinePrewarmingService ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -375,7 +390,7 @@ public SearchService( this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker); this.executorSelector = executorSelector; this.tracer = tracer; - + this.onlinePrewarmingService = onlinePrewarmingService; TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings); setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings)); @@ -427,6 +442,7 @@ public SearchService( memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes(); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes()); + prewarmingMaxPoolFactorThreshold = PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE.get(settings); } public CircuitBreaker getCircuitBreaker() { @@ -702,6 +718,12 @@ private void ensureAfterSeqNoRefreshed( try { if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) { runAsync(executor, executable, listener); + // we successfully submitted the async task to the search pool so let's prewarm the shard + onlinePrewarmingService.prewarm( + shard, + executor instanceof ThreadPoolExecutor tpe + && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) + ); return; } if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) { @@ -778,6 +800,12 @@ private void searchReady() { timeoutTask.cancel(); } runAsync(executor, executable, listener); + // we successfully submitted the async task to the search pool so let's prewarm the shard + onlinePrewarmingService.prewarm( + shard, + executor instanceof ThreadPoolExecutor tpe + && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) + ); } } }); @@ -939,7 +967,8 @@ public void executeQueryPhase( freeReaderContext(readerContext.id()); throw e; } - runAsync(getExecutor(readerContext.indexShard()), () -> { + Executor executor = getExecutor(readerContext.indexShard()); + runAsync(executor, () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) { var opsListener = searchContext.indexShard().getSearchOperationListener(); @@ -965,6 +994,12 @@ public void executeQueryPhase( throw e; } }, wrapFailureListener(listener, readerContext, markAsUsed)); + // we successfully submitted the async task to the search pool so let's prewarm the shard + onlinePrewarmingService.prewarm( + readerContext.indexShard(), + executor instanceof ThreadPoolExecutor tpe + && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) + ); } /** @@ -991,7 +1026,8 @@ public void executeQueryPhase( final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> { // fork the execution in the search thread pool - runAsync(getExecutor(readerContext.indexShard()), () -> { + Executor executor = getExecutor(readerContext.indexShard()); + runAsync(executor, () -> { readerContext.setAggregatedDfs(request.dfs()); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) { final QuerySearchResult queryResult; @@ -1029,6 +1065,12 @@ public void executeQueryPhase( throw e; } }, wrapFailureListener(l, readerContext, markAsUsed)); + // we successfully submitted the async task to the search pool so let's prewarm the shard + onlinePrewarmingService.prewarm( + readerContext.indexShard(), + executor instanceof ThreadPoolExecutor tpe + && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) + ); })); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ce7a5e4262fb7..8ca529d0707c1 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.resync.TransportResyncReplicationAction; +import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchRequest; @@ -2314,7 +2315,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { new FetchPhase(Collections.emptyList()), new NoneCircuitBreakerService(), EmptySystemIndices.INSTANCE.getExecutorSelector(), - Tracer.NOOP + Tracer.NOOP, + OnlinePrewarmingService.NOOP ); final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoriesService); diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 42b11173a3b19..45e814a0477e5 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -9,6 +9,7 @@ package org.elasticsearch.search; +import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; @@ -95,7 +96,8 @@ public MockSearchService( fetchPhase, circuitBreakerService, executorSelector, - tracer + tracer, + OnlinePrewarmingService.NOOP ); } From eb27104872e4ddaf1edb1e11c5a1f977090d9ebb Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Fri, 11 Apr 2025 10:40:49 +0100 Subject: [PATCH 2/2] Docs, drop skip method argument, test --- .../search/OnlinePrewarmingService.java | 8 +- .../elasticsearch/search/SearchService.java | 56 +++++++----- .../search/SearchServiceTests.java | 86 +++++++++++++++++++ 3 files changed, 123 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java b/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java index 570a67388b604..2601353b50565 100644 --- a/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java +++ b/server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java @@ -17,16 +17,12 @@ * that are more speculative. */ public interface OnlinePrewarmingService { - OnlinePrewarmingService NOOP = (indexShard, skipPrewarmingCondition) -> {}; + OnlinePrewarmingService NOOP = indexShard -> {}; /** * Prewarms resources (typically segments) for the given index shard. * * @param indexShard the index shard for which resources should be prewarmed - * @param skipPrewarming a flag indicating whether prewarming should be skipped. - * Callers should decide if certain prewarming calls - * should be skipped and indicate this decision via this - * flag. */ - void prewarm(IndexShard indexShard, boolean skipPrewarming); + void prewarm(IndexShard indexShard); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 25f7aa6e177a3..7f7b3bacc12ae 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -290,7 +290,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv // the system has a chance to catch up and prewarming doesn't take over the network bandwidth public static final Setting PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE = Setting.intSetting( "search.online_prewarming_threshold_poolsize_factor", - 10, + 10, // we will only execute online prewarming if there are less than 10 queued up items/ search thread 0, // 0 would mean we only execute online prewarming if there's no queuing in the search tp Setting.Property.NodeScope ); @@ -719,11 +719,9 @@ private void ensureAfterSeqNoRefreshed( if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) { runAsync(executor, executable, listener); // we successfully submitted the async task to the search pool so let's prewarm the shard - onlinePrewarmingService.prewarm( - shard, - executor instanceof ThreadPoolExecutor tpe - && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) - ); + if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) { + onlinePrewarmingService.prewarm(shard); + } return; } if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) { @@ -801,11 +799,9 @@ private void searchReady() { } runAsync(executor, executable, listener); // we successfully submitted the async task to the search pool so let's prewarm the shard - onlinePrewarmingService.prewarm( - shard, - executor instanceof ThreadPoolExecutor tpe - && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) - ); + if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) { + onlinePrewarmingService.prewarm(shard); + } } } }); @@ -814,6 +810,28 @@ private void searchReady() { } } + /** + * Checks if the executor is queued beyond the prewarming factor threshold, relative to the + * number of threads in the pool. + * This is used to determine if we should prewarm the shard - i.e. if the executor doesn't + * contain queued tasks beyond the prewarming factor threshold X max pool size. + * + * @param searchOperationsExecutor the executor that executes the search operations + * @param prewarmingMaxPoolFactorThreshold maximum number of queued up items / thread in the search pool + */ + // visible for testing + static boolean isExecutorQueuedBeyondPrewarmingFactor(Executor searchOperationsExecutor, int prewarmingMaxPoolFactorThreshold) { + if (searchOperationsExecutor instanceof ThreadPoolExecutor tpe) { + return (tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size(); + } else { + logger.trace( + "received executor [{}] that we can't inspect for queueing. allowing online prewarming for all searches", + searchOperationsExecutor + ); + return false; + } + } + private IndexShard getShard(ShardSearchRequest request) { final ShardSearchContextId contextId = request.readerId(); if (contextId != null && sessionId.equals(contextId.getSessionId())) { @@ -995,11 +1013,9 @@ public void executeQueryPhase( } }, wrapFailureListener(listener, readerContext, markAsUsed)); // we successfully submitted the async task to the search pool so let's prewarm the shard - onlinePrewarmingService.prewarm( - readerContext.indexShard(), - executor instanceof ThreadPoolExecutor tpe - && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) - ); + if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) { + onlinePrewarmingService.prewarm(readerContext.indexShard()); + } } /** @@ -1066,11 +1082,9 @@ public void executeQueryPhase( } }, wrapFailureListener(l, readerContext, markAsUsed)); // we successfully submitted the async task to the search pool so let's prewarm the shard - onlinePrewarmingService.prewarm( - readerContext.indexShard(), - executor instanceof ThreadPoolExecutor tpe - && ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size()) - ); + if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) { + onlinePrewarmingService.prewarm(readerContext.indexShard()); + } })); } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 7c89c106a7c69..c26dfb630a4bc 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -53,16 +53,25 @@ import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.MinAndMax; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.MockLog; +import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentParserConfiguration; import java.io.IOException; import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Predicate; import static org.elasticsearch.common.Strings.format; +import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE; +import static org.elasticsearch.search.SearchService.isExecutorQueuedBeyondPrewarmingFactor; import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.not; @@ -231,6 +240,83 @@ public void onFailure(Exception e) { } } + public void testIsExecutorQueuedBeyondPrewarmingFactor() throws InterruptedException { + { + final String threadPoolName = randomFrom( + ThreadPool.THREAD_POOL_TYPES.entrySet() + .stream() + .filter(t -> t.getValue().equals(ThreadPool.ThreadPoolType.FIXED)) + .map(Map.Entry::getKey) + .sorted() + .toList() + ); + final int size = 6; + final int queueSize = size * 100; + + ThreadPool threadPool = null; + final Settings nodeSettings = Settings.builder() + .put("node.name", "testPrewarmingBasedOnQueuedItems") + .put("thread_pool." + threadPoolName + ".size", size) + .put("thread_pool." + threadPoolName + ".queue_size", queueSize) + .build(); + final CountDownLatch blockThreadPoolToQueueItems = new CountDownLatch(1); + + try { + threadPool = new ThreadPool(nodeSettings, MeterRegistry.NOOP, new DefaultBuiltInExecutorBuilders()); + ExecutorService executor = threadPool.executor(threadPoolName); + + // these tasks will consume the thread pool causing further + // submissions to queue + final CountDownLatch occupyAllThreads = new CountDownLatch(size); + for (int i = 0; i < size; i++) { + executor.execute(() -> { + try { + occupyAllThreads.countDown(); + blockThreadPoolToQueueItems.await(); + } catch (InterruptedException e) { + fail(e.toString()); + } + }); + } + + // wait for all threads to have an active task in their hands + occupyAllThreads.await(); + + // now on to the fun stuff, let's queue up items - 2 queued items + // for every thread in the pool (plus one more for one thread) + for (int i = 0; i < 13; i++) { + executor.execute(() -> {}); + } + + // 13 queued up items + assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(false)); + assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 2), is(true)); + + // let's get us at the 10 factor (6 threads * 10 + 1= 61 queued up items - at which point we should indicate + // prewarming should not happen) + for (int i = 0; i < 48; i++) { + executor.execute(() -> {}); + } + + // 61 queued up items + assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(true)); + blockThreadPoolToQueueItems.countDown(); + } catch (AssertionError e) { + // terminate more gracefully if there's an assertion error above + blockThreadPoolToQueueItems.countDown(); + throw e; + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + { + // executors that are not ThreadPoolExecutor (i.e. no stats available) are always + // allowing prewarming + assertThat(isExecutorQueuedBeyondPrewarmingFactor(DIRECT_EXECUTOR_SERVICE, 2), is(false)); + } + } + private void doTestCanMatch( SearchRequest searchRequest, SortField sortField,