Commit f93244a
ESQL: Heuristics to pick efficient partitioning (#125739) (#127835)
Adds heuristics to pick an efficient partitioning strategy based on the index and the rewritten query. This speeds up some queries by throwing more cores at the problem:

```
FROM test | STATS SUM(b)

Before: took: 31 CPU: 222.3%
After:  took: 15 CPU: 806.9%
```

It also lowers the overhead of simpler queries by throwing fewer cores at the problem when more cores wouldn't really speed anything up:

```
FROM test

Before: took: 1 CPU: 48.5%
After:  took: 1 CPU: 70.4%
```

We have had a `pragma` to control our data partitioning for a long time; this change just looks at the query to pick a partitioning scheme. The partitioning options:

* `shard`: use one core per shard
* `segment`: use one core per large segment
* `doc`: break each shard into as many slices as there are cores

`doc` is the fastest, but has a lot of overhead, especially for complex Lucene queries. `segment` is fast, but doesn't make the most of the CPUs when there are few segments. `shard` has the lowest overhead.

Previously we always used `segment` partitioning because it is fast without the terrible overhead. With this change we use `doc` when the top-level query matches all documents: those queries have very low overhead even with `doc` partitioning. That's the twice-as-fast example above. The change also uses `shard` partitioning for queries that don't have to do much work, like `FROM foo`, `FROM foo | LIMIT 1`, or `FROM foo | SORT a`. That's the lower-CPU example above.

This partitioning choice is made very late, on the data node. So queries like this:

```
FROM test
| WHERE @timestamp > "2025-01-01T00:00:00Z"
| STATS SUM(b)
```

can also use `doc` partitioning when all documents are after the timestamp and all documents have `b`.
1 parent dbbf4bb commit f93244a
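The commit message above amounts to a small decision over the rewritten Lucene query and the shape of the plan. Below is a minimal sketch of that decision in plain Java; `pickPartitioning` and `cheapPlan` are hypothetical stand-ins for the checks described above, not methods added by this commit (the real choice is expressed through `LuceneSliceQueue.PartitioningStrategy`, visible in the diffs further down).

```java
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;

/**
 * Illustrative sketch of the heuristic described in the commit message.
 * Names are hypothetical; they only show the decision flow.
 */
public class PartitioningHeuristicSketch {

    enum Partitioning { SHARD, SEGMENT, DOC }

    /**
     * @param rewrittenQuery the top-level Lucene query after rewriting on the data node
     * @param cheapPlan      true for plans that do little work per document,
     *                       e.g. FROM foo, FROM foo | LIMIT 1, FROM foo | SORT a
     */
    static Partitioning pickPartitioning(Query rewrittenQuery, boolean cheapPlan) {
        if (cheapPlan) {
            // Extra parallelism won't speed these up, so keep overhead minimal.
            return Partitioning.SHARD;
        }
        Query unwrapped = rewrittenQuery instanceof ConstantScoreQuery csq ? csq.getQuery() : rewrittenQuery;
        if (unwrapped instanceof MatchAllDocsQuery) {
            // A query matching every document keeps per-slice setup cheap, so use the most cores.
            return Partitioning.DOC;
        }
        // Everything else keeps the previous default.
        return Partitioning.SEGMENT;
    }

    public static void main(String[] args) {
        // FROM test | STATS SUM(b)  ->  match-all after rewrite  ->  DOC
        System.out.println(pickPartitioning(new MatchAllDocsQuery(), false));
        // FROM test                 ->  cheap plan               ->  SHARD
        System.out.println(pickPartitioning(new MatchAllDocsQuery(), true));
    }
}
```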

File tree

25 files changed: +908 -138 lines changed

docs/changelog/125739.yaml

Lines changed: 5 additions & 0 deletions
```diff
@@ -0,0 +1,5 @@
+pr: 125739
+summary: Heuristics to pick efficient partitioning
+area: ES|QL
+type: enhancement
+issues: []
```

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
```diff
@@ -216,6 +216,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion INTRODUCE_FAILURES_LIFECYCLE_BACKPORT_8_19 = def(8_841_0_25);
     public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19 = def(8_841_0_26);
     public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO_BACKPORT_8_19 = def(8_841_0_27);
+    public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_28);
 
     /*
      * STOP! READ THIS FIRST! No, really,
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java

Lines changed: 31 additions & 3 deletions
```diff
@@ -7,11 +7,39 @@
 
 package org.elasticsearch.compute.lucene;
 
-public enum DataPartitioning {
+import org.elasticsearch.compute.operator.Driver;
 
+/**
+ * How we partition the data across {@link Driver}s. Each request forks into
+ * {@code min(1.5 * cpus, partition_count)} threads on the data node. More partitions
+ * allow us to bring more threads to bear on CPU intensive data node side tasks.
+ */
+public enum DataPartitioning {
+    /**
+     * Automatically select the data partitioning based on the query and index.
+     * Usually that's {@link #SEGMENT}, but for small indices it's {@link #SHARD}.
+     * When the additional overhead from {@link #DOC} is fairly low then it'll
+     * pick {@link #DOC}.
+     */
+    AUTO,
+    /**
+     * Make one partition per shard. This is generally the slowest option, but it
+     * has the lowest CPU overhead.
+     */
     SHARD,
-
+    /**
+     * Partition on segment boundaries, this doesn't allow forking to as many CPUs
+     * as {@link #DOC} but it has much lower overhead.
+     * <p>
+     * It packs segments smaller than {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}
+     * docs together into a partition. Larger segments get their own partition.
+     * Each slice contains no more than {@link LuceneSliceQueue#MAX_SEGMENTS_PER_SLICE}.
+     */
     SEGMENT,
-
+    /**
+     * Partition each shard into {@code task_concurrency} partitions, splitting
+     * larger segments into slices. This allows bringing the most CPUs to bear on
+     * the problem but adds extra overhead, especially in query preparation.
+     */
     DOC,
 }
```
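The `SEGMENT` javadoc above describes a packing rule: segments smaller than a doc-count cap are grouped into a shared slice, larger segments get their own slice, and no slice holds more than a fixed number of segments. Below is a minimal sketch of that rule; `packSegments`, `maxDocsPerSlice`, and `maxSegmentsPerSlice` are hypothetical names standing in for the `LuceneSliceQueue` internals, which operate on `LeafReaderContext` instances rather than raw doc counts.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative packing of per-segment doc counts into slices, per the SEGMENT javadoc above. */
public class SegmentPackingSketch {

    static List<List<Integer>> packSegments(int[] segmentDocCounts, int maxDocsPerSlice, int maxSegmentsPerSlice) {
        List<List<Integer>> slices = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int docsInCurrent = 0;
        for (int docCount : segmentDocCounts) {
            if (docCount >= maxDocsPerSlice) {
                // Large segments get their own slice.
                slices.add(List.of(docCount));
                continue;
            }
            if (current.isEmpty() == false
                && (docsInCurrent + docCount > maxDocsPerSlice || current.size() >= maxSegmentsPerSlice)) {
                // Close the current slice before it exceeds either cap.
                slices.add(current);
                current = new ArrayList<>();
                docsInCurrent = 0;
            }
            current.add(docCount);
            docsInCurrent += docCount;
        }
        if (current.isEmpty() == false) {
            slices.add(current);
        }
        return slices;
    }

    public static void main(String[] args) {
        // The three small segments pack together; the 800k-doc segment stands alone.
        System.out.println(packSegments(new int[] { 10_000, 20_000, 30_000, 800_000 }, 250_000, 5));
    }
}
```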

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 10 additions & 1 deletion
```diff
@@ -49,7 +49,16 @@ public Factory(
             int taskConcurrency,
             int limit
         ) {
-            super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+            super(
+                contexts,
+                queryFunction,
+                dataPartitioning,
+                query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+                taskConcurrency,
+                limit,
+                false,
+                ScoreMode.COMPLETE_NO_SCORES
+            );
         }
 
         @Override
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

Lines changed: 10 additions & 3 deletions
```diff
@@ -23,8 +23,6 @@
 import java.util.List;
 import java.util.function.Function;
 
-import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;
-
 /**
  * Factory that generates an operator that finds the max value of a field using the {@link LuceneMinMaxOperator}.
  */
@@ -123,7 +121,16 @@ public LuceneMaxFactory(
         NumberType numberType,
         int limit
     ) {
-        super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+        super(
+            contexts,
+            queryFunction,
+            dataPartitioning,
+            query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+            taskConcurrency,
+            limit,
+            false,
+            ScoreMode.COMPLETE_NO_SCORES
+        );
         this.fieldName = fieldName;
         this.numberType = numberType;
     }
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

Lines changed: 10 additions & 3 deletions
```diff
@@ -23,8 +23,6 @@
 import java.util.List;
 import java.util.function.Function;
 
-import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;
-
 /**
  * Factory that generates an operator that finds the min value of a field using the {@link LuceneMinMaxOperator}.
  */
@@ -123,7 +121,16 @@ public LuceneMinFactory(
         NumberType numberType,
         int limit
     ) {
-        super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+        super(
+            contexts,
+            queryFunction,
+            dataPartitioning,
+            query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+            taskConcurrency,
+            limit,
+            false,
+            ScoreMode.COMPLETE_NO_SCORES
+        );
         this.fieldName = fieldName;
         this.numberType = numberType;
     }
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 38 additions & 20 deletions
```diff
@@ -9,7 +9,6 @@
 
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.BulkScorer;
-import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
@@ -36,12 +35,16 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING_8_19;
+
 public abstract class LuceneOperator extends SourceOperator {
     private static final Logger logger = LogManager.getLogger(LuceneOperator.class);
 
@@ -92,15 +95,17 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFac
         */
         protected Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Weight> weightFunction,
+            Function<ShardContext, Query> queryFunction,
             DataPartitioning dataPartitioning,
+            Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
             int taskConcurrency,
             int limit,
-            boolean needsScore
+            boolean needsScore,
+            ScoreMode scoreMode
         ) {
             this.limit = limit;
             this.dataPartitioning = dataPartitioning;
-            this.sliceQueue = LuceneSliceQueue.create(contexts, weightFunction, dataPartitioning, taskConcurrency);
+            this.sliceQueue = LuceneSliceQueue.create(contexts, queryFunction, dataPartitioning, autoStrategy, taskConcurrency, scoreMode);
             this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency);
             this.needsScore = needsScore;
         }
@@ -260,6 +265,7 @@ public static class Status implements Operator.Status {
         private final int sliceMax;
         private final int current;
         private final long rowsEmitted;
+        private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;
 
         private Status(LuceneOperator operator) {
             processedSlices = operator.processedSlices;
@@ -285,6 +291,7 @@ private Status(LuceneOperator operator) {
             }
             pagesEmitted = operator.pagesEmitted;
             rowsEmitted = operator.rowsEmitted;
+            partitioningStrategies = operator.sliceQueue.partitioningStrategies();
         }
 
         Status(
@@ -298,7 +305,8 @@ private Status(LuceneOperator operator) {
             int sliceMin,
             int sliceMax,
             int current,
-            long rowsEmitted
+            long rowsEmitted,
+            Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies
         ) {
             this.processedSlices = processedSlices;
             this.processedQueries = processedQueries;
@@ -311,6 +319,7 @@ private Status(LuceneOperator operator) {
             this.sliceMax = sliceMax;
             this.current = current;
             this.rowsEmitted = rowsEmitted;
+            this.partitioningStrategies = partitioningStrategies;
         }
 
         Status(StreamInput in) throws IOException {
@@ -334,6 +343,9 @@ private Status(LuceneOperator operator) {
             } else {
                 rowsEmitted = 0;
             }
+            partitioningStrategies = in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING_8_19)
+                ? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom)
+                : Map.of();
         }
 
         @Override
@@ -355,6 +367,9 @@ public void writeTo(StreamOutput out) throws IOException {
             if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
                 out.writeVLong(rowsEmitted);
             }
+            if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING_8_19)) {
+                out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable);
+            }
         }
 
         @Override
@@ -406,6 +421,10 @@ public long rowsEmitted() {
             return rowsEmitted;
         }
 
+        public Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies() {
+            return partitioningStrategies;
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
@@ -423,6 +442,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             builder.field("slice_max", sliceMax);
             builder.field("current", current);
             builder.field("rows_emitted", rowsEmitted);
+            builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
             return builder.endObject();
         }
 
@@ -441,12 +461,23 @@ public boolean equals(Object o) {
                 && sliceMin == status.sliceMin
                 && sliceMax == status.sliceMax
                 && current == status.current
-                && rowsEmitted == status.rowsEmitted;
+                && rowsEmitted == status.rowsEmitted
+                && partitioningStrategies.equals(status.partitioningStrategies);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted);
+            return Objects.hash(
+                processedSlices,
+                sliceIndex,
+                totalSlices,
+                pagesEmitted,
+                sliceMin,
+                sliceMax,
+                current,
+                rowsEmitted,
+                partitioningStrategies
+            );
         }
 
         @Override
@@ -459,17 +490,4 @@ public TransportVersion getMinimalSupportedVersion() {
             return TransportVersions.V_8_11_X;
         }
     }
-
-    static Function<ShardContext, Weight> weightFunction(Function<ShardContext, Query> queryFunction, ScoreMode scoreMode) {
-        return ctx -> {
-            final var query = queryFunction.apply(ctx);
-            final var searcher = ctx.searcher();
-            try {
-                Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
-                return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        };
-    }
 }
```
