
Commit c20d2fc

javanna and jpountz authored
[8.x] Limit the number of tasks that a single search can submit (#115932)
Since we removed the search workers thread pool in #111099, we execute many more tasks in the search thread pool, given that each shard search request parallelizes across slices or even segments (knn query rewrite). There are also rare situations where segment-level tasks may parallelize further (e.g. createWeight), which causes the creation of very many tasks for a single top-level request. These are rather small tasks that previously queued up in the unbounded search workers queue. With recent improvements in Lucene, these tasks queue up in the search queue, yet they get executed by the caller thread while they are still in the queue, and they remain in the queue as no-ops until they are pulled out of it.

We have protection against rejections based on turning off search concurrency when there are more than maxPoolSize items in the queue, yet that is not enough if enough parallel requests see an empty queue and manage to submit enough tasks to fill the queue at once. That causes rejections for top-level searches that should not be rejected.

This commit introduces a wrapper for the executor that limits the number of tasks a single search instance can submit, to prevent the situation where a single search submits far more tasks than there are available threads.

Co-authored-by: Adrien Grand <[email protected]>
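To make the failure mode concrete, here is a minimal standalone sketch (plain java.util.concurrent, not Elasticsearch code; the pool, queue, and task counts are arbitrary assumptions): a bounded queue plus one "search" that forks many small tasks is enough to trigger rejections for anything submitted afterwards.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueFloodSketch {
    public static void main(String[] args) {
        // a small pool with a bounded queue, standing in for the search thread pool
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));
        try {
            // one "search" forking many small segment-level tasks at once;
            // the sleep keeps workers busy so the queue actually fills up
            for (int i = 0; i < 1_000; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(1_000);
                    } catch (InterruptedException ignored) {
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            // with no cap on forking, later submissions are rejected, including
            // unrelated top-level searches that should never have been rejected
            System.out.println("rejected once the queue filled up");
        } finally {
            pool.shutdownNow();
        }
    }
}
```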
1 parent 37b286d commit c20d2fc

3 files changed: +209 −4 lines changed


docs/changelog/115932.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 115932
+summary: "[8.x] Limit the number of tasks that a single search can submit"
+area: Search
+type: bug
+issues: []

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 34 additions & 1 deletion

@@ -87,6 +87,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.LongSupplier;
 import java.util.function.ToLongFunction;

@@ -202,7 +203,7 @@ final class DefaultSearchContext extends SearchContext
             engineSearcher.getQueryCache(),
             engineSearcher.getQueryCachingPolicy(),
             lowLevelCancellation,
-            executor,
+            wrapExecutor(executor),
             maximumNumberOfSlices,
             minimumDocsPerSlice
         );
@@ -229,6 +230,36 @@ final class DefaultSearchContext extends SearchContext
         }
     }

+    private static Executor wrapExecutor(Executor executor) {
+        if (executor instanceof ThreadPoolExecutor tpe) {
+            // let this searcher fork to a limited maximum number of tasks, to protect against situations where Lucene may
+            // submit too many segment-level tasks. With enough parallel search requests and segments per shard, they may all
+            // see an empty queue and start parallelizing, filling up the queue very quickly and causing rejections, due to
+            // many small tasks in the queue that become no-ops because the active caller thread executes them instead.
+            // Note that although all tasks get completed, TaskExecutor#invokeAll leaves the leftover no-op tasks in the
+            // queue, hence they contribute to the queue size until they are pulled out of it.
+            AtomicInteger segmentLevelTasks = new AtomicInteger(0);
+            return command -> {
+                if (segmentLevelTasks.incrementAndGet() > tpe.getMaximumPoolSize()) {
+                    try {
+                        command.run();
+                    } finally {
+                        segmentLevelTasks.decrementAndGet();
+                    }
+                } else {
+                    executor.execute(() -> {
+                        try {
+                            command.run();
+                        } finally {
+                            segmentLevelTasks.decrementAndGet();
+                        }
+                    });
+                }
+            };
+        }
+        return executor;
+    }
+
     static long getFieldCardinality(String field, IndexService indexService, DirectoryReader directoryReader) {
         MappedFieldType mappedFieldType = indexService.mapperService().fieldType(field);
         if (mappedFieldType == null) {
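To see the wrapper's effect in isolation, here is a runnable sketch (hypothetical class and method names; the real wrapExecutor above is private to DefaultSearchContext): with a flood of 1,000 commands, at most the pool's maximum size are pending on the pool at any moment, the overflow runs inline on the submitting thread, and nothing is rejected.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LimitedForkingSketch {
    // same shape as wrapExecutor in the hunk above
    static Executor limit(ThreadPoolExecutor pool) {
        AtomicInteger inFlight = new AtomicInteger(0);
        return command -> {
            if (inFlight.incrementAndGet() > pool.getMaximumPoolSize()) {
                try {
                    command.run(); // over the cap: run inline, nothing is queued
                } finally {
                    inFlight.decrementAndGet();
                }
            } else {
                pool.execute(() -> {
                    try {
                        command.run();
                    } finally {
                        inFlight.decrementAndGet();
                    }
                });
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        Executor limited = limit(pool);
        Thread caller = Thread.currentThread();
        AtomicInteger ranInline = new AtomicInteger();
        for (int i = 0; i < 1_000; i++) {
            limited.execute(() -> {
                if (Thread.currentThread() == caller) {
                    ranInline.incrementAndGet(); // executed by the submitter, not the pool
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // the pool never had more than 4 tasks pending; the rest ran inline
        System.out.println("ran inline on caller: " + ranInline.get());
    }
}
```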
@@ -290,6 +321,8 @@ static int determineMaximumNumberOfSlices(
         boolean enableQueryPhaseParallelCollection,
         ToLongFunction<String> fieldCardinality
     ) {
+        // Note: although this method refers to parallel collection, it affects any kind of parallelism, including query
+        // rewrite, given that if 1 is the returned value, no executor is provided to the searcher.
         return executor instanceof ThreadPoolExecutor tpe
             && tpe.getQueue().size() <= tpe.getMaximumPoolSize()
             && isParallelCollectionSupportedForResults(resultsType, request.source(), fieldCardinality, enableQueryPhaseParallelCollection)
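Read in isolation, the queue-depth gate at the top of that expression can be distilled as follows (a sketch with a hypothetical name, shouldParallelize): an executor is only offered to the searcher while the pool's backlog is no deeper than its thread count.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

final class ParallelismGate {
    // Hypothetical distillation of the check in determineMaximumNumberOfSlices:
    // returning false stands in for the method returning 1 slice, i.e. no executor
    // is handed to the searcher and the whole request runs sequentially.
    static boolean shouldParallelize(Executor executor) {
        return executor instanceof ThreadPoolExecutor tpe && tpe.getQueue().size() <= tpe.getMaximumPoolSize();
    }
}
```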

server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

Lines changed: 170 additions & 3 deletions

@@ -35,6 +35,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
@@ -78,17 +79,33 @@
 import org.elasticsearch.xcontent.XContentBuilder;

 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;

 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -959,11 +976,161 @@ public void testGetFieldCardinalityRuntimeField() {
         assertEquals(-1, DefaultSearchContext.getFieldCardinality("field", indexService, null));
     }

+    public void testSingleThreadNoSearchConcurrency() throws IOException, ExecutionException, InterruptedException {
+        // with a single thread in the pool the max number of slices will always be 1, hence we won't provide the executor
+        // to the searcher
+        int executorPoolSize = 1;
+        int numIters = randomIntBetween(10, 50);
+        int numSegmentTasks = randomIntBetween(50, 100);
+        AtomicInteger completedTasks = new AtomicInteger(0);
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(executorPoolSize);
+        try {
+            doTestSearchConcurrency(executor, numIters, numSegmentTasks, completedTasks);
+        } finally {
+            terminate(executor);
+        }
+        // Tasks are still created, but the internal executor is a direct one, hence there is no parallelism in practice
+        assertEquals((long) numIters * numSegmentTasks + numIters, completedTasks.get());
+        assertEquals(numIters, executor.getCompletedTaskCount());
+    }
+
+    @SuppressForbidden(reason = "need to provide queue to ThreadPoolExecutor")
+    public void testNoSearchConcurrencyWhenQueueing() throws IOException, ExecutionException, InterruptedException {
+        // with multiple threads, but constant queueing, the max number of slices will always be 1, hence we won't provide
+        // the executor to the searcher
+        int executorPoolSize = randomIntBetween(2, 5);
+        int numIters = randomIntBetween(10, 50);
+        int numSegmentTasks = randomIntBetween(50, 100);
+        AtomicInteger completedTasks = new AtomicInteger(0);
+        final AtomicBoolean terminating = new AtomicBoolean(false);
+        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>() {
+            @Override
+            public int size() {
+                // for the purpose of this test we pretend that we always have more items in the queue than threads, but we
+                // need to revert to normal behaviour to ensure graceful shutdown
+                if (terminating.get()) {
+                    return super.size();
+                }
+                return randomIntBetween(executorPoolSize + 1, Integer.MAX_VALUE);
+            }
+        };
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(executorPoolSize, executorPoolSize, 0L, TimeUnit.MILLISECONDS, queue);
+        try {
+            doTestSearchConcurrency(executor, numIters, numSegmentTasks, completedTasks);
+            terminating.set(true);
+        } finally {
+            terminate(executor);
+        }
+        // Tasks are still created, but the internal executor is a direct one, hence there is no parallelism in practice
+        assertEquals((long) numIters * numSegmentTasks + numIters, completedTasks.get());
+        assertEquals(numIters, executor.getCompletedTaskCount());
+    }
+
+    @SuppressForbidden(reason = "need to provide queue to ThreadPoolExecutor")
+    public void testSearchConcurrencyDoesNotCreateMoreTasksThanThreads() throws Exception {
+        // with multiple threads, and not enough queueing to disable parallelism, we will provide the executor to the searcher
+        int executorPoolSize = randomIntBetween(2, 5);
+        int numIters = randomIntBetween(10, 50);
+        int numSegmentTasks = randomIntBetween(50, 100);
+        AtomicInteger completedTasks = new AtomicInteger(0);
+        final AtomicBoolean terminating = new AtomicBoolean(false);
+        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>() {
+            @Override
+            public int size() {
+                int size = super.size();
+                // for the purpose of this test we pretend that we only ever have as many items in the queue as there are
+                // threads, but we need to revert to normal behaviour to ensure graceful shutdown
+                if (size <= executorPoolSize || terminating.get()) {
+                    return size;
+                }
+                return randomIntBetween(0, executorPoolSize);
+            }
+        };
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(executorPoolSize, executorPoolSize, 0L, TimeUnit.MILLISECONDS, queue);
+        try {
+            doTestSearchConcurrency(executor, numIters, numSegmentTasks, completedTasks);
+            terminating.set(true);
+        } finally {
+            terminate(executor);
+        }
+        // make sure that we do parallelize execution: each operation will use at minimum as many tasks as threads available
+        assertThat(executor.getCompletedTaskCount(), greaterThanOrEqualTo((long) numIters * executorPoolSize));
+        // while we parallelize, we also limit the number of tasks that each searcher submits
+        assertThat(executor.getCompletedTaskCount(), lessThan((long) numIters * numSegmentTasks));
+        // *3 is just a wild guess to account for tasks that get executed while we are still submitting
+        assertThat(executor.getCompletedTaskCount(), lessThan((long) numIters * executorPoolSize * 3));
+    }
+
+    private void doTestSearchConcurrency(ThreadPoolExecutor executor, int numIters, int numSegmentTasks, AtomicInteger completedTasks)
+        throws IOException, ExecutionException, InterruptedException {
+        DefaultSearchContext[] contexts = new DefaultSearchContext[numIters];
+        for (int i = 0; i < numIters; i++) {
+            contexts[i] = createDefaultSearchContext(executor, randomFrom(SearchService.ResultsType.DFS, SearchService.ResultsType.QUERY));
+        }
+        List<Future<?>> futures = new ArrayList<>(numIters);
+        try {
+            for (int i = 0; i < numIters; i++) {
+                // simulate multiple concurrent search operations that each parallelize their execution across many segment
+                // level tasks via Lucene's TaskExecutor. Segment level tasks are never rejected (they execute on the caller
+                // upon rejection), but the top-level execute call is subject to rejection once the queue is filled with
+                // segment level tasks. That is why we limit the number of tasks that each search can submit.
+                // NOTE: DefaultSearchContext does not provide the executor to the searcher once it sees maxPoolSize items
+                // in the queue.
+                DefaultSearchContext searchContext = contexts[i];
+                AtomicInteger segmentTasksCompleted = new AtomicInteger(0);
+                RunnableFuture<Void> task = new FutureTask<>(() -> {
+                    Collection<Callable<Void>> tasks = new ArrayList<>();
+                    for (int j = 0; j < numSegmentTasks; j++) {
+                        tasks.add(() -> {
+                            segmentTasksCompleted.incrementAndGet();
+                            completedTasks.incrementAndGet();
+                            return null;
+                        });
+                    }
+                    try {
+                        searchContext.searcher().getTaskExecutor().invokeAll(tasks);
+                        // TODO additional calls to invokeAll

+                        // invokeAll is blocking, hence at this point we are done executing all the sub-tasks, but the queue
+                        // may still be filled up with no-op leftover tasks
+                        assertEquals(numSegmentTasks, segmentTasksCompleted.get());
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    } finally {
+                        completedTasks.incrementAndGet();
+                    }
+                    return null;
+                });
+                futures.add(task);
+                executor.execute(task);
+            }
+            for (Future<?> future : futures) {
+                future.get();
+            }
+        } finally {
+            for (DefaultSearchContext searchContext : contexts) {
+                searchContext.indexShard().getThreadPool().shutdown();
+                searchContext.close();
+            }
+        }
+    }
+
+    private DefaultSearchContext createDefaultSearchContext(Executor executor, SearchService.ResultsType resultsType) throws IOException {
+        return createDefaultSearchContext(Settings.EMPTY, null, executor, resultsType);
+    }
+
     private DefaultSearchContext createDefaultSearchContext(Settings providedIndexSettings) throws IOException {
         return createDefaultSearchContext(providedIndexSettings, null);
     }

     private DefaultSearchContext createDefaultSearchContext(Settings providedIndexSettings, XContentBuilder mappings) throws IOException {
+        return createDefaultSearchContext(providedIndexSettings, mappings, null, randomFrom(SearchService.ResultsType.values()));
+    }
+
+    private DefaultSearchContext createDefaultSearchContext(
+        Settings providedIndexSettings,
+        XContentBuilder mappings,
+        Executor executor,
+        SearchService.ResultsType resultsType
+    ) throws IOException {
         TimeValue timeout = new TimeValue(randomIntBetween(1, 100));
         ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
         when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
@@ -1047,9 +1214,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
             timeout,
             null,
             false,
-            null,
-            randomFrom(SearchService.ResultsType.values()),
-            randomBoolean(),
+            executor,
+            resultsType,
+            executor != null || randomBoolean(),
             randomInt()
         );
     }
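The tests above steer the queue-depth gate by lying about queue depth rather than by actually queueing work. Condensed into a standalone sketch (plain java.util.concurrent, hypothetical names): overriding LinkedBlockingQueue#size() makes the pool look permanently backed up, while reporting the real size again during shutdown so termination can proceed.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class FakeBacklogPool {
    // Build a pool whose queue claims to hold more items than there are threads,
    // so a queue-depth gate like the one above always disables parallelism.
    static ThreadPoolExecutor pretendBusyPool(int threads, AtomicBoolean terminating) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>() {
            @Override
            public int size() {
                // report the real size once shutdown starts, so the pool can drain
                return terminating.get() ? super.size() : threads + 1;
            }
        };
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue);
    }
}
```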
