Skip to content

Commit 710063c

Browse files
committed
iter
1 parent c20a03a commit 710063c

File tree

2 files changed: +97 additions, −16 deletions

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -240,14 +240,16 @@ private static Executor wrapExecutor(Executor executor) {
240240
// they contribute to the queue size until they are removed from it.
241241
AtomicInteger segmentLevelTasks = new AtomicInteger(0);
242242
return command -> {
243-
if (segmentLevelTasks.get() > tpe.getMaximumPoolSize()) {
244-
command.run();
245-
return;
246-
}
247243
if (segmentLevelTasks.incrementAndGet() > tpe.getMaximumPoolSize()) {
248244
command.run();
249245
} else {
250-
executor.execute(command);
246+
executor.execute(() -> {
247+
try {
248+
command.run();
249+
} finally {
250+
segmentLevelTasks.decrementAndGet();
251+
}
252+
});
251253
}
252254
};
253255
}
@@ -315,6 +317,8 @@ static int determineMaximumNumberOfSlices(
315317
boolean enableQueryPhaseParallelCollection,
316318
ToLongFunction<String> fieldCardinality
317319
) {
320+
// Note: although this method refers to parallel collection, it affects any kind of parallelism, including query rewrite,
321+
// given that if 1 is the returned value, no executor is provided to the searcher.
318322
return executor instanceof ThreadPoolExecutor tpe
319323
&& tpe.getQueue().size() <= tpe.getMaximumPoolSize()
320324
&& isParallelCollectionSupportedForResults(resultsType, request.source(), fieldCardinality, enableQueryPhaseParallelCollection)

server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

Lines changed: 88 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -94,16 +94,17 @@
9494
import java.util.concurrent.RunnableFuture;
9595
import java.util.concurrent.ThreadPoolExecutor;
9696
import java.util.concurrent.TimeUnit;
97+
import java.util.concurrent.atomic.AtomicBoolean;
9798
import java.util.concurrent.atomic.AtomicInteger;
9899
import java.util.function.Function;
99100
import java.util.function.Supplier;
100101
import java.util.function.ToLongFunction;
101102

102103
import static org.hamcrest.Matchers.equalTo;
103-
import static org.hamcrest.Matchers.greaterThan;
104+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
104105
import static org.hamcrest.Matchers.instanceOf;
105106
import static org.hamcrest.Matchers.is;
106-
import static org.hamcrest.Matchers.lessThanOrEqualTo;
107+
import static org.hamcrest.Matchers.lessThan;
107108
import static org.mockito.ArgumentMatchers.any;
108109
import static org.mockito.ArgumentMatchers.anyString;
109110
import static org.mockito.ArgumentMatchers.eq;
@@ -974,8 +975,9 @@ public void testGetFieldCardinalityRuntimeField() {
974975
assertEquals(-1, DefaultSearchContext.getFieldCardinality("field", indexService, null));
975976
}
976977

977-
public void testDoNotCreateMoreTasksThanAvailableThreads() throws IOException, ExecutionException, InterruptedException {
978-
int executorPoolSize = randomIntBetween(2, 5);
978+
public void testSingleThreadNoSearchConcurrency() throws IOException, ExecutionException, InterruptedException {
979+
// with a single thread in the pool the max number of slices will always be 1, hence we won't provide the executor to the searcher
980+
int executorPoolSize = 1;
979981
int numIters = randomIntBetween(10, 50);
980982
int numSegmentTasks = randomIntBetween(50, 100);
981983
AtomicInteger completedTasks = new AtomicInteger(0);
@@ -987,11 +989,90 @@ public void testDoNotCreateMoreTasksThanAvailableThreads() throws IOException, E
987989
new LinkedBlockingQueue<>()
988990
);
989991
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
992+
try {
993+
doTestSearchConcurrency(executor, numIters, numSegmentTasks, completedTasks);
994+
} finally {
995+
terminate(executor);
996+
}
997+
// Tasks are still created, but the internal executor is a direct one hence there is no parallelism in practice
998+
assertEquals((long) numIters * numSegmentTasks + numIters, completedTasks.get());
999+
assertEquals(numIters, executor.getCompletedTaskCount());
1000+
}
1001+
1002+
public void testNoSearchConcurrencyWhenQueueing() throws IOException, ExecutionException, InterruptedException {
1003+
// with multiple threads, but constant queueing, the max number of slices will always be 1, hence we won't provide the
1004+
// executor to the searcher
1005+
int executorPoolSize = randomIntBetween(2, 5);
1006+
int numIters = randomIntBetween(10, 50);
1007+
int numSegmentTasks = randomIntBetween(50, 100);
1008+
AtomicInteger completedTasks = new AtomicInteger(0);
1009+
final AtomicBoolean terminating = new AtomicBoolean(false);
1010+
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>() {
1011+
@Override
1012+
public int size() {
1013+
// for the purpose of this test we pretend that we always have more items in the queue than threads, but we need to revert
1014+
// to normal behaviour to ensure graceful shutdown
1015+
if (terminating.get()) {
1016+
return super.size();
1017+
}
1018+
return randomIntBetween(executorPoolSize + 1, Integer.MAX_VALUE);
1019+
}
1020+
};
1021+
ThreadPoolExecutor executor = new ThreadPoolExecutor(executorPoolSize, executorPoolSize, 0L, TimeUnit.MILLISECONDS, queue);
1022+
try {
1023+
doTestSearchConcurrency(executor, numIters, numSegmentTasks, completedTasks);
1024+
terminating.set(true);
1025+
} finally {
1026+
terminate(executor);
1027+
}
1028+
// Tasks are still created, but the internal executor is a direct one hence there is no parallelism in practice
1029+
assertEquals((long) numIters * numSegmentTasks + numIters, completedTasks.get());
1030+
assertEquals(numIters, executor.getCompletedTaskCount());
1031+
}
1032+
1033+
public void testSearchConcurrencyDoesNotCreateMoreTasksThanThreads() throws Exception {
1034+
// with multiple threads, but not enough queueing to disable parallelism, we will provide the executor to the searcher
1035+
int executorPoolSize = randomIntBetween(2, 5);
1036+
int numIters = randomIntBetween(10, 50);
1037+
int numSegmentTasks = randomIntBetween(50, 100);
1038+
AtomicInteger completedTasks = new AtomicInteger(0);
1039+
final AtomicBoolean terminating = new AtomicBoolean(false);
1040+
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>() {
1041+
@Override
1042+
public int size() {
1043+
int size = super.size();
1044+
// for the purpose of this test we pretend that we only ever have as many items in the queue as number of threads, but we
1045+
// need to revert to normal behaviour to ensure graceful shutdown
1046+
if (size <= executorPoolSize || terminating.get()) {
1047+
return size;
1048+
}
1049+
return randomIntBetween(0, executorPoolSize);
1050+
}
1051+
};
1052+
ThreadPoolExecutor executor = new ThreadPoolExecutor(executorPoolSize, executorPoolSize, 0L, TimeUnit.MILLISECONDS, queue);
1053+
1054+
try {
1055+
doTestSearchConcurrency(executor, numIters, numSegmentTasks, completedTasks);
1056+
terminating.set(true);
1057+
} finally {
1058+
terminate(executor);
1059+
}
1060+
1061+
// make sure that we do parallelize execution: each operation will use at minimum as many tasks as threads available
1062+
assertThat(executor.getCompletedTaskCount(), greaterThanOrEqualTo((long) numIters * executorPoolSize));
1063+
// while we parallelize we also limit the number of tasks that each searcher submits
1064+
assertThat(executor.getCompletedTaskCount(), lessThan((long) numIters * numSegmentTasks));
1065+
// *2 is just a wild guess to account for tasks that get executed while we are still submitting
1066+
assertThat(executor.getCompletedTaskCount(), lessThan((long) numIters * executorPoolSize * 2));
1067+
}
1068+
1069+
private void doTestSearchConcurrency(ThreadPoolExecutor executor, int numIters, int numSegmentTasks, AtomicInteger completedTasks)
1070+
throws IOException, ExecutionException, InterruptedException {
9901071
DefaultSearchContext[] contexts = new DefaultSearchContext[numIters];
991-
List<Future<?>> futures = new ArrayList<>(numIters);
9921072
for (int i = 0; i < numIters; i++) {
9931073
contexts[i] = createDefaultSearchContext(executor, randomFrom(SearchService.ResultsType.DFS, SearchService.ResultsType.QUERY));
9941074
}
1075+
List<Future<?>> futures = new ArrayList<>(numIters);
9951076
try {
9961077
for (int i = 0; i < numIters; i++) {
9971078
// simulate multiple concurrent search operations that parallelize each their execution across many segment level tasks
@@ -1012,6 +1093,8 @@ public void testDoNotCreateMoreTasksThanAvailableThreads() throws IOException, E
10121093
}
10131094
try {
10141095
searchContext.searcher().getTaskExecutor().invokeAll(tasks);
1096+
// TODO additional calls to invokeAll
1097+
10151098
// invokeAll is blocking, hence at this point we are done executing all the sub-tasks, but the queue may
10161099
// still be filled up with no-op leftover tasks
10171100
assertEquals(numSegmentTasks, segmentTasksCompleted.get());
@@ -1028,18 +1111,12 @@ public void testDoNotCreateMoreTasksThanAvailableThreads() throws IOException, E
10281111
for (Future<?> future : futures) {
10291112
future.get();
10301113
}
1031-
assertEquals((long) numIters * numSegmentTasks + numIters, completedTasks.get());
10321114
} finally {
1033-
terminate(executor);
10341115
for (DefaultSearchContext searchContext : contexts) {
10351116
searchContext.indexShard().getThreadPool().shutdown();
10361117
searchContext.close();
10371118
}
10381119
}
1039-
// make sure that we do parallelize execution across segments
1040-
assertThat(executor.getCompletedTaskCount(), greaterThan((long) numIters));
1041-
// while not creating too many segment level tasks
1042-
assertThat(executor.getCompletedTaskCount(), lessThanOrEqualTo((long) numIters * executorPoolSize + numIters));
10431120
}
10441121

10451122
private DefaultSearchContext createDefaultSearchContext(Executor executor, SearchService.ResultsType resultsType) throws IOException {

0 commit comments

Comments (0)