Skip to content

Commit fa09255

Browse files
authored
Online prewarming service interface docs and usage in SearchService (#126561)
This adds the interface for search online prewarming with a default NOOP implementation. This also hooks the interface in the SearchService after we fork the query phase to the search thread pool.
1 parent 800cf72 commit fa09255

File tree

8 files changed

+209
-7
lines changed

8 files changed

+209
-7
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.search;
11+
12+
import org.elasticsearch.index.shard.IndexShard;
13+
14+
/**
15+
* Interface for prewarming the segments of a shard, tailored for consumption at
16+
* higher volumes than alternative warming strategies (i.e. offline / recovery warming)
17+
* that are more speculative.
18+
*/
19+
public interface OnlinePrewarmingService {
20+
OnlinePrewarmingService NOOP = indexShard -> {};
21+
22+
/**
23+
* Prewarms resources (typically segments) for the given index shard.
24+
*
25+
* @param indexShard the index shard for which resources should be prewarmed
26+
*/
27+
void prewarm(IndexShard indexShard);
28+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.search;
11+
12+
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.threadpool.ThreadPool;
15+
16+
public interface OnlinePrewarmingServiceProvider {
17+
OnlinePrewarmingServiceProvider DEFAULT = (settings, threadPool, clusterService) -> OnlinePrewarmingService.NOOP;
18+
19+
OnlinePrewarmingService create(Settings settings, ThreadPool threadPool, ClusterService clusterService);
20+
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ public void apply(Settings value, Settings current, Settings previous) {
485485
SearchService.CCS_VERSION_CHECK_SETTING,
486486
SearchService.CCS_COLLECT_TELEMETRY,
487487
SearchService.BATCHED_QUERY_PHASE,
488+
SearchService.PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE,
488489
MultiBucketConsumerService.MAX_BUCKET_SETTING,
489490
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
490491
SearchService.MAX_OPEN_SCROLL_CONTEXT,

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.node;
1111

12+
import org.elasticsearch.action.search.OnlinePrewarmingService;
13+
import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider;
1214
import org.elasticsearch.client.internal.node.NodeClient;
1315
import org.elasticsearch.cluster.ClusterInfoService;
1416
import org.elasticsearch.cluster.InternalClusterInfoService;
@@ -123,6 +125,10 @@ SearchService newSearchService(
123125
ExecutorSelector executorSelector,
124126
Tracer tracer
125127
) {
128+
OnlinePrewarmingService onlinePrewarmingService = pluginsService.loadSingletonServiceProvider(
129+
OnlinePrewarmingServiceProvider.class,
130+
() -> OnlinePrewarmingServiceProvider.DEFAULT
131+
).create(clusterService.getSettings(), threadPool, clusterService);
126132
return new SearchService(
127133
clusterService,
128134
indicesService,
@@ -132,7 +138,8 @@ SearchService newSearchService(
132138
fetchPhase,
133139
circuitBreakerService,
134140
executorSelector,
135-
tracer
141+
tracer,
142+
onlinePrewarmingService
136143
);
137144
}
138145

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.ResolvedIndices;
2525
import org.elasticsearch.action.search.CanMatchNodeRequest;
2626
import org.elasticsearch.action.search.CanMatchNodeResponse;
27+
import org.elasticsearch.action.search.OnlinePrewarmingService;
2728
import org.elasticsearch.action.search.SearchShardTask;
2829
import org.elasticsearch.action.search.SearchType;
2930
import org.elasticsearch.action.support.TransportActions;
@@ -147,6 +148,7 @@
147148
import java.util.Set;
148149
import java.util.concurrent.ExecutionException;
149150
import java.util.concurrent.Executor;
151+
import java.util.concurrent.ThreadPoolExecutor;
150152
import java.util.concurrent.TimeoutException;
151153
import java.util.concurrent.atomic.AtomicBoolean;
152154
import java.util.concurrent.atomic.AtomicInteger;
@@ -283,6 +285,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
283285
Property.NodeScope
284286
);
285287

288+
// This setting ensures that we skip online prewarming tasks if the queuing in the search thread pool
289+
// reaches the configured factor X number of max threads in the search thread pool, such that
290+
// the system has a chance to catch up and prewarming doesn't take over the network bandwidth
291+
public static final Setting<Integer> PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE = Setting.intSetting(
292+
"search.online_prewarming_threshold_poolsize_factor",
293+
10, // we will only execute online prewarming if there are less than 10 queued up items/ search thread
294+
0, // 0 would mean we only execute online prewarming if there's no queuing in the search tp
295+
Setting.Property.NodeScope
296+
);
297+
286298
private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled();
287299

288300
/**
@@ -317,6 +329,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
317329

318330
private final FetchPhase fetchPhase;
319331
private final CircuitBreaker circuitBreaker;
332+
private final OnlinePrewarmingService onlinePrewarmingService;
333+
private final int prewarmingMaxPoolFactorThreshold;
320334
private volatile Executor searchExecutor;
321335
private volatile boolean enableQueryPhaseParallelCollection;
322336

@@ -362,7 +376,8 @@ public SearchService(
362376
FetchPhase fetchPhase,
363377
CircuitBreakerService circuitBreakerService,
364378
ExecutorSelector executorSelector,
365-
Tracer tracer
379+
Tracer tracer,
380+
OnlinePrewarmingService onlinePrewarmingService
366381
) {
367382
Settings settings = clusterService.getSettings();
368383
this.threadPool = threadPool;
@@ -375,7 +390,7 @@ public SearchService(
375390
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker);
376391
this.executorSelector = executorSelector;
377392
this.tracer = tracer;
378-
393+
this.onlinePrewarmingService = onlinePrewarmingService;
379394
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
380395
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
381396

@@ -427,6 +442,7 @@ public SearchService(
427442
memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes();
428443
clusterService.getClusterSettings()
429444
.addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes());
445+
prewarmingMaxPoolFactorThreshold = PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE.get(settings);
430446
}
431447

432448
public CircuitBreaker getCircuitBreaker() {
@@ -702,6 +718,10 @@ private <T extends RefCounted> void ensureAfterSeqNoRefreshed(
702718
try {
703719
if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) {
704720
runAsync(executor, executable, listener);
721+
// we successfully submitted the async task to the search pool so let's prewarm the shard
722+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
723+
onlinePrewarmingService.prewarm(shard);
724+
}
705725
return;
706726
}
707727
if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) {
@@ -778,6 +798,10 @@ private void searchReady() {
778798
timeoutTask.cancel();
779799
}
780800
runAsync(executor, executable, listener);
801+
// we successfully submitted the async task to the search pool so let's prewarm the shard
802+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
803+
onlinePrewarmingService.prewarm(shard);
804+
}
781805
}
782806
}
783807
});
@@ -786,6 +810,28 @@ private void searchReady() {
786810
}
787811
}
788812

813+
/**
814+
* Checks if the executor is queued beyond the prewarming factor threshold, relative to the
815+
* number of threads in the pool.
816+
* This is used to determine if we should prewarm the shard - i.e. if the executor doesn't
817+
* contain queued tasks beyond the prewarming factor threshold X max pool size.
818+
*
819+
* @param searchOperationsExecutor the executor that executes the search operations
820+
* @param prewarmingMaxPoolFactorThreshold maximum number of queued up items / thread in the search pool
821+
*/
822+
// visible for testing
823+
static boolean isExecutorQueuedBeyondPrewarmingFactor(Executor searchOperationsExecutor, int prewarmingMaxPoolFactorThreshold) {
824+
if (searchOperationsExecutor instanceof ThreadPoolExecutor tpe) {
825+
return (tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold) < tpe.getQueue().size();
826+
} else {
827+
logger.trace(
828+
"received executor [{}] that we can't inspect for queueing. allowing online prewarming for all searches",
829+
searchOperationsExecutor
830+
);
831+
return false;
832+
}
833+
}
834+
789835
private IndexShard getShard(ShardSearchRequest request) {
790836
final ShardSearchContextId contextId = request.readerId();
791837
if (contextId != null && sessionId.equals(contextId.getSessionId())) {
@@ -939,7 +985,8 @@ public void executeQueryPhase(
939985
freeReaderContext(readerContext.id());
940986
throw e;
941987
}
942-
runAsync(getExecutor(readerContext.indexShard()), () -> {
988+
Executor executor = getExecutor(readerContext.indexShard());
989+
runAsync(executor, () -> {
943990
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
944991
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) {
945992
var opsListener = searchContext.indexShard().getSearchOperationListener();
@@ -965,6 +1012,10 @@ public void executeQueryPhase(
9651012
throw e;
9661013
}
9671014
}, wrapFailureListener(listener, readerContext, markAsUsed));
1015+
// we successfully submitted the async task to the search pool so let's prewarm the shard
1016+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
1017+
onlinePrewarmingService.prewarm(readerContext.indexShard());
1018+
}
9681019
}
9691020

9701021
/**
@@ -991,7 +1042,8 @@ public void executeQueryPhase(
9911042
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
9921043
rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> {
9931044
// fork the execution in the search thread pool
994-
runAsync(getExecutor(readerContext.indexShard()), () -> {
1045+
Executor executor = getExecutor(readerContext.indexShard());
1046+
runAsync(executor, () -> {
9951047
readerContext.setAggregatedDfs(request.dfs());
9961048
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) {
9971049
final QuerySearchResult queryResult;
@@ -1029,6 +1081,10 @@ public void executeQueryPhase(
10291081
throw e;
10301082
}
10311083
}, wrapFailureListener(l, readerContext, markAsUsed));
1084+
// we successfully submitted the async task to the search pool so let's prewarm the shard
1085+
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
1086+
onlinePrewarmingService.prewarm(readerContext.indexShard());
1087+
}
10321088
}));
10331089
}
10341090

server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,25 @@
5353
import org.elasticsearch.search.sort.BucketedSort;
5454
import org.elasticsearch.search.sort.MinAndMax;
5555
import org.elasticsearch.search.sort.SortOrder;
56+
import org.elasticsearch.telemetry.metric.MeterRegistry;
5657
import org.elasticsearch.test.MockLog;
58+
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
59+
import org.elasticsearch.threadpool.ThreadPool;
5760
import org.elasticsearch.xcontent.XContentParserConfiguration;
5861

5962
import java.io.IOException;
6063
import java.util.Collections;
64+
import java.util.Map;
65+
import java.util.concurrent.CountDownLatch;
66+
import java.util.concurrent.ExecutorService;
67+
import java.util.concurrent.TimeUnit;
6168
import java.util.concurrent.atomic.AtomicBoolean;
6269
import java.util.function.BiFunction;
6370
import java.util.function.Predicate;
6471

6572
import static org.elasticsearch.common.Strings.format;
73+
import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
74+
import static org.elasticsearch.search.SearchService.isExecutorQueuedBeyondPrewarmingFactor;
6675
import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace;
6776
import static org.hamcrest.CoreMatchers.is;
6877
import static org.hamcrest.Matchers.not;
@@ -231,6 +240,83 @@ public void onFailure(Exception e) {
231240
}
232241
}
233242

243+
public void testIsExecutorQueuedBeyondPrewarmingFactor() throws InterruptedException {
244+
{
245+
final String threadPoolName = randomFrom(
246+
ThreadPool.THREAD_POOL_TYPES.entrySet()
247+
.stream()
248+
.filter(t -> t.getValue().equals(ThreadPool.ThreadPoolType.FIXED))
249+
.map(Map.Entry::getKey)
250+
.sorted()
251+
.toList()
252+
);
253+
final int size = 6;
254+
final int queueSize = size * 100;
255+
256+
ThreadPool threadPool = null;
257+
final Settings nodeSettings = Settings.builder()
258+
.put("node.name", "testPrewarmingBasedOnQueuedItems")
259+
.put("thread_pool." + threadPoolName + ".size", size)
260+
.put("thread_pool." + threadPoolName + ".queue_size", queueSize)
261+
.build();
262+
final CountDownLatch blockThreadPoolToQueueItems = new CountDownLatch(1);
263+
264+
try {
265+
threadPool = new ThreadPool(nodeSettings, MeterRegistry.NOOP, new DefaultBuiltInExecutorBuilders());
266+
ExecutorService executor = threadPool.executor(threadPoolName);
267+
268+
// these tasks will consume the thread pool causing further
269+
// submissions to queue
270+
final CountDownLatch occupyAllThreads = new CountDownLatch(size);
271+
for (int i = 0; i < size; i++) {
272+
executor.execute(() -> {
273+
try {
274+
occupyAllThreads.countDown();
275+
blockThreadPoolToQueueItems.await();
276+
} catch (InterruptedException e) {
277+
fail(e.toString());
278+
}
279+
});
280+
}
281+
282+
// wait for all threads to have an active task in their hands
283+
occupyAllThreads.await();
284+
285+
// now on to the fun stuff, let's queue up items - 2 queued items
286+
// for every thread in the pool (plus one more for one thread)
287+
for (int i = 0; i < 13; i++) {
288+
executor.execute(() -> {});
289+
}
290+
291+
// 13 queued up items
292+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(false));
293+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 2), is(true));
294+
295+
// let's get us at the 10 factor (6 threads * 10 + 1= 61 queued up items - at which point we should indicate
296+
// prewarming should not happen)
297+
for (int i = 0; i < 48; i++) {
298+
executor.execute(() -> {});
299+
}
300+
301+
// 61 queued up items
302+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(true));
303+
blockThreadPoolToQueueItems.countDown();
304+
} catch (AssertionError e) {
305+
// terminate more gracefully if there's an assertion error above
306+
blockThreadPoolToQueueItems.countDown();
307+
throw e;
308+
} finally {
309+
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
310+
}
311+
}
312+
313+
{
314+
// executors that are not ThreadPoolExecutor (i.e. no stats available) are always
315+
// allowing prewarming
316+
assertThat(isExecutorQueuedBeyondPrewarmingFactor(DIRECT_EXECUTOR_SERVICE, 2), is(false));
317+
}
318+
}
319+
234320
private void doTestCanMatch(
235321
SearchRequest searchRequest,
236322
SortField sortField,

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.action.bulk.TransportShardBulkAction;
5050
import org.elasticsearch.action.index.IndexRequest;
5151
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
52+
import org.elasticsearch.action.search.OnlinePrewarmingService;
5253
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
5354
import org.elasticsearch.action.search.SearchPhaseController;
5455
import org.elasticsearch.action.search.SearchRequest;
@@ -2314,7 +2315,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
23142315
new FetchPhase(Collections.emptyList()),
23152316
new NoneCircuitBreakerService(),
23162317
EmptySystemIndices.INSTANCE.getExecutorSelector(),
2317-
Tracer.NOOP
2318+
Tracer.NOOP,
2319+
OnlinePrewarmingService.NOOP
23182320
);
23192321

23202322
final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoriesService);

test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.search;
1111

12+
import org.elasticsearch.action.search.OnlinePrewarmingService;
1213
import org.elasticsearch.cluster.service.ClusterService;
1314
import org.elasticsearch.common.util.BigArrays;
1415
import org.elasticsearch.core.TimeValue;
@@ -95,7 +96,8 @@ public MockSearchService(
9596
fetchPhase,
9697
circuitBreakerService,
9798
executorSelector,
98-
tracer
99+
tracer,
100+
OnlinePrewarmingService.NOOP
99101
);
100102
}
101103

0 commit comments

Comments
 (0)