Commit 5689dfa
ESQL: Heuristics to pick efficient partitioning (#125739)
Adds heuristics to pick an efficient partitioning strategy based on the index and the rewritten query. This speeds up some queries by throwing more cores at the problem:

```
FROM test | STATS SUM(b)

Before: took: 31 CPU: 222.3%
After:  took: 15 CPU: 806.9%
```

It also lowers the overhead of simpler queries by throwing fewer cores at the problem when more wouldn't really speed anything up:

```
FROM test

Before: took: 1 CPU: 48.5%
After:  took: 1 CPU: 70.4%
```

We have had a `pragma` to control our data partitioning for a long time; this change just looks at the query to pick a partitioning scheme. The partitioning options:

* `shard`: use one core per shard
* `segment`: use one core per large segment
* `doc`: break each shard into as many slices as there are cores

`doc` is the fastest, but has a lot of overhead, especially for complex Lucene queries. `segment` is fast, but doesn't make the most of the CPUs when there are few segments. `shard` has the lowest overhead.

Previously we always used `segment` partitioning because it is fast without the terrible overhead. With this change we use `doc` when the top-level query matches all documents - those have very, very low overhead even with `doc` partitioning. That's the twice-as-fast example above. This change also uses `shard` partitioning for queries that don't have much work to do, like `FROM foo` or `FROM foo | LIMIT 1` or `FROM foo | SORT a`. That's the lower-CPU example above.

This partitioning choice is made very late, on the data node. So a query like this:

```
FROM test
| WHERE @timestamp > "2025-01-01T00:00:00Z"
| STATS SUM(b)
```

can also use `doc` partitioning when all documents are after the timestamp and all documents have `b`.
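As a rough illustration of the heuristic described above - a sketch only, with made-up class and parameter names, not the actual implementation:

```java
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;

// Hypothetical sketch of the AUTO decision described in the commit message.
// The real logic runs on the data node against the rewritten per-shard query
// and considers more than is shown here.
class PartitioningHeuristic {
    enum Strategy { SHARD, SEGMENT, DOC }

    static Strategy pick(Query rewritten, boolean trivialPlan) {
        if (trivialPlan) {
            // FROM foo, FROM foo | LIMIT 1, FROM foo | SORT a: too little
            // work per document to pay for extra parallelism.
            return Strategy.SHARD;
        }
        if (rewritten instanceof MatchAllDocsQuery) {
            // A query that rewrites to match-all (e.g. a range filter that
            // covers every document) keeps DOC partitioning overhead very low.
            return Strategy.DOC;
        }
        // Otherwise keep the previous default.
        return Strategy.SEGMENT;
    }
}
```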
1 parent b96a2f6 commit 5689dfa

25 files changed: +921 −136 lines changed

docs/changelog/125739.yaml

Lines changed: 5 additions & 0 deletions

```diff
@@ -0,0 +1,5 @@
+pr: 125739
+summary: Heuristics to pick efficient partitioning
+area: ES|QL
+type: enhancement
+issues: []
```

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions

```diff
@@ -218,6 +218,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0);
     public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_00_0);
     public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_00_0);
+    public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_00_0);

     /*
      * STOP! READ THIS FIRST! No, really,
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java

Lines changed: 31 additions & 3 deletions

```diff
@@ -7,11 +7,39 @@

 package org.elasticsearch.compute.lucene;

-public enum DataPartitioning {
+import org.elasticsearch.compute.operator.Driver;

+/**
+ * How we partition the data across {@link Driver}s. Each request forks into
+ * {@code min(1.5 * cpus, partition_count)} threads on the data node. More partitions
+ * allow us to bring more threads to bear on CPU intensive data node side tasks.
+ */
+public enum DataPartitioning {
+    /**
+     * Automatically select the data partitioning based on the query and index.
+     * Usually that's {@link #SEGMENT}, but for small indices it's {@link #SHARD}.
+     * When the additional overhead from {@link #DOC} is fairly low then it'll
+     * pick {@link #DOC}.
+     */
+    AUTO,
+    /**
+     * Make one partition per shard. This is generally the slowest option, but it
+     * has the lowest CPU overhead.
+     */
     SHARD,
-
+    /**
+     * Partition on segment boundaries, this doesn't allow forking to as many CPUs
+     * as {@link #DOC} but it has much lower overhead.
+     * <p>
+     * It packs segments smaller than {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}
+     * docs together into a partition. Larger segments get their own partition.
+     * Each slice contains no more than {@link LuceneSliceQueue#MAX_SEGMENTS_PER_SLICE}.
+     */
     SEGMENT,
-
+    /**
+     * Partition each shard into {@code task_concurrency} partitions, splitting
+     * larger segments into slices. This allows bringing the most CPUs to bear on
+     * the problem but adds extra overhead, especially in query preparation.
+     */
     DOC,
 }
```
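The `SEGMENT` javadoc above describes a packing rule: small segments are grouped into a slice until a doc or segment cap is hit, while large segments stand alone. A minimal sketch of that rule, with made-up cap values standing in for `LuceneSliceQueue.MAX_DOCS_PER_SLICE` and `LuceneSliceQueue.MAX_SEGMENTS_PER_SLICE`:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the SEGMENT packing rule from the javadoc above. The cap values
// are assumptions; the real constants live in LuceneSliceQueue.
class SegmentPacking {
    static final int MAX_DOCS_PER_SLICE = 250_000; // assumed value
    static final int MAX_SEGMENTS_PER_SLICE = 5;   // assumed value

    /** Groups per-segment doc counts into slices. */
    static List<List<Integer>> pack(List<Integer> segmentDocCounts) {
        List<List<Integer>> slices = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int docsInCurrent = 0;
        for (int docs : segmentDocCounts) {
            if (docs >= MAX_DOCS_PER_SLICE) {
                slices.add(List.of(docs)); // large segments get their own slice
                continue;
            }
            if (docsInCurrent + docs > MAX_DOCS_PER_SLICE || current.size() == MAX_SEGMENTS_PER_SLICE) {
                slices.add(current); // close the packed slice and start a new one
                current = new ArrayList<>();
                docsInCurrent = 0;
            }
            current.add(docs);
            docsInCurrent += docs;
        }
        if (current.isEmpty() == false) {
            slices.add(current);
        }
        return slices;
    }
}
```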

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 10 additions & 1 deletion

```diff
@@ -49,7 +49,16 @@ public Factory(
             int taskConcurrency,
             int limit
         ) {
-            super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+            super(
+                contexts,
+                queryFunction,
+                dataPartitioning,
+                query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+                taskConcurrency,
+                limit,
+                false,
+                ScoreMode.COMPLETE_NO_SCORES
+            );
         }

         @Override
```
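Note that the count factory (like the min and max factories below) supplies a constant `query -> LuceneSliceQueue.PartitioningStrategy.SHARD` auto-strategy, presumably because these operators can often answer from per-shard index statistics, so splitting a shard further would buy little.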

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

Lines changed: 10 additions & 3 deletions

```diff
@@ -23,8 +23,6 @@
 import java.util.List;
 import java.util.function.Function;

-import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;
-
 /**
  * Factory that generates an operator that finds the max value of a field using the {@link LuceneMinMaxOperator}.
  */
@@ -123,7 +121,16 @@ public LuceneMaxFactory(
         NumberType numberType,
         int limit
     ) {
-        super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+        super(
+            contexts,
+            queryFunction,
+            dataPartitioning,
+            query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+            taskConcurrency,
+            limit,
+            false,
+            ScoreMode.COMPLETE_NO_SCORES
+        );
         this.fieldName = fieldName;
         this.numberType = numberType;
     }
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

Lines changed: 10 additions & 3 deletions

```diff
@@ -23,8 +23,6 @@
 import java.util.List;
 import java.util.function.Function;

-import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;
-
 /**
  * Factory that generates an operator that finds the min value of a field using the {@link LuceneMinMaxOperator}.
  */
@@ -123,7 +121,16 @@ public LuceneMinFactory(
         NumberType numberType,
         int limit
     ) {
-        super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+        super(
+            contexts,
+            queryFunction,
+            dataPartitioning,
+            query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+            taskConcurrency,
+            limit,
+            false,
+            ScoreMode.COMPLETE_NO_SCORES
+        );
         this.fieldName = fieldName;
         this.numberType = numberType;
     }
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 38 additions & 20 deletions

```diff
@@ -9,7 +9,6 @@

 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.BulkScorer;
-import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
@@ -37,12 +36,16 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.stream.Collectors;

+import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING;
+
 public abstract class LuceneOperator extends SourceOperator {
     private static final Logger logger = LogManager.getLogger(LuceneOperator.class);

@@ -93,15 +96,17 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFactory {
          */
         protected Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Weight> weightFunction,
+            Function<ShardContext, Query> queryFunction,
             DataPartitioning dataPartitioning,
+            Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
             int taskConcurrency,
             int limit,
-            boolean needsScore
+            boolean needsScore,
+            ScoreMode scoreMode
         ) {
             this.limit = limit;
             this.dataPartitioning = dataPartitioning;
-            this.sliceQueue = LuceneSliceQueue.create(contexts, weightFunction, dataPartitioning, taskConcurrency);
+            this.sliceQueue = LuceneSliceQueue.create(contexts, queryFunction, dataPartitioning, autoStrategy, taskConcurrency, scoreMode);
             this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency);
             this.needsScore = needsScore;
         }
@@ -269,6 +274,7 @@ public static class Status implements Operator.Status {
         private final int sliceMax;
         private final int current;
         private final long rowsEmitted;
+        private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;

         private Status(LuceneOperator operator) {
             processedSlices = operator.processedSlices;
@@ -294,6 +300,7 @@ private Status(LuceneOperator operator) {
             }
             pagesEmitted = operator.pagesEmitted;
             rowsEmitted = operator.rowsEmitted;
+            partitioningStrategies = operator.sliceQueue.partitioningStrategies();
         }

         Status(
@@ -307,7 +314,8 @@ private Status(LuceneOperator operator) {
             int sliceMin,
             int sliceMax,
             int current,
-            long rowsEmitted
+            long rowsEmitted,
+            Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies
         ) {
             this.processedSlices = processedSlices;
             this.processedQueries = processedQueries;
@@ -320,6 +328,7 @@ private Status(LuceneOperator operator) {
             this.sliceMax = sliceMax;
             this.current = current;
             this.rowsEmitted = rowsEmitted;
+            this.partitioningStrategies = partitioningStrategies;
         }

         Status(StreamInput in) throws IOException {
@@ -343,6 +352,9 @@ private Status(LuceneOperator operator) {
             } else {
                 rowsEmitted = 0;
             }
+            partitioningStrategies = in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)
+                ? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom)
+                : Map.of();
         }

         @Override
@@ -364,6 +376,9 @@ public void writeTo(StreamOutput out) throws IOException {
             if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
                 out.writeVLong(rowsEmitted);
             }
+            if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) {
+                out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable);
+            }
         }

         @Override
@@ -415,6 +430,10 @@ public long rowsEmitted() {
             return rowsEmitted;
         }

+        public Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies() {
+            return partitioningStrategies;
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
@@ -432,6 +451,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.field("slice_max", sliceMax);
             builder.field("current", current);
             builder.field("rows_emitted", rowsEmitted);
+            builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
             return builder.endObject();
         }

@@ -450,12 +470,23 @@ public boolean equals(Object o) {
                 && sliceMin == status.sliceMin
                 && sliceMax == status.sliceMax
                 && current == status.current
-                && rowsEmitted == status.rowsEmitted;
+                && rowsEmitted == status.rowsEmitted
+                && partitioningStrategies.equals(status.partitioningStrategies);
         }

         @Override
         public int hashCode() {
-            return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted);
+            return Objects.hash(
+                processedSlices,
+                sliceIndex,
+                totalSlices,
+                pagesEmitted,
+                sliceMin,
+                sliceMax,
+                current,
+                rowsEmitted,
+                partitioningStrategies
+            );
         }

         @Override
@@ -468,17 +499,4 @@ public TransportVersion getMinimalSupportedVersion() {
             return TransportVersions.V_8_11_X;
         }
     }
-
-    static Function<ShardContext, Weight> weightFunction(Function<ShardContext, Query> queryFunction, ScoreMode scoreMode) {
-        return ctx -> {
-            final var query = queryFunction.apply(ctx);
-            final var searcher = ctx.searcher();
-            try {
-                Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
-                return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        };
-    }
 }
```
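The deleted static `weightFunction` helper shows what the factories previously did eagerly: rewrite the query and build a `Weight` per shard. Passing the raw `Query` down instead lets `LuceneSliceQueue.create` inspect the rewritten query to pick a `PartitioningStrategy` before the `Weight` is built. A sketch of the conversion the helper performed; where exactly this logic lands after the refactor is an assumption:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;

// Mirrors the deleted LuceneOperator.weightFunction helper. That this moves
// into LuceneSliceQueue is inferred from the new create(...) signature.
final class WeightConversion {
    static Weight toWeight(IndexSearcher searcher, Query query, ScoreMode scoreMode) {
        try {
            // Wrap in ConstantScoreQuery when scores aren't needed, exactly as before.
            Query actual = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
            return searcher.createWeight(searcher.rewrite(actual), scoreMode, 1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```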
