2424import org .elasticsearch .action .ResolvedIndices ;
2525import org .elasticsearch .action .search .CanMatchNodeRequest ;
2626import org .elasticsearch .action .search .CanMatchNodeResponse ;
27+ import org .elasticsearch .action .search .OnlinePrewarmingService ;
2728import org .elasticsearch .action .search .SearchShardTask ;
2829import org .elasticsearch .action .search .SearchType ;
2930import org .elasticsearch .action .support .TransportActions ;
147148import java .util .Set ;
148149import java .util .concurrent .ExecutionException ;
149150import java .util .concurrent .Executor ;
151+ import java .util .concurrent .ThreadPoolExecutor ;
150152import java .util .concurrent .TimeoutException ;
151153import java .util .concurrent .atomic .AtomicBoolean ;
152154import java .util .concurrent .atomic .AtomicInteger ;
@@ -283,6 +285,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
283285 Property .NodeScope
284286 );
285287
288+ // Node-scoped factor used to skip online prewarming tasks while the search thread pool is backed up: prewarming
289+ // is skipped once the search queue size exceeds (this factor * the max number of threads in the pool), such that
290+ // the system has a chance to catch up and prewarming doesn't take over the network bandwidth
291+ public static final Setting <Integer > PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE = Setting .intSetting (
292+ "search.online_prewarming_threshold_poolsize_factor" ,
293+ 10 ,
294+ 0 , // 0 would mean we only execute online prewarming if there's no queuing in the search thread pool
295+ Setting .Property .NodeScope
296+ );
297+
286298 private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag ("batched_query_phase" ).isEnabled ();
287299
288300 /**
@@ -317,6 +329,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
317329
318330 private final FetchPhase fetchPhase ;
319331 private final CircuitBreaker circuitBreaker ;
332+ private final OnlinePrewarmingService onlinePrewarmingService ;
333+ private final int prewarmingMaxPoolFactorThreshold ;
320334 private volatile Executor searchExecutor ;
321335 private volatile boolean enableQueryPhaseParallelCollection ;
322336
@@ -362,7 +376,8 @@ public SearchService(
362376 FetchPhase fetchPhase ,
363377 CircuitBreakerService circuitBreakerService ,
364378 ExecutorSelector executorSelector ,
365- Tracer tracer
379+ Tracer tracer ,
380+ OnlinePrewarmingService onlinePrewarmingService
366381 ) {
367382 Settings settings = clusterService .getSettings ();
368383 this .threadPool = threadPool ;
@@ -375,7 +390,7 @@ public SearchService(
375390 this .multiBucketConsumerService = new MultiBucketConsumerService (clusterService , settings , circuitBreaker );
376391 this .executorSelector = executorSelector ;
377392 this .tracer = tracer ;
378-
393+ this . onlinePrewarmingService = onlinePrewarmingService ;
379394 TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING .get (settings );
380395 setKeepAlives (DEFAULT_KEEPALIVE_SETTING .get (settings ), MAX_KEEPALIVE_SETTING .get (settings ));
381396
@@ -427,6 +442,7 @@ public SearchService(
427442 memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE .get (settings ).getBytes ();
428443 clusterService .getClusterSettings ()
429444 .addSettingsUpdateConsumer (MEMORY_ACCOUNTING_BUFFER_SIZE , newValue -> this .memoryAccountingBufferSize = newValue .getBytes ());
445+ prewarmingMaxPoolFactorThreshold = PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE .get (settings );
430446 }
431447
432448 public CircuitBreaker getCircuitBreaker () {
@@ -702,6 +718,12 @@ private <T extends RefCounted> void ensureAfterSeqNoRefreshed(
702718 try {
703719 if (waitForCheckpoint <= UNASSIGNED_SEQ_NO ) {
704720 runAsync (executor , executable , listener );
721+ // we successfully submitted the async task to the search pool so let's prewarm the shard
722+ onlinePrewarmingService .prewarm (
723+ shard ,
724+ executor instanceof ThreadPoolExecutor tpe
725+ && ((tpe .getMaximumPoolSize () * prewarmingMaxPoolFactorThreshold ) < tpe .getQueue ().size ())
726+ );
705727 return ;
706728 }
707729 if (shard .indexSettings ().getRefreshInterval ().getMillis () <= 0 ) {
@@ -778,6 +800,12 @@ private void searchReady() {
778800 timeoutTask .cancel ();
779801 }
780802 runAsync (executor , executable , listener );
803+ // we successfully submitted the async task to the search pool so let's prewarm the shard
804+ onlinePrewarmingService .prewarm (
805+ shard ,
806+ executor instanceof ThreadPoolExecutor tpe
807+ && ((tpe .getMaximumPoolSize () * prewarmingMaxPoolFactorThreshold ) < tpe .getQueue ().size ())
808+ );
781809 }
782810 }
783811 });
@@ -939,7 +967,8 @@ public void executeQueryPhase(
939967 freeReaderContext (readerContext .id ());
940968 throw e ;
941969 }
942- runAsync (getExecutor (readerContext .indexShard ()), () -> {
970+ Executor executor = getExecutor (readerContext .indexShard ());
971+ runAsync (executor , () -> {
943972 final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
944973 try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );) {
945974 var opsListener = searchContext .indexShard ().getSearchOperationListener ();
@@ -965,6 +994,12 @@ public void executeQueryPhase(
965994 throw e ;
966995 }
967996 }, wrapFailureListener (listener , readerContext , markAsUsed ));
997+ // we successfully submitted the async task to the search pool so let's prewarm the shard
998+ onlinePrewarmingService .prewarm (
999+ readerContext .indexShard (),
1000+ executor instanceof ThreadPoolExecutor tpe
1001+ && ((tpe .getMaximumPoolSize () * prewarmingMaxPoolFactorThreshold ) < tpe .getQueue ().size ())
1002+ );
9681003 }
9691004
9701005 /**
@@ -991,7 +1026,8 @@ public void executeQueryPhase(
9911026 final Releasable markAsUsed = readerContext .markAsUsed (getKeepAlive (shardSearchRequest ));
9921027 rewriteAndFetchShardRequest (readerContext .indexShard (), shardSearchRequest , listener .delegateFailure ((l , rewritten ) -> {
9931028 // fork the execution in the search thread pool
994- runAsync (getExecutor (readerContext .indexShard ()), () -> {
1029+ Executor executor = getExecutor (readerContext .indexShard ());
1030+ runAsync (executor , () -> {
9951031 readerContext .setAggregatedDfs (request .dfs ());
9961032 try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );) {
9971033 final QuerySearchResult queryResult ;
@@ -1029,6 +1065,12 @@ public void executeQueryPhase(
10291065 throw e ;
10301066 }
10311067 }, wrapFailureListener (l , readerContext , markAsUsed ));
1068+ // we successfully submitted the async task to the search pool so let's prewarm the shard
1069+ onlinePrewarmingService .prewarm (
1070+ readerContext .indexShard (),
1071+ executor instanceof ThreadPoolExecutor tpe
1072+ && ((tpe .getMaximumPoolSize () * prewarmingMaxPoolFactorThreshold ) < tpe .getQueue ().size ())
1073+ );
10321074 }));
10331075 }
10341076
0 commit comments