Skip to content

Commit 8bc5a86

Browse files
committed
Reduce ES|QL driver parallelism
1 parent c0c0827 commit 8bc5a86

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.compute.operator.Driver;
1919
import org.elasticsearch.compute.operator.DriverStatus;
2020
import org.elasticsearch.core.TimeValue;
21-
import org.elasticsearch.threadpool.ThreadPool;
2221
import org.elasticsearch.xpack.esql.core.expression.Expression;
2322

2423
import java.io.IOException;
@@ -34,9 +33,19 @@ public final class QueryPragmas implements Writeable {
3433

3534
private static final Setting<Integer> TASK_CONCURRENCY = Setting.intSetting(
3635
"task_concurrency",
37-
ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY))
36+
allocatedProcessorsMinusOneMaxTen(EsExecutors.allocatedProcessors(Settings.EMPTY))
3837
);
3938

39+
/**
40+
* Returns the number of processors to be used in driver parallelism, starting with Lucene operators (and reading values).
41+
* One processor is reserved for node-level reduction.
42+
* The concurrency is capped at 10 as drivers can contain both I/O-bound and CPU-bound tasks. This avoids blocking the thread pool
43+
* due to waiting for I/O and reduces the resource usage by many pipelines, minimizing the cost for reduction.
44+
*/
45+
static int allocatedProcessorsMinusOneMaxTen(final int allocatedProcessors) {
46+
return Math.min(Math.max(allocatedProcessors - 1, 2), 10);
47+
}
48+
4049
public static final Setting<DataPartitioning> DATA_PARTITIONING = Setting.enumSetting(
4150
DataPartitioning.class,
4251
"data_partitioning",
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.elasticsearch.test.ESTestCase;
11+
12+
import static org.hamcrest.Matchers.equalTo;
13+
14+
public class QueryPragmasTests extends ESTestCase {
15+
16+
public void testTaskConcurrency() {
17+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(1), equalTo(2));
18+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(2), equalTo(2));
19+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(3), equalTo(2));
20+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(4), equalTo(3));
21+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(5), equalTo(4));
22+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(6), equalTo(5));
23+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(7), equalTo(6));
24+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(8), equalTo(7));
25+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(9), equalTo(8));
26+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(10), equalTo(9));
27+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(11), equalTo(10));
28+
for (int i = 12; i < 64; i++) {
29+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(i), equalTo(10));
30+
}
31+
assertThat(QueryPragmas.allocatedProcessorsMinusOneMaxTen(between(12, 4096)), equalTo(10));
32+
}
33+
}

0 commit comments

Comments
 (0)