Skip to content

Commit 8aee9ef

Browse files
committed
Merge remote-tracking branch 'upstream/main' into es819-merge-optimization
2 parents 76f78d2 + 7e62862 commit 8aee9ef

File tree

3 files changed

+17
-30
lines changed

3 files changed

+17
-30
lines changed

docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,7 @@ The following are the types of thread pools and their respective parameters:
9494
9595
### `fixed` [fixed-thread-pool]
9696

97-
The `fixed` thread pool holds a fixed size of threads to handle the requests with a queue (optionally bounded) for pending requests that have no threads to service them.
98-
99-
The `size` parameter controls the number of threads.
100-
101-
The `queue_size` allows to control the size of the queue of pending requests that have no threads to execute them. By default, it is set to `-1` which means its unbounded. When a request comes in and the queue is full, it will abort the request.
97+
A `fixed` thread pool holds a fixed number of threads as determined by the `size` parameter. If a task is submitted to a `fixed` thread pool and there are fewer than `size` busy threads in the pool then the task will execute immediately. If all the threads are busy when a task is submitted then it will be held in a queue for later execution. The `queue_size` parameter controls the maximum size of this queue. A `queue_size` of `-1` means that the queue is unbounded, but most `fixed` thread pools specify a bound on their queue size by default. If a bounded queue is full then it will reject further work, which typically causes the corresponding requests to fail.
10298

10399
```yaml
104100
thread_pool:
@@ -114,6 +110,8 @@ The `scaling` thread pool holds a dynamic number of threads. This number is prop
114110

115111
The `keep_alive` parameter determines how long a thread should be kept around in the thread pool without it doing any work.
116112

113+
If a task is submitted to a `scaling` thread pool when its maximum number of threads are already busy with other tasks, the new task will be held in a queue for later execution. The queue in a `scaling` thread pool is always unbounded.
114+
117115
```yaml
118116
thread_pool:
119117
warmer:

muted-tests.yml

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -408,30 +408,12 @@ tests:
408408
- class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests
409409
method: testStdinWithMultipleValues
410410
issue: https://github.com/elastic/elasticsearch/issues/126882
411-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
412-
method: testDeleteCancelRunningTask
413-
issue: https://github.com/elastic/elasticsearch/issues/126994
414-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
415-
method: testMaxResponseSize
416-
issue: https://github.com/elastic/elasticsearch/issues/126995
417-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
418-
method: testRemoveAsyncIndex
419-
issue: https://github.com/elastic/elasticsearch/issues/126975
420-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
421-
method: testCleanupOnFailure
422-
issue: https://github.com/elastic/elasticsearch/issues/126999
423-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
424-
method: testUpdateStoreKeepAlive
425-
issue: https://github.com/elastic/elasticsearch/issues/127001
426-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
427-
method: testRestartAfterCompletion
428-
issue: https://github.com/elastic/elasticsearch/issues/126974
429-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
430-
method: testDeleteCleanupIndex
431-
issue: https://github.com/elastic/elasticsearch/issues/127008
432411
- class: org.elasticsearch.packaging.test.DockerTests
433412
method: test024InstallPluginFromArchiveUsingConfigFile
434413
issue: https://github.com/elastic/elasticsearch/issues/126936
414+
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.RepositoryAnalysisFailureIT
415+
method: testFailsOnReadError
416+
issue: https://github.com/elastic/elasticsearch/issues/127029
435417

436418
# Examples:
437419
#

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.Releasable;
29+
import org.elasticsearch.core.Releasables;
2930
import org.elasticsearch.core.Tuple;
3031
import org.elasticsearch.search.SearchPhaseResult;
3132
import org.elasticsearch.search.SearchService;
@@ -162,7 +163,7 @@ public void consumeResult(SearchPhaseResult result, Runnable next) {
162163
consume(querySearchResult, next);
163164
}
164165

165-
private final List<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayList<>();
166+
private final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayDeque<>();
166167

167168
/**
168169
* Unlinks partial merge results from this instance and returns them as a partial merge result to be sent to the coordinating node.
@@ -214,7 +215,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
214215
buffer.sort(RESULT_COMPARATOR);
215216
final TopDocsStats topDocsStats = this.topDocsStats;
216217
var mergeResult = this.mergeResult;
217-
final List<Tuple<TopDocsStats, MergeResult>> batchedResults;
218+
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults;
218219
synchronized (this.batchedResults) {
219220
batchedResults = this.batchedResults;
220221
}
@@ -226,8 +227,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
226227
if (mergeResult != null) {
227228
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
228229
}
229-
for (int i = 0; i < batchedResults.size(); i++) {
230-
Tuple<TopDocsStats, MergeResult> batchedResult = batchedResults.set(i, null);
230+
Tuple<TopDocsStats, MergeResult> batchedResult;
231+
while ((batchedResult = batchedResults.poll()) != null) {
231232
topDocsStats.add(batchedResult.v1());
232233
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
233234
}
@@ -528,6 +529,12 @@ private void releaseBuffer() {
528529
querySearchResult.releaseAggs();
529530
}
530531
}
532+
synchronized (this.batchedResults) {
533+
Tuple<TopDocsStats, MergeResult> batchedResult;
534+
while ((batchedResult = batchedResults.poll()) != null) {
535+
Releasables.close(batchedResult.v2().reducedAggs());
536+
}
537+
}
531538
}
532539

533540
private synchronized void onMergeFailure(Exception exc) {

0 commit comments

Comments
 (0)