Merged

32 commits
23ff7c5
ESQL: Heuristics to pick efficient partitioning
nik9000 Mar 26, 2025
47bb86c
Update docs/changelog/125739.yaml
nik9000 Mar 26, 2025
527c69a
Fix TODO
nik9000 Mar 27, 2025
815dec5
Merge remote-tracking branch 'nik9000/esql_auto_partition' into esql_…
nik9000 Mar 27, 2025
c72af75
Report partitioning strategies
nik9000 Mar 27, 2025
e2e817d
Test report
nik9000 Mar 27, 2025
a348ef9
test
nik9000 Mar 27, 2025
d201b4a
[CI] Auto commit changes from spotless
Mar 27, 2025
fd41476
Merge branch 'main' into esql_auto_partition
nik9000 Mar 27, 2025
8533d07
Merge branch 'main' into esql_auto_partition
nik9000 Mar 28, 2025
e72673c
Merge branch 'main' into esql_auto_partition
nik9000 Mar 31, 2025
05e487e
Merge branch 'main' into esql_auto_partition
nik9000 Apr 1, 2025
bc92147
[CI] Auto commit changes from spotless
Apr 1, 2025
05175e5
Cluster setting
nik9000 Apr 1, 2025
03f029c
in
nik9000 Apr 1, 2025
caf5f05
Merge remote-tracking branch 'nik9000/esql_auto_partition' into esql_…
nik9000 Apr 1, 2025
5ab7032
Fix rewrite
nik9000 Apr 1, 2025
8f72b1d
Merge branch 'main' into esql_auto_partition
nik9000 Apr 2, 2025
ae48a6f
Merge branch 'main' into esql_auto_partition
nik9000 Apr 7, 2025
a4fd6c5
Merge branch 'main' into esql_auto_partition
nik9000 Apr 8, 2025
2f7ecc8
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
2ab82a8
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
c816934
Fixup merge
nik9000 Apr 9, 2025
bf3c076
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
ad9e064
Try and make this more consistent
nik9000 Apr 9, 2025
334c1bc
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
66aa691
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
409067c
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
9b7728e
Merge branch 'main' into esql_auto_partition
nik9000 Apr 10, 2025
a0ebc79
Merge branch 'main' into esql_auto_partition
nik9000 Apr 10, 2025
77608d3
Merge branch 'main' into esql_auto_partition
nik9000 Apr 10, 2025
2eb1ac7
Merge remote-tracking branch 'nik9000/esql_auto_partition' into esql_…
nik9000 Apr 10, 2025
5 changes: 5 additions & 0 deletions docs/changelog/125739.yaml
@@ -0,0 +1,5 @@
pr: 125739
summary: Heuristics to pick efficient partitioning
area: ES|QL
type: enhancement
issues: []
@@ -207,6 +207,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_042_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -7,11 +7,39 @@

package org.elasticsearch.compute.lucene;

public enum DataPartitioning {
import org.elasticsearch.compute.operator.Driver;

/**
* How we partition the data across {@link Driver}s. Each request forks into
* {@code min(cpus, partition_count)} threads on the data node. More partitions
Member:
Unfortunately, we currently have to use min(1.5 * cpus, partition_count) because drivers consist of both I/O and CPU-bound operations.

Member Author (nik9000):
I'll update the comment.
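
As a worked illustration of that sizing (hypothetical numbers, not taken from this PR): on a 16-core data node, 1.5 * 16 = 24, so a query that yields 40 partitions forks into min(24, 40) = 24 drivers, while a query that yields only 8 partitions forks into 8.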

* allow us to bring more threads to bear on CPU intensive data node side tasks.
*/
public enum DataPartitioning {
/**
* Automatically select the data partitioning based on the query and index.
* Usually that's {@link #SEGMENT}, but for small indices it's {@link #SHARD}.
* When the additional overhead from {@link #DOC} is fairly low then it'll
* pick {@link #DOC}.
*/
AUTO,
/**
* Make one partition per shard. This is generally the slowest option, but it
* has the lowest CPU overhead.
*/
SHARD,

/**
* Partition on segment boundaries, this doesn't allow forking to as many CPUs
* as {@link #DOC} but it has much lower overhead.
* <p>
* It packs segments smaller than {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}
* docs together into a partition. Larger segments get their own partition.
* Each slice contains no more than {@link LuceneSliceQueue#MAX_SEGMENTS_PER_SLICE}.
*/
SEGMENT,

/**
* Partition each shard into {@code task_concurrency} partitions, splitting
* larger segments into slices. This allows bringing the most CPUs to bear on
* the problem but adds extra overhead, especially in query preparation.
*/
DOC,
}
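
The Javadoc above describes the trade-offs in prose; the AUTO selection logic itself lives in LuceneSliceQueue and is not part of this hunk. The following is a minimal, standalone sketch of that kind of heuristic. The class names, thresholds (SMALL_SHARD_MAX_DOCS, DOC_PARTITIONING_MIN_DOCS), and the queryIsCheapPerDoc flag are illustrative assumptions, not values from this change.

// Standalone sketch of an AUTO-style choice among SHARD, SEGMENT, and DOC.
// Thresholds and names are hypothetical; the PR's real heuristic lives in
// LuceneSliceQueue and may differ.
enum PartitioningChoice { SHARD, SEGMENT, DOC }

final class PartitioningHeuristicSketch {
    // Hypothetical cut-offs, not taken from the PR.
    private static final long SMALL_SHARD_MAX_DOCS = 100_000;
    private static final long DOC_PARTITIONING_MIN_DOCS = 10_000_000;

    static PartitioningChoice choose(long shardDocCount, boolean queryIsCheapPerDoc) {
        if (shardDocCount <= SMALL_SHARD_MAX_DOCS) {
            // Tiny shards: one partition per shard keeps per-partition overhead lowest.
            return PartitioningChoice.SHARD;
        }
        if (shardDocCount >= DOC_PARTITIONING_MIN_DOCS && queryIsCheapPerDoc) {
            // Very large shards with cheap per-doc work: accept DOC's extra
            // query-preparation overhead to spread the work across the most CPUs.
            return PartitioningChoice.DOC;
        }
        // Otherwise segment partitioning balances parallelism against overhead.
        return PartitioningChoice.SEGMENT;
    }

    public static void main(String[] args) {
        System.out.println(choose(50_000, true));      // SHARD
        System.out.println(choose(5_000_000, true));   // SEGMENT
        System.out.println(choose(50_000_000, true));  // DOC
    }
}

In the operator itself the chosen strategy is passed into LuceneSliceQueue.create(...), as the Factory changes below show; the count, max, and min factories pin the strategy with query -> LuceneSliceQueue.PartitioningStrategy.SHARD.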
@@ -49,7 +49,15 @@ public Factory(
int taskConcurrency,
int limit
) {
super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, ScoreMode.COMPLETE_NO_SCORES);
super(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
taskConcurrency,
limit,
ScoreMode.COMPLETE_NO_SCORES
);
}

@Override
@@ -121,7 +121,15 @@ public LuceneMaxFactory(
NumberType numberType,
int limit
) {
super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, ScoreMode.COMPLETE_NO_SCORES);
super(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
taskConcurrency,
limit,
ScoreMode.COMPLETE_NO_SCORES
);
this.fieldName = fieldName;
this.numberType = numberType;
}
@@ -121,7 +121,15 @@ public LuceneMinFactory(
NumberType numberType,
int limit
) {
super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, ScoreMode.COMPLETE_NO_SCORES);
super(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
taskConcurrency,
limit,
ScoreMode.COMPLETE_NO_SCORES
);
this.fieldName = fieldName;
this.numberType = numberType;
}
@@ -9,7 +9,6 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
@@ -38,12 +37,16 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING;

public abstract class LuceneOperator extends SourceOperator {
private static final Logger logger = LogManager.getLogger(LuceneOperator.class);

@@ -96,15 +99,15 @@ protected Factory(
List<? extends ShardContext> contexts,
Function<ShardContext, Query> queryFunction,
DataPartitioning dataPartitioning,
Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
int taskConcurrency,
int limit,
ScoreMode scoreMode
) {
this.limit = limit;
this.scoreMode = scoreMode;
this.dataPartitioning = dataPartitioning;
var weightFunction = weightFunction(queryFunction, scoreMode);
this.sliceQueue = LuceneSliceQueue.create(contexts, weightFunction, dataPartitioning, taskConcurrency);
this.sliceQueue = LuceneSliceQueue.create(contexts, queryFunction, dataPartitioning, autoStrategy, taskConcurrency, scoreMode);
this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency);
}

@@ -271,6 +274,7 @@ public static class Status implements Operator.Status {
private final int sliceMax;
private final int current;
private final long rowsEmitted;
private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;

private Status(LuceneOperator operator) {
processedSlices = operator.processedSlices;
@@ -296,6 +300,7 @@ private Status(LuceneOperator operator) {
}
pagesEmitted = operator.pagesEmitted;
rowsEmitted = operator.rowsEmitted;
partitioningStrategies = operator.sliceQueue.partitioningStrategies();
}

Status(
@@ -309,7 +314,8 @@ private Status(LuceneOperator operator) {
int sliceMin,
int sliceMax,
int current,
long rowsEmitted
long rowsEmitted,
Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies
) {
this.processedSlices = processedSlices;
this.processedQueries = processedQueries;
@@ -322,6 +328,7 @@ private Status(LuceneOperator operator) {
this.sliceMax = sliceMax;
this.current = current;
this.rowsEmitted = rowsEmitted;
this.partitioningStrategies = partitioningStrategies;
}

Status(StreamInput in) throws IOException {
@@ -345,6 +352,9 @@ private Status(LuceneOperator operator) {
} else {
rowsEmitted = 0;
}
partitioningStrategies = in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)
? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom)
: Map.of();
}

@Override
@@ -366,6 +376,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
out.writeVLong(rowsEmitted);
}
if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) {
out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable);
}
}

@Override
@@ -417,6 +430,10 @@ public long rowsEmitted() {
return rowsEmitted;
}

public Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies() {
return partitioningStrategies;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -434,6 +451,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("slice_max", sliceMax);
builder.field("current", current);
builder.field("rows_emitted", rowsEmitted);
builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
return builder.endObject();
}

@@ -452,12 +470,23 @@ public boolean equals(Object o) {
&& sliceMin == status.sliceMin
&& sliceMax == status.sliceMax
&& current == status.current
&& rowsEmitted == status.rowsEmitted;
&& rowsEmitted == status.rowsEmitted
&& partitioningStrategies.equals(status.partitioningStrategies);
}

@Override
public int hashCode() {
return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted);
return Objects.hash(
processedSlices,
sliceIndex,
totalSlices,
pagesEmitted,
sliceMin,
sliceMax,
current,
rowsEmitted,
partitioningStrategies
);
}

@Override
@@ -470,17 +499,4 @@ public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.V_8_11_X;
}
}

static Function<ShardContext, Weight> weightFunction(Function<ShardContext, Query> queryFunction, ScoreMode scoreMode) {
return ctx -> {
final var query = queryFunction.apply(ctx);
final var searcher = ctx.searcher();
try {
Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
}