Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.search;

import org.elasticsearch.index.shard.IndexShard;

/**
 * Contract for prewarming the segments of a shard.
 * <p>
 * It is tailored to be consumed at higher volumes than the alternative, more speculative
 * warming strategies (offline / recovery warming).
 */
public interface OnlinePrewarmingService {

    /**
     * Prewarms resources (typically segments) for the given index shard.
     *
     * @param indexShard the index shard for which resources should be prewarmed
     */
    void prewarm(IndexShard indexShard);

    /**
     * Implementation that does nothing when asked to prewarm a shard.
     */
    OnlinePrewarmingService NOOP = shard -> {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.search;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

/**
 * Factory for {@link OnlinePrewarmingService} instances.
 */
public interface OnlinePrewarmingServiceProvider {

    /**
     * Creates the {@link OnlinePrewarmingService} to use.
     *
     * @param settings       the settings handed to the provider
     * @param threadPool     the thread pool handed to the provider
     * @param clusterService the cluster service handed to the provider
     * @return the prewarming service
     */
    OnlinePrewarmingService create(Settings settings, ThreadPool threadPool, ClusterService clusterService);

    /**
     * Provider that ignores its arguments and always returns {@link OnlinePrewarmingService#NOOP}.
     */
    OnlinePrewarmingServiceProvider DEFAULT = (nodeSettings, pool, cluster) -> OnlinePrewarmingService.NOOP;
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CCS_VERSION_CHECK_SETTING,
SearchService.CCS_COLLECT_TELEMETRY,
SearchService.BATCHED_QUERY_PHASE,
SearchService.PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.node;

import org.elasticsearch.action.search.OnlinePrewarmingService;
import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.InternalClusterInfoService;
Expand Down Expand Up @@ -123,6 +125,10 @@ SearchService newSearchService(
ExecutorSelector executorSelector,
Tracer tracer
) {
OnlinePrewarmingService onlinePrewarmingService = pluginsService.loadSingletonServiceProvider(
OnlinePrewarmingServiceProvider.class,
() -> OnlinePrewarmingServiceProvider.DEFAULT
).create(clusterService.getSettings(), threadPool, clusterService);
return new SearchService(
clusterService,
indicesService,
Expand All @@ -132,7 +138,8 @@ SearchService newSearchService(
fetchPhase,
circuitBreakerService,
executorSelector,
tracer
tracer,
onlinePrewarmingService
);
}

Expand Down
64 changes: 60 additions & 4 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.ResolvedIndices;
import org.elasticsearch.action.search.CanMatchNodeRequest;
import org.elasticsearch.action.search.CanMatchNodeResponse;
import org.elasticsearch.action.search.OnlinePrewarmingService;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.TransportActions;
Expand Down Expand Up @@ -147,6 +148,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -283,6 +285,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

// Online prewarming is skipped once the number of tasks queued in the search thread pool exceeds
// this factor multiplied by the maximum number of threads in that pool. Skipping gives the system
// a chance to catch up and keeps prewarming from taking over the network bandwidth.
public static final Setting<Integer> PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE = Setting.intSetting(
"search.online_prewarming_threshold_poolsize_factor",
10, // by default, online prewarming only runs while at most 10 items are queued per search thread
0, // a factor of 0 means online prewarming only runs when nothing is queued in the search thread pool
Setting.Property.NodeScope
);

private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled();

/**
Expand Down Expand Up @@ -317,6 +329,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final FetchPhase fetchPhase;
private final CircuitBreaker circuitBreaker;
private final OnlinePrewarmingService onlinePrewarmingService;
private final int prewarmingMaxPoolFactorThreshold;
private volatile Executor searchExecutor;
private volatile boolean enableQueryPhaseParallelCollection;

Expand Down Expand Up @@ -362,7 +376,8 @@ public SearchService(
FetchPhase fetchPhase,
CircuitBreakerService circuitBreakerService,
ExecutorSelector executorSelector,
Tracer tracer
Tracer tracer,
OnlinePrewarmingService onlinePrewarmingService
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
Expand All @@ -375,7 +390,7 @@ public SearchService(
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker);
this.executorSelector = executorSelector;
this.tracer = tracer;

this.onlinePrewarmingService = onlinePrewarmingService;
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));

Expand Down Expand Up @@ -427,6 +442,7 @@ public SearchService(
memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes();
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes());
prewarmingMaxPoolFactorThreshold = PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE.get(settings);
}

public CircuitBreaker getCircuitBreaker() {
Expand Down Expand Up @@ -702,6 +718,10 @@ private <T extends RefCounted> void ensureAfterSeqNoRefreshed(
try {
if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) {
runAsync(executor, executable, listener);
// we successfully submitted the async task to the search pool so let's prewarm the shard
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
onlinePrewarmingService.prewarm(shard);
}
return;
}
if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) {
Expand Down Expand Up @@ -778,6 +798,10 @@ private void searchReady() {
timeoutTask.cancel();
}
runAsync(executor, executable, listener);
// we successfully submitted the async task to the search pool so let's prewarm the shard
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
onlinePrewarmingService.prewarm(shard);
}
}
}
});
Expand All @@ -786,6 +810,28 @@ private void searchReady() {
}
}

/**
 * Checks if the executor is queued beyond the prewarming factor threshold, relative to the
 * maximum number of threads in the pool.
 * This is used to determine if we should prewarm the shard - i.e. prewarming is allowed as long as
 * the executor's queue does not hold more tasks than the prewarming factor threshold multiplied by
 * the maximum pool size.
 *
 * @param searchOperationsExecutor the executor that executes the search operations
 * @param prewarmingMaxPoolFactorThreshold maximum number of queued up items / thread in the search pool
 * @return {@code true} if the executor is a {@link ThreadPoolExecutor} whose queue currently holds strictly more
 *         than {@code maximumPoolSize * prewarmingMaxPoolFactorThreshold} tasks; {@code false} otherwise, including
 *         for executors whose queue cannot be inspected
 */
// visible for testing
static boolean isExecutorQueuedBeyondPrewarmingFactor(Executor searchOperationsExecutor, int prewarmingMaxPoolFactorThreshold) {
    if (searchOperationsExecutor instanceof ThreadPoolExecutor tpe) {
        // Widen to long before multiplying: with a large factor the int product would overflow and could wrap
        // to a negative value, which would wrongly report the queue as being beyond the threshold.
        final long maxQueuedTasks = (long) tpe.getMaximumPoolSize() * prewarmingMaxPoolFactorThreshold;
        return maxQueuedTasks < tpe.getQueue().size();
    } else {
        logger.trace(
            "received executor [{}] that we can't inspect for queueing. allowing online prewarming for all searches",
            searchOperationsExecutor
        );
        return false;
    }
}

private IndexShard getShard(ShardSearchRequest request) {
final ShardSearchContextId contextId = request.readerId();
if (contextId != null && sessionId.equals(contextId.getSessionId())) {
Expand Down Expand Up @@ -939,7 +985,8 @@ public void executeQueryPhase(
freeReaderContext(readerContext.id());
throw e;
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
Executor executor = getExecutor(readerContext.indexShard());
runAsync(executor, () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, false);) {
var opsListener = searchContext.indexShard().getSearchOperationListener();
Expand All @@ -965,6 +1012,10 @@ public void executeQueryPhase(
throw e;
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
// we successfully submitted the async task to the search pool so let's prewarm the shard
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
onlinePrewarmingService.prewarm(readerContext.indexShard());
}
}

/**
Expand All @@ -991,7 +1042,8 @@ public void executeQueryPhase(
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
rewriteAndFetchShardRequest(readerContext.indexShard(), shardSearchRequest, listener.delegateFailure((l, rewritten) -> {
// fork the execution in the search thread pool
runAsync(getExecutor(readerContext.indexShard()), () -> {
Executor executor = getExecutor(readerContext.indexShard());
runAsync(executor, () -> {
readerContext.setAggregatedDfs(request.dfs());
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, ResultsType.QUERY, true);) {
final QuerySearchResult queryResult;
Expand Down Expand Up @@ -1029,6 +1081,10 @@ public void executeQueryPhase(
throw e;
}
}, wrapFailureListener(l, readerContext, markAsUsed));
// we successfully submitted the async task to the search pool so let's prewarm the shard
if (isExecutorQueuedBeyondPrewarmingFactor(executor, prewarmingMaxPoolFactorThreshold) == false) {
onlinePrewarmingService.prewarm(readerContext.indexShard());
}
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,25 @@
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.MinAndMax;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
import static org.elasticsearch.search.SearchService.isExecutorQueuedBeyondPrewarmingFactor;
import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -231,6 +240,83 @@ public void onFailure(Exception e) {
}
}

/**
 * Verifies {@code isExecutorQueuedBeyondPrewarmingFactor} against a fixed thread pool whose threads are all
 * blocked, so that every further submission is queued, and against an executor that cannot be inspected.
 */
public void testIsExecutorQueuedBeyondPrewarmingFactor() throws InterruptedException {
    {
        final String threadPoolName = randomFrom(
            ThreadPool.THREAD_POOL_TYPES.entrySet()
                .stream()
                .filter(t -> t.getValue().equals(ThreadPool.ThreadPoolType.FIXED))
                .map(Map.Entry::getKey)
                .sorted()
                .toList()
        );
        final int size = 6;
        final int queueSize = size * 100;

        ThreadPool threadPool = null;
        final Settings nodeSettings = Settings.builder()
            .put("node.name", "testPrewarmingBasedOnQueuedItems")
            .put("thread_pool." + threadPoolName + ".size", size)
            .put("thread_pool." + threadPoolName + ".queue_size", queueSize)
            .build();
        final CountDownLatch blockThreadPoolToQueueItems = new CountDownLatch(1);

        try {
            threadPool = new ThreadPool(nodeSettings, MeterRegistry.NOOP, new DefaultBuiltInExecutorBuilders());
            ExecutorService executor = threadPool.executor(threadPoolName);

            // these tasks will consume the thread pool causing further
            // submissions to queue
            final CountDownLatch occupyAllThreads = new CountDownLatch(size);
            for (int i = 0; i < size; i++) {
                executor.execute(() -> {
                    try {
                        occupyAllThreads.countDown();
                        blockThreadPoolToQueueItems.await();
                    } catch (InterruptedException e) {
                        fail(e.toString());
                    }
                });
            }

            // wait for all threads to have an active task in their hands
            occupyAllThreads.await();

            // now on to the fun stuff, let's queue up items - 2 queued items
            // for every thread in the pool (plus one more for one thread)
            for (int i = 0; i < 13; i++) {
                executor.execute(() -> {});
            }

            // 13 queued up items
            assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(false));
            assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 2), is(true));

            // let's get us beyond the 10 factor (6 threads * 10 + 1 = 61 queued up items - at which point we should
            // indicate prewarming should not happen)
            for (int i = 0; i < 48; i++) {
                executor.execute(() -> {});
            }

            // 61 queued up items
            assertThat(isExecutorQueuedBeyondPrewarmingFactor(executor, 10), is(true));
        } finally {
            // Always release the blocked workers before terminating, whatever the outcome above. Otherwise any
            // failure (not just an assertion error) leaves the pool's threads blocked on the latch until the
            // termination timeout expires.
            blockThreadPoolToQueueItems.countDown();
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }

    {
        // executors that are not ThreadPoolExecutor (i.e. no stats available) are always
        // allowing prewarming
        assertThat(isExecutorQueuedBeyondPrewarmingFactor(DIRECT_EXECUTOR_SERVICE, 2), is(false));
    }
}

private void doTestCanMatch(
SearchRequest searchRequest,
SortField sortField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.search.OnlinePrewarmingService;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -2314,7 +2315,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
new FetchPhase(Collections.emptyList()),
new NoneCircuitBreakerService(),
EmptySystemIndices.INSTANCE.getExecutorSelector(),
Tracer.NOOP
Tracer.NOOP,
OnlinePrewarmingService.NOOP
);

final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoriesService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.search;

import org.elasticsearch.action.search.OnlinePrewarmingService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -95,7 +96,8 @@ public MockSearchService(
fetchPhase,
circuitBreakerService,
executorSelector,
tracer
tracer,
OnlinePrewarmingService.NOOP
);
}

Expand Down