Skip to content

Commit f70498a

Browse files
committed
Limit the number of tasks that a single search can submit
Since we removed the search workers thread pool with #111099, we execute many more tasks in the search thread pool, given that each shard search request parallelizes across slices or even segments (knn query rewrite. There are also rare situations where segment level tasks may parallelize further (e.g. createWeight), that cause the creation of many many tasks for a single top-level request. These are rather small tasks that previously queued up in the unbounded search workers queue. With recent improvements in Lucene, these tasks queue up in the search queue, yet they get executed by the caller thread while they are still in the queue, and remain in the queue as no-op until they are pulled out of the queue. We have protection against rejections based on turning off search concurrency when we have more than maxPoolSize items in the queue, yet that is not enough if enough parallel requests see an empty queue and manage to submit enough tasks to fill the queue at once. That will cause rejections for top-level searches that should not be rejected. This commit introduces wrapping for the executor to limit the number of tasks that a single search instance can submit to the executor, to prevent the situation where a single search submits way more tasks than threads available.
1 parent 6f7bd55 commit f70498a

File tree

2 files changed

+125
-4
lines changed

2 files changed

+125
-4
lines changed

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import java.util.TreeSet;
8888
import java.util.concurrent.Executor;
8989
import java.util.concurrent.ThreadPoolExecutor;
90+
import java.util.concurrent.atomic.AtomicInteger;
9091
import java.util.function.LongSupplier;
9192
import java.util.function.ToLongFunction;
9293

@@ -202,7 +203,7 @@ final class DefaultSearchContext extends SearchContext {
202203
engineSearcher.getQueryCache(),
203204
engineSearcher.getQueryCachingPolicy(),
204205
lowLevelCancellation,
205-
executor,
206+
wrapExecutor(executor),
206207
maximumNumberOfSlices,
207208
minimumDocsPerSlice
208209
);
@@ -229,6 +230,30 @@ final class DefaultSearchContext extends SearchContext {
229230
}
230231
}
231232

233+
private static Executor wrapExecutor(Executor executor) {
234+
if (executor instanceof ThreadPoolExecutor tpe) {
235+
// let this searcher fork to a limited maximum number of tasks, to protect against situations where Lucene may
236+
// submit too many segment level tasks. With enough parallel search requests and segments per shards, they may all see
237+
// an empty queue and start parallelizing, filling up the queue very quickly and causing rejections, due to
238+
// many small tasks in the queue that become no-op because the active caller thread will execute them instead.
239+
// Note that despite all tasks are completed, TaskExecutor#invokeAll leaves the leftover no-op tasks in queue hence
240+
// they contribute to the queue size until they are removed from it.
241+
AtomicInteger segmentLevelTasks = new AtomicInteger(0);
242+
return command -> {
243+
if (segmentLevelTasks.get() > tpe.getMaximumPoolSize()) {
244+
command.run();
245+
return;
246+
}
247+
if (segmentLevelTasks.incrementAndGet() > tpe.getMaximumPoolSize()) {
248+
command.run();
249+
} else {
250+
executor.execute(command);
251+
}
252+
};
253+
}
254+
return executor;
255+
}
256+
232257
static long getFieldCardinality(String field, IndexService indexService, DirectoryReader directoryReader) {
233258
MappedFieldType mappedFieldType = indexService.mapperService().fieldType(field);
234259
if (mappedFieldType == null) {

server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,32 @@
7878
import org.elasticsearch.xcontent.XContentBuilder;
7979

8080
import java.io.IOException;
81+
import java.io.UncheckedIOException;
82+
import java.util.ArrayList;
83+
import java.util.Collection;
84+
import java.util.List;
8185
import java.util.UUID;
86+
import java.util.concurrent.Callable;
87+
import java.util.concurrent.ExecutionException;
88+
import java.util.concurrent.Executor;
8289
import java.util.concurrent.ExecutorService;
8390
import java.util.concurrent.Executors;
91+
import java.util.concurrent.Future;
92+
import java.util.concurrent.FutureTask;
93+
import java.util.concurrent.LinkedBlockingQueue;
94+
import java.util.concurrent.RunnableFuture;
8495
import java.util.concurrent.ThreadPoolExecutor;
96+
import java.util.concurrent.TimeUnit;
97+
import java.util.concurrent.atomic.AtomicInteger;
8598
import java.util.function.Function;
8699
import java.util.function.Supplier;
87100
import java.util.function.ToLongFunction;
88101

89102
import static org.hamcrest.Matchers.equalTo;
103+
import static org.hamcrest.Matchers.greaterThan;
90104
import static org.hamcrest.Matchers.instanceOf;
91105
import static org.hamcrest.Matchers.is;
106+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
92107
import static org.mockito.ArgumentMatchers.any;
93108
import static org.mockito.ArgumentMatchers.anyString;
94109
import static org.mockito.ArgumentMatchers.eq;
@@ -959,11 +974,92 @@ public void testGetFieldCardinalityRuntimeField() {
959974
assertEquals(-1, DefaultSearchContext.getFieldCardinality("field", indexService, null));
960975
}
961976

977+
public void testDoNotCreateMoreTasksThanAvailableThreads() throws IOException, ExecutionException, InterruptedException {
978+
int executorPoolSize = randomIntBetween(2, 5);
979+
int numIters = randomIntBetween(10, 50);
980+
int numSegmentTasks = randomIntBetween(50, 100);
981+
AtomicInteger completedTasks = new AtomicInteger(0);
982+
ThreadPoolExecutor executor = new ThreadPoolExecutor(
983+
executorPoolSize,
984+
executorPoolSize,
985+
0L,
986+
TimeUnit.MILLISECONDS,
987+
new LinkedBlockingQueue<>()
988+
);
989+
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
990+
DefaultSearchContext[] contexts = new DefaultSearchContext[numIters];
991+
List<Future<?>> futures = new ArrayList<>(numIters);
992+
for (int i = 0; i < numIters; i++) {
993+
contexts[i] = createDefaultSearchContext(executor, randomFrom(SearchService.ResultsType.DFS, SearchService.ResultsType.QUERY));
994+
}
995+
try {
996+
for (int i = 0; i < numIters; i++) {
997+
// simulate multiple concurrent search operations that parallelize each their execution across many segment level tasks
998+
// via Lucene's TaskExecutor. Segment level tasks are never rejected (they execute on the caller upon rejection), but
999+
// the top-level execute call is subject to rejection once the queue is filled with segment level tasks. That is why
1000+
// we want to limit the number of tasks that each search can parallelize to
1001+
// NOTE: DefaultSearchContext does not provide the executor to the searcher once it sees maxPoolSize items in the queue.
1002+
DefaultSearchContext searchContext = contexts[i];
1003+
AtomicInteger segmentTasksCompleted = new AtomicInteger(0);
1004+
RunnableFuture<Void> task = new FutureTask<>(() -> {
1005+
Collection<Callable<Void>> tasks = new ArrayList<>();
1006+
for (int j = 0; j < numSegmentTasks; j++) {
1007+
tasks.add(() -> {
1008+
segmentTasksCompleted.incrementAndGet();
1009+
completedTasks.incrementAndGet();
1010+
return null;
1011+
});
1012+
}
1013+
try {
1014+
searchContext.searcher().getTaskExecutor().invokeAll(tasks);
1015+
// invokeAll is blocking, hence at this point we are done executing all the sub-tasks, but the queue may
1016+
// still be filled up with no-op leftover tasks
1017+
assertEquals(numSegmentTasks, segmentTasksCompleted.get());
1018+
} catch (IOException e) {
1019+
throw new UncheckedIOException(e);
1020+
} finally {
1021+
completedTasks.incrementAndGet();
1022+
}
1023+
return null;
1024+
});
1025+
futures.add(task);
1026+
executor.execute(task);
1027+
}
1028+
for (Future<?> future : futures) {
1029+
future.get();
1030+
}
1031+
assertEquals((long) numIters * numSegmentTasks + numIters, completedTasks.get());
1032+
} finally {
1033+
terminate(executor);
1034+
for (DefaultSearchContext searchContext : contexts) {
1035+
searchContext.indexShard().getThreadPool().shutdown();
1036+
searchContext.close();
1037+
}
1038+
}
1039+
// make sure that we do parallelize execution across segments
1040+
assertThat(executor.getCompletedTaskCount(), greaterThan((long) numIters));
1041+
// while not creating too many segment level tasks
1042+
assertThat(executor.getCompletedTaskCount(), lessThanOrEqualTo((long) numIters * executorPoolSize + numIters));
1043+
}
1044+
1045+
private DefaultSearchContext createDefaultSearchContext(Executor executor, SearchService.ResultsType resultsType) throws IOException {
1046+
return createDefaultSearchContext(Settings.EMPTY, null, executor, resultsType);
1047+
}
1048+
9621049
private DefaultSearchContext createDefaultSearchContext(Settings providedIndexSettings) throws IOException {
9631050
return createDefaultSearchContext(providedIndexSettings, null);
9641051
}
9651052

9661053
private DefaultSearchContext createDefaultSearchContext(Settings providedIndexSettings, XContentBuilder mappings) throws IOException {
1054+
return createDefaultSearchContext(providedIndexSettings, mappings, null, randomFrom(SearchService.ResultsType.values()));
1055+
}
1056+
1057+
private DefaultSearchContext createDefaultSearchContext(
1058+
Settings providedIndexSettings,
1059+
XContentBuilder mappings,
1060+
Executor executor,
1061+
SearchService.ResultsType resultsType
1062+
) throws IOException {
9671063
TimeValue timeout = new TimeValue(randomIntBetween(1, 100));
9681064
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
9691065
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
@@ -1047,9 +1143,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
10471143
timeout,
10481144
null,
10491145
false,
1050-
null,
1051-
randomFrom(SearchService.ResultsType.values()),
1052-
randomBoolean(),
1146+
executor,
1147+
resultsType,
1148+
executor != null || randomBoolean(),
10531149
randomInt()
10541150
);
10551151
}

0 commit comments

Comments
 (0)