
Commit 4be9122

Enhance empty queue conditional in slicing logic (#114911) (#114940)
With recent changes in Lucene 9.12 around not forking execution when it is not necessary (see apache/lucene#13472), we removed the search worker thread pool in #111099. The worker thread pool had an unlimited queue, and we feared that we could see much more queueing on the search thread pool if segment-level searches were executed on the same thread pool as shard-level searches, because every shard search can take up to one thread per slice when executing the query phase. We therefore introduced an additional conditional to stop parallelizing when there is a queue. That is perhaps a bit extreme: the decision is made when the searcher is created, while the queue may no longer be there by the time the search executes. This has caused some benchmark regressions, given that a queue may be a transient scenario, especially with short-lived segment searches being queued up. We may end up disabling inter-segment concurrency more aggressively than we would want, penalizing requests that do benefit from concurrency. At the same time, we do want some protection against rejections of shard searches caused by excessive slicing. When the queue grows above a certain size, we can turn off slicing and effectively disable inter-segment concurrency. With this commit we set that threshold to the number of threads in the search pool.
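To make the new heuristic concrete, here is a minimal sketch of the decision in plain Java. It is illustrative only: the method name and parameters below are made up for this summary, and the real logic lives in DefaultSearchContext.determineMaximumNumberOfSlices, shown in the diff below.

    // Illustrative sketch only (hypothetical names, not the actual Elasticsearch API).
    // Before this commit the check was effectively queuedTasks == 0; now slicing stays
    // enabled as long as the queue is no larger than the search pool itself.
    static int maxSlices(int queuedTasks, int searchPoolSize, boolean parallelCollectionSupported) {
        return parallelCollectionSupported && queuedTasks <= searchPoolSize
            ? searchPoolSize // one slice per search thread
            : 1;             // single slice: inter-segment concurrency disabled
    }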
1 parent e4f7eea commit 4be9122

File tree

2 files changed: +148 -63 lines changed


server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 1 addition & 1 deletion
@@ -291,7 +291,7 @@ static int determineMaximumNumberOfSlices(
         ToLongFunction<String> fieldCardinality
     ) {
         return executor instanceof ThreadPoolExecutor tpe
-            && tpe.getQueue().isEmpty()
+            && tpe.getQueue().size() <= tpe.getMaximumPoolSize()
             && isParallelCollectionSupportedForResults(resultsType, request.source(), fieldCardinality, enableQueryPhaseParallelCollection)
                 ? tpe.getMaximumPoolSize()
                 : 1;

server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

Lines changed: 147 additions & 62 deletions
@@ -81,6 +81,7 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
@@ -507,10 +508,10 @@ public void testNewIdLoaderWithTsdbAndRoutingPathMatch() throws Exception {
         }
     }
 
-    public void testDetermineMaximumNumberOfSlices() {
+    private static ShardSearchRequest createParallelRequest() {
         IndexShard indexShard = mock(IndexShard.class);
         when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0));
-        ShardSearchRequest parallelReq = new ShardSearchRequest(
+        return new ShardSearchRequest(
             OriginalIndices.NONE,
             new SearchRequest().allowPartialSearchResults(randomBoolean()),
             indexShard.shardId(),
@@ -521,69 +522,74 @@ public void testDetermineMaximumNumberOfSlices() {
             System.currentTimeMillis(),
             null
         );
-        ShardSearchRequest singleSliceReq = new ShardSearchRequest(
-            OriginalIndices.NONE,
-            new SearchRequest().allowPartialSearchResults(randomBoolean())
-                .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))),
-            indexShard.shardId(),
-            0,
-            1,
-            AliasFilter.EMPTY,
-            1f,
-            System.currentTimeMillis(),
-            null
-        );
-        int executorPoolSize = randomIntBetween(1, 100);
-        ExecutorService threadPoolExecutor = EsExecutors.newFixed(
-            "test",
-            executorPoolSize,
-            0,
-            Thread::new,
-            new ThreadContext(Settings.EMPTY),
-            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
-        );
-        ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool();
-        ToLongFunction<String> fieldCardinality = name -> -1;
 
+    }
+
+    public void testDetermineMaximumNumberOfSlicesNoExecutor() {
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                parallelReq,
+                null,
+                createParallelRequest(),
                 SearchService.ResultsType.DFS,
-                true,
+                randomBoolean(),
                 fieldCardinality
             )
         );
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                singleSliceReq,
-                SearchService.ResultsType.DFS,
-                true,
+                null,
+                createParallelRequest(),
+                SearchService.ResultsType.QUERY,
+                randomBoolean(),
                 fieldCardinality
             )
         );
+    }
+
+    public void testDetermineMaximumNumberOfSlicesNotThreadPoolExecutor() {
+        ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool();
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
         assertEquals(
             1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, true, fieldCardinality)
+            DefaultSearchContext.determineMaximumNumberOfSlices(
+                notThreadPoolExecutor,
+                createParallelRequest(),
+                SearchService.ResultsType.DFS,
+                randomBoolean(),
+                fieldCardinality
+            )
         );
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                parallelReq,
+                notThreadPoolExecutor,
+                createParallelRequest(),
                 SearchService.ResultsType.QUERY,
-                true,
+                randomBoolean(),
                 fieldCardinality
             )
         );
+    }
+
+    public void testDetermineMaximumNumberOfSlicesEnableQueryPhaseParallelCollection() {
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            0,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        ToLongFunction<String> fieldCardinality = name -> -1;
         assertEquals(
-            1,
+            executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                singleSliceReq,
+                createParallelRequest(),
                 SearchService.ResultsType.QUERY,
                 true,
                 fieldCardinality
@@ -592,54 +598,133 @@ public void testDetermineMaximumNumberOfSlices() {
         assertEquals(
             1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                notThreadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.DFS,
-                true,
+                threadPoolExecutor,
+                createParallelRequest(),
+                SearchService.ResultsType.QUERY,
+                false,
                 fieldCardinality
             )
         );
-
         assertEquals(
             executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                parallelReq,
+                createParallelRequest(),
                 SearchService.ResultsType.DFS,
-                false,
+                randomBoolean(),
                 fieldCardinality
             )
         );
-        assertEquals(
+    }
+
+    public void testDetermineMaximumNumberOfSlicesSingleSortByField() {
+        IndexShard indexShard = mock(IndexShard.class);
+        when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0));
+        ShardSearchRequest singleSliceReq = new ShardSearchRequest(
+            OriginalIndices.NONE,
+            new SearchRequest().allowPartialSearchResults(randomBoolean())
+                .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))),
+            indexShard.shardId(),
+            0,
             1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, false, fieldCardinality)
+            AliasFilter.EMPTY,
+            1f,
+            System.currentTimeMillis(),
+            null
         );
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            0,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        // DFS concurrency does not rely on slices, hence it kicks in regardless of the request (supportsParallelCollection is not called)
         assertEquals(
-            1,
+            executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.QUERY,
-                false,
+                singleSliceReq,
+                SearchService.ResultsType.DFS,
+                true,
                 fieldCardinality
             )
         );
-        assertEquals(
-            1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.QUERY, false, fieldCardinality)
-        );
         assertEquals(
             1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                notThreadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.DFS,
-                false,
+                threadPoolExecutor,
+                singleSliceReq,
+                SearchService.ResultsType.QUERY,
+                true,
                 fieldCardinality
             )
         );
     }
 
+    public void testDetermineMaximumNumberOfSlicesWithQueue() {
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            1000,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
+
+        for (int i = 0; i < executorPoolSize; i++) {
+            assertTrue(threadPoolExecutor.getQueue().offer(() -> {}));
+            assertEquals(
+                executorPoolSize,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.DFS,
+                    true,
+                    fieldCardinality
+                )
+            );
+            assertEquals(
+                executorPoolSize,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.QUERY,
+                    true,
+                    fieldCardinality
+                )
+            );
+        }
+        for (int i = 0; i < 100; i++) {
+            assertTrue(threadPoolExecutor.getQueue().offer(() -> {}));
+            assertEquals(
+                1,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.DFS,
+                    true,
+                    fieldCardinality
+                )
+            );
+            assertEquals(
+                1,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.QUERY,
+                    true,
+                    fieldCardinality
+                )
+            );
+        }
+    }
+
     public void testIsParallelCollectionSupportedForResults() {
         SearchSourceBuilder searchSourceBuilderOrNull = randomBoolean() ? null : new SearchSourceBuilder();
         ToLongFunction<String> fieldCardinality = name -> -1;
