Skip to content

Commit eb27104

Browse files
committed
Docs, drop skip method argument, test
1 parent 97e5bd8 commit eb27104

File tree

3 files changed

+123
-27
lines changed

3 files changed

+123
-27
lines changed

server/src/main/java/org/elasticsearch/action/search/OnlinePrewarmingService.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717
* that are more speculative.
1818
*/
1919
public interface OnlinePrewarmingService {
20-
OnlinePrewarmingService NOOP = (indexShard, skipPrewarmingCondition) -> {};
20+
OnlinePrewarmingService NOOP = indexShard -> {};
2121

2222
/**
2323
* Prewarms resources (typically segments) for the given index shard.
2424
*
2525
* @param indexShard the index shard for which resources should be prewarmed
26-
* @param skipPrewarming a flag indicating whether prewarming should be skipped.
27-
* Callers should decide if certain prewarming calls
28-
* should be skipped and indicate this decision via this
29-
* flag.
3026
*/
31-
void prewarm(IndexShard indexShard, boolean skipPrewarming);
27+
void prewarm(IndexShard indexShard);
3228
}

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
290290
// the system has a chance to catch up and prewarming doesn't take over the network bandwidth
291291
public static final Setting<Integer> PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE = Setting.intSetting(
292292
"search.online_prewarming_threshold_poolsize_factor",
293-
10,
293+
10, // we will only execute online prewarming if there are less than 10 queued up items/ search thread
294294
0, // 0 would mean we only execute online prewarming if there's no queuing in the search tp
295295
Setting.Property.NodeScope
296296
);
@@ -719,11 +719,9 @@ private <T extends RefCounted> void ensureAfterSeqNoRefreshed(
719719
if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) {
720720
runAsync(executor, executable, listener);
721721
// we successfully submitted the async task to the search pool so let's prewarm the shard
722-
onlinePrewarmingService.prewarm(
723-
shard,
724-
executor instanceof ThreadPoolExecutor tpe
725-
&& ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size())
726-
);
722+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
723+
onlinePrewarmingService.prewarm(shard);
724+
}
727725
return;
728726
}
729727
if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) {
@@ -801,11 +799,9 @@ private void searchReady() {
801799
}
802800
runAsync(executor, executable, listener);
803801
// we successfully submitted the async task to the search pool so let's prewarm the shard
804-
onlinePrewarmingService.prewarm(
805-
shard,
806-
executor instanceof ThreadPoolExecutor tpe
807-
&& ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size())
808-
);
802+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
803+
onlinePrewarmingService.prewarm(shard);
804+
}
809805
}
810806
}
811807
});
@@ -814,6 +810,28 @@ private void searchReady() {
814810
}
815811
}
816812

813+
/**
814+
* Checks if the executor is queued beyond the prewarming factor threshold, relative to the
815+
* number of threads in the pool.
816+
* This is used to determine if we should prewarm the shard - i.e. if the executor doesn't
817+
* contain queued tasks beyond the prewarming factor threshold X max pool size.
818+
*
819+
* @param searchOperationsExecutor the executor that executes the search operations
820+
* @param prewarmingMaxPoolFactorThreshold maximum number of queued up items / thread in the search pool
821+
*/
822+
// visible for testing
823+
static boolean isExecutorQueuedBeyondPrewarmingFactor(Executor searchOperationsExecutor, int prewarmingMaxPoolFactorThreshold) {
824+
if (searchOperationsExecutor instanceof ThreadPoolExecutor tpe) {
825+
return (tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size();
826+
} else {
827+
logger.trace(
828+
"received executor [{}] that we can't inspect for queueing. allowing online prewarming for all searches",
829+
searchOperationsExecutor
830+
);
831+
return false;
832+
}
833+
}
834+
817835
private IndexShard getShard(ShardSearchRequest request) {
818836
final ShardSearchContextId contextId = request.readerId();
819837
if (contextId != null && sessionId.equals(contextId.getSessionId())) {
@@ -995,11 +1013,9 @@ public void executeQueryPhase(
9951013
}
9961014
}, wrapFailureListener(listener, readerContext, markAsUsed));
9971015
// we successfully submitted the async task to the search pool so let's prewarm the shard
998-
onlinePrewarmingService.prewarm(
999-
readerContext.indexShard(),
1000-
executor instanceof ThreadPoolExecutor tpe
1001-
&& ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size())
1002-
);
1016+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
1017+
onlinePrewarmingService.prewarm(readerContext.indexShard());
1018+
}
10031019
}
10041020

10051021
/**
@@ -1066,11 +1082,9 @@ public void executeQueryPhase(
10661082
}
10671083
}, wrapFailureListener(l, readerContext, markAsUsed));
10681084
// we successfully submitted the async task to the search pool so let's prewarm the shard
1069-
onlinePrewarmingService.prewarm(
1070-
readerContext.indexShard(),
1071-
executor instanceof ThreadPoolExecutor tpe
1072-
&& ((tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size())
1073-
);
1085+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
1086+
onlinePrewarmingService.prewarm(readerContext.indexShard());
1087+
}
10741088
}));
10751089
}
10761090

server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,25 @@
5353
import org.elasticsearch.search.sort.BucketedSort;
5454
import org.elasticsearch.search.sort.MinAndMax;
5555
import org.elasticsearch.search.sort.SortOrder;
56+
import org.elasticsearch.telemetry.metric.MeterRegistry;
5657
import org.elasticsearch.test.MockLog;
58+
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
59+
import org.elasticsearch.threadpool.ThreadPool;
5760
import org.elasticsearch.xcontent.XContentParserConfiguration;
5861

5962
import java.io.IOException;
6063
import java.util.Collections;
64+
import java.util.Map;
65+
import java.util.concurrent.CountDownLatch;
66+
import java.util.concurrent.ExecutorService;
67+
import java.util.concurrent.TimeUnit;
6168
import java.util.concurrent.atomic.AtomicBoolean;
6269
import java.util.function.BiFunction;
6370
import java.util.function.Predicate;
6471

6572
import static org.elasticsearch.common.Strings.format;
73+
import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
74+
import static org.elasticsearch.search.SearchService.isExecutorQueuedBeyondPrewarmingFactor;
6675
import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace;
6776
import static org.hamcrest.CoreMatchers.is;
6877
import static org.hamcrest.Matchers.not;
@@ -231,6 +240,83 @@ public void onFailure(Exception e) {
231240
}
232241
}
233242

243+
public void testIsExecutorQueuedBeyondPrewarmingFactor() throws InterruptedException {
244+
{
245+
final String threadPoolName = randomFrom(
246+
ThreadPool.THREAD_POOL_TYPES.entrySet()
247+
.stream()
248+
.filter(t -> t.getValue().equals(ThreadPool.ThreadPoolType.FIXED))
249+
.map(Map.Entry::getKey)
250+
.sorted()
251+
.toList()
252+
);
253+
final int size = 6;
254+
final int queueSize = size * 100;
255+
256+
ThreadPool threadPool = null;
257+
final Settings nodeSettings = Settings.builder()
258+
.put("node.name", "testPrewarmingBasedOnQueuedItems")
259+
.put("thread_pool." + threadPoolName + ".size", size)
260+
.put("thread_pool." + threadPoolName + ".queue_size", queueSize)
261+
.build();
262+
final CountDownLatch blockThreadPoolToQueueItems = new CountDownLatch(1);
263+
264+
try {
265+
threadPool = new ThreadPool(nodeSettings, MeterRegistry.NOOP, new DefaultBuiltInExecutorBuilders());
266+
ExecutorService executor = threadPool.executor(threadPoolName);
267+
268+
// these tasks will consume the thread pool causing further
269+
// submissions to queue
270+
final CountDownLatch occupyAllThreads = new CountDownLatch(size);
271+
for (int i = 0; i < size; i++) {
272+
executor.execute(() -> {
273+
try {
274+
occupyAllThreads.countDown();
275+
blockThreadPoolToQueueItems.await();
276+
} catch (InterruptedException e) {
277+
fail(e.toString());
278+
}
279+
});
280+
}
281+
282+
// wait for all threads to have an active task in their hands
283+
occupyAllThreads.await();
284+
285+
// now on to the fun stuff, let's queue up items - 2 queued items
286+
// for every thread in the pool (plus one more for one thread)
287+
for (int i = 0; i < 13; i++) {
288+
executor.execute(() -> {});
289+
}
290+
291+
// 13 queued up items
292+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(false));
293+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 2), is(true));
294+
295+
// let's get us at the 10 factor (6 threads * 10 + 1= 61 queued up items - at which point we should indicate
296+
// prewarming should not happen)
297+
for (int i = 0; i < 48; i++) {
298+
executor.execute(() -> {});
299+
}
300+
301+
// 61 queued up items
302+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(true));
303+
blockThreadPoolToQueueItems.countDown();
304+
} catch (AssertionError e) {
305+
// terminate more gracefully if there's an assertion error above
306+
blockThreadPoolToQueueItems.countDown();
307+
throw e;
308+
} finally {
309+
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
310+
}
311+
}
312+
313+
{
314+
// executors that are not ThreadPoolExecutor (i.e. no stats available) are always
315+
// allowing prewarming
316+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(DIRECT_EXECUTOR_SERVICE, 2), is(false));
317+
}
318+
}
319+
234320
private void doTestCanMatch(
235321
SearchRequest searchRequest,
236322
SortField sortField,

0 commit comments

Comments
 (0)