
Commit 067addd

Merge remote-tracking branch 'es/main' into speedup_loading_dense_keyword_fields

2 parents 8e57dd0 + f9cdaaf

11 files changed: +556 -68 lines changed

docs/changelog/132774.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 132774
+summary: Improve cpu utilization with dynamic slice size in doc partitioning
+area: ES|QL
+type: enhancement
+issues: []

docs/changelog/132973.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 132973
+summary: Preserve lost thread context in node inference action. A lost context causes a memory leak if APM tracing is enabled
+area: Machine Learning
+type: bug
+issues: []

muted-tests.yml

Lines changed: 9 additions & 0 deletions

@@ -549,6 +549,15 @@ tests:
 - class: org.elasticsearch.xpack.esql.analysis.AnalyzerTests
   method: testMagnitudePlanWithDenseVectorImplicitCasting
   issue: https://github.com/elastic/elasticsearch/issues/132985
+- class: org.elasticsearch.xpack.search.AsyncSearchErrorTraceIT
+  method: testAsyncSearchFailingQueryErrorTraceDefault
+  issue: https://github.com/elastic/elasticsearch/issues/133010
+- class: org.elasticsearch.xpack.esql.plugin.IndexResolutionIT
+  method: testDoesNotResolveClosedIndex
+  issue: https://github.com/elastic/elasticsearch/issues/133011
+- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
+  method: test {p0=search/510_range_query_out_of_bounds/Test range query for float field with out of bounds lower limit}
+  issue: https://github.com/elastic/elasticsearch/issues/133012

 # Examples:
 #

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java

Lines changed: 17 additions & 4 deletions

@@ -9,6 +9,8 @@

 import org.elasticsearch.compute.operator.Driver;

+import java.util.List;
+
 /**
  * How we partition the data across {@link Driver}s. Each request forks into
  * {@code min(1.5 * cpus, partition_count)} threads on the data node. More partitions
@@ -37,9 +39,20 @@ public enum DataPartitioning {
      */
     SEGMENT,
     /**
-     * Partition each shard into {@code task_concurrency} partitions, splitting
-     * larger segments into slices. This allows bringing the most CPUs to bear on
-     * the problem but adds extra overhead, especially in query preparation.
+     * Partitions into dynamic-sized slices to improve CPU utilization while keeping overhead low.
+     * This approach is more flexible than {@link #SEGMENT} and works as follows:
+     *
+     * <ol>
+     * <li>The slice size starts from a desired size based on {@code task_concurrency} but is capped
+     * at around {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}. This prevents poor CPU usage when
+     * matching documents are clustered together.</li>
+     * <li>For small and medium segments (less than five times the desired slice size), it uses a
+     * slightly different {@link #SEGMENT} strategy, which also splits segments that are larger
+     * than the desired size. See {@link org.apache.lucene.search.IndexSearcher#slices(List, int, int, boolean)}.</li>
+     * <li>For very large segments, multiple segments are not combined into a single slice. This allows
+     * one driver to process an entire large segment until other drivers steal the work after finishing
+     * their own tasks. See {@link LuceneSliceQueue#nextSlice(LuceneSlice)}.</li>
+     * </ol>
      */
-    DOC,
+    DOC
 }
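
To make the three rules in the new DOC javadoc concrete, here is a minimal, self-contained sketch (not part of this commit) that classifies a few hypothetical segment sizes the way the javadoc describes. The doc counts, the task concurrency, and the 250,000 value standing in for LuceneSliceQueue#MAX_DOCS_PER_SLICE are assumptions for illustration only.

// Illustration only: the numbers and the MAX_DOCS_PER_SLICE stand-in are assumed, not taken from this diff.
public class DocPartitioningSketch {
    public static void main(String[] args) {
        int taskConcurrency = 8;
        int assumedMaxDocsPerSlice = 250_000; // stand-in for LuceneSliceQueue#MAX_DOCS_PER_SLICE
        int totalDocs = 2_000_000;
        // Rule 1: desired slice size derived from task_concurrency, capped per slice.
        int desired = Math.clamp(Math.ceilDiv(totalDocs, taskConcurrency), 1, assumedMaxDocsPerSlice);
        System.out.println("desired slice size = " + desired); // 250_000
        // Rules 2 and 3: segments at least five times the desired size are sliced on their own;
        // smaller segments are grouped with the SEGMENT-style strategy.
        for (int segmentDocs : new int[] { 40_000, 400_000, 1_500_000 }) {
            String bucket = segmentDocs >= 5 * desired ? "large: split alone" : "small/medium: grouped";
            System.out.println(segmentDocs + " docs -> " + bucket);
        }
    }
}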

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 1 addition & 1 deletion

@@ -165,7 +165,7 @@ LuceneScorer getCurrentOrLoadNextScorer() {
         while (currentScorer == null || currentScorer.isDone()) {
             if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
                 sliceIndex = 0;
-                currentSlice = sliceQueue.nextSlice();
+                currentSlice = sliceQueue.nextSlice(currentSlice);
                 if (currentSlice == null) {
                     doneCollecting = true;
                     return null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 7 additions & 1 deletion

@@ -14,7 +14,13 @@
 /**
  * Holds a list of multiple partial Lucene segments
  */
-public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
+public record LuceneSlice(
+    int slicePosition,
+    ShardContext shardContext,
+    List<PartialLeafReaderContext> leaves,
+    Weight weight,
+    List<Object> tags
+) {
     int numLeaves() {
         return leaves.size();
     }

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 150 additions & 57 deletions

@@ -16,18 +16,21 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.Nullable;

 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.Function;

 /**
@@ -77,18 +80,78 @@ public record QueryAndTags(Query query, List<Object> tags) {}
     public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher

     private final int totalSlices;
-    private final Queue<LuceneSlice> slices;
     private final Map<String, PartitioningStrategy> partitioningStrategies;

-    private LuceneSliceQueue(List<LuceneSlice> slices, Map<String, PartitioningStrategy> partitioningStrategies) {
-        this.totalSlices = slices.size();
-        this.slices = new ConcurrentLinkedQueue<>(slices);
+    private final AtomicReferenceArray<LuceneSlice> slices;
+    /**
+     * Queue of slice IDs that are the primary entry point for a new group of segments.
+     * A driver should prioritize polling from this queue after failing to get a sequential
+     * slice (the segment affinity). This ensures that threads start work on fresh,
+     * independent segment groups before resorting to work stealing.
+     */
+    private final Queue<Integer> sliceHeads;
+
+    /**
+     * Queue of slice IDs that are not the primary entry point for a segment group.
+     * This queue serves as a fallback pool for work stealing. When a thread has no more independent work,
+     * it will "steal" a slice from this queue to keep itself utilized. A driver should pull tasks from
+     * this queue only when {@code sliceHeads} has been exhausted.
+     */
+    private final Queue<Integer> stealableSlices;
+
+    LuceneSliceQueue(List<LuceneSlice> sliceList, Map<String, PartitioningStrategy> partitioningStrategies) {
+        this.totalSlices = sliceList.size();
+        this.slices = new AtomicReferenceArray<>(sliceList.size());
+        for (int i = 0; i < sliceList.size(); i++) {
+            slices.set(i, sliceList.get(i));
+        }
         this.partitioningStrategies = partitioningStrategies;
+        this.sliceHeads = ConcurrentCollections.newQueue();
+        this.stealableSlices = ConcurrentCollections.newQueue();
+        for (LuceneSlice slice : sliceList) {
+            if (slice.getLeaf(0).minDoc() == 0) {
+                sliceHeads.add(slice.slicePosition());
+            } else {
+                stealableSlices.add(slice.slicePosition());
+            }
+        }
     }

+    /**
+     * Retrieves the next available {@link LuceneSlice} for processing.
+     * <p>
+     * This method implements a three-tiered strategy to minimize the overhead of switching between segments:
+     * 1. If a previous slice is provided, it first attempts to return the next sequential slice.
+     * This keeps a thread working on the same segments, minimizing the overhead of segment switching.
+     * 2. If affinity fails, it returns a slice from the {@link #sliceHeads} queue, which is an entry point for
+     * a new, independent group of segments, allowing the calling Driver to work on a fresh set of segments.
+     * 3. If the {@link #sliceHeads} queue is exhausted, it "steals" a slice
+     * from the {@link #stealableSlices} queue. This fallback ensures all threads remain utilized.
+     *
+     * @param prev the previously returned {@link LuceneSlice}, or {@code null} if starting
+     * @return the next available {@link LuceneSlice}, or {@code null} if exhausted
+     */
     @Nullable
-    public LuceneSlice nextSlice() {
-        return slices.poll();
+    public LuceneSlice nextSlice(LuceneSlice prev) {
+        if (prev != null) {
+            final int nextId = prev.slicePosition() + 1;
+            if (nextId < totalSlices) {
+                var slice = slices.getAndSet(nextId, null);
+                if (slice != null) {
+                    return slice;
+                }
+            }
+        }
+        for (var ids : List.of(sliceHeads, stealableSlices)) {
+            Integer nextId;
+            while ((nextId = ids.poll()) != null) {
+                var slice = slices.getAndSet(nextId, null);
+                if (slice != null) {
+                    return slice;
+                }
+            }
+        }
+        return null;
     }

     public int totalSlices() {
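
A minimal sketch of the consumption pattern this signature implies, matching how LuceneOperator passes its current slice back in (see the LuceneOperator.java change above); the drain helper and the processSlice callback are hypothetical names, not part of this commit.

// Sketch only: drain and processSlice are hypothetical; sliceQueue would come from LuceneSliceQueue.create(...).
static void drain(LuceneSliceQueue sliceQueue, java.util.function.Consumer<LuceneSlice> processSlice) {
    LuceneSlice prev = null;
    LuceneSlice slice;
    while ((slice = sliceQueue.nextSlice(prev)) != null) {
        // nextSlice prefers (1) the slice right after prev for segment affinity,
        // (2) a fresh entry point from sliceHeads, (3) a stolen slice from stealableSlices.
        processSlice.accept(slice);
        prev = slice;
    }
}
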
@@ -103,7 +166,14 @@ public Map<String, PartitioningStrategy> partitioningStrategies() {
     }

     public Collection<String> remainingShardsIdentifiers() {
-        return slices.stream().map(slice -> slice.shardContext().shardIdentifier()).toList();
+        List<String> remaining = new ArrayList<>(slices.length());
+        for (int i = 0; i < slices.length(); i++) {
+            LuceneSlice slice = slices.get(i);
+            if (slice != null) {
+                remaining.add(slice.shardContext().shardIdentifier());
+            }
+        }
+        return remaining;
     }

     public static LuceneSliceQueue create(
@@ -117,6 +187,7 @@ public static LuceneSliceQueue create(
         List<LuceneSlice> slices = new ArrayList<>();
         Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());

+        int nextSliceId = 0;
         for (ShardContext ctx : contexts) {
             for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
                 var scoreMode = scoreModeFunction.apply(ctx);
@@ -140,7 +211,7 @@ public static LuceneSliceQueue create(
                 Weight weight = weight(ctx, query, scoreMode);
                 for (List<PartialLeafReaderContext> group : groups) {
                     if (group.isEmpty() == false) {
-                        slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
+                        slices.add(new LuceneSlice(nextSliceId++, ctx, group, weight, queryAndExtra.tags));
                     }
                 }
             }
@@ -158,7 +229,7 @@ public enum PartitioningStrategy implements Writeable {
         */
        SHARD(0) {
            @Override
-            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices) {
+            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency) {
                return List.of(searcher.getLeafContexts().stream().map(PartialLeafReaderContext::new).toList());
            }
        },
@@ -167,7 +238,7 @@ List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requeste
         */
        SEGMENT(1) {
            @Override
-            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices) {
+            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency) {
                IndexSearcher.LeafSlice[] gs = IndexSearcher.slices(
                    searcher.getLeafContexts(),
                    MAX_DOCS_PER_SLICE,
@@ -182,52 +253,11 @@ List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requeste
         */
        DOC(2) {
            @Override
-            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices) {
+            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency) {
                final int totalDocCount = searcher.getIndexReader().maxDoc();
-                final int normalMaxDocsPerSlice = totalDocCount / requestedNumSlices;
-                final int extraDocsInFirstSlice = totalDocCount % requestedNumSlices;
-                final List<List<PartialLeafReaderContext>> slices = new ArrayList<>();
-                int docsAllocatedInCurrentSlice = 0;
-                List<PartialLeafReaderContext> currentSlice = null;
-                int maxDocsPerSlice = normalMaxDocsPerSlice + extraDocsInFirstSlice;
-                for (LeafReaderContext ctx : searcher.getLeafContexts()) {
-                    final int numDocsInLeaf = ctx.reader().maxDoc();
-                    int minDoc = 0;
-                    while (minDoc < numDocsInLeaf) {
-                        int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf - minDoc);
-                        if (numDocsToUse <= 0) {
-                            break;
-                        }
-                        if (currentSlice == null) {
-                            currentSlice = new ArrayList<>();
-                        }
-                        currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse));
-                        minDoc += numDocsToUse;
-                        docsAllocatedInCurrentSlice += numDocsToUse;
-                        if (docsAllocatedInCurrentSlice == maxDocsPerSlice) {
-                            slices.add(currentSlice);
-                            // once the first slice with the extra docs is added, no need for extra docs
-                            maxDocsPerSlice = normalMaxDocsPerSlice;
-                            currentSlice = null;
-                            docsAllocatedInCurrentSlice = 0;
-                        }
-                    }
-                }
-                if (currentSlice != null) {
-                    slices.add(currentSlice);
-                }
-                if (requestedNumSlices < totalDocCount && slices.size() != requestedNumSlices) {
-                    throw new IllegalStateException("wrong number of slices, expected " + requestedNumSlices + " but got " + slices.size());
-                }
-                if (slices.stream()
-                    .flatMapToInt(
-                        l -> l.stream()
-                            .mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc())
-                    )
-                    .sum() != totalDocCount) {
-                    throw new IllegalStateException("wrong doc count");
-                }
-                return slices;
+                // Cap the desired slice to prevent CPU underutilization when matching documents are concentrated in one segment region.
+                int desiredSliceSize = Math.clamp(Math.ceilDiv(totalDocCount, taskConcurrency), 1, MAX_DOCS_PER_SLICE);
+                return new AdaptivePartitioner(Math.max(1, desiredSliceSize), MAX_SEGMENTS_PER_SLICE).partition(searcher.getLeafContexts());
            }
        };
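
To see why the cap matters, here is a small sketch (not from this commit) contrasting the old fixed split, which made exactly task_concurrency slices per the previous DOC javadoc, with the new capped computation; the 250,000 value standing in for MAX_DOCS_PER_SLICE and the doc counts are assumptions.

// Illustration only: the MAX_DOCS_PER_SLICE stand-in and the doc counts are assumed.
public class SliceSizeComparison {
    public static void main(String[] args) {
        int totalDocCount = 10_000_000;
        int taskConcurrency = 4;
        int assumedMaxDocsPerSlice = 250_000;

        // Old DOC behavior: one slice per requested partition, however large that gets.
        int oldSliceSize = totalDocCount / taskConcurrency; // 2_500_000 docs per slice, 4 slices
        // New DOC behavior: the desired size is capped, producing many smaller slices
        // that idle drivers can pick up from sliceHeads or steal from stealableSlices.
        int newSliceSize = Math.clamp(Math.ceilDiv(totalDocCount, taskConcurrency), 1, assumedMaxDocsPerSlice);
        int approxSliceCount = Math.ceilDiv(totalDocCount, newSliceSize); // about 40 slices of 250_000 docs

        System.out.printf("old: %d slices of %d docs%n", taskConcurrency, oldSliceSize);
        System.out.printf("new: ~%d slices of ~%d docs%n", approxSliceCount, newSliceSize);
    }
}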

@@ -252,7 +282,7 @@ public void writeTo(StreamOutput out) throws IOException {
            out.writeByte(id);
        }

-        abstract List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices);
+        abstract List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency);

        private static PartitioningStrategy pick(
            DataPartitioning dataPartitioning,
@@ -291,4 +321,67 @@ static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) {
            throw new UncheckedIOException(e);
        }
    }
+
+    static final class AdaptivePartitioner {
+        final int desiredDocsPerSlice;
+        final int maxDocsPerSlice;
+        final int maxSegmentsPerSlice;
+
+        AdaptivePartitioner(int desiredDocsPerSlice, int maxSegmentsPerSlice) {
+            this.desiredDocsPerSlice = desiredDocsPerSlice;
+            this.maxDocsPerSlice = desiredDocsPerSlice * 5 / 4;
+            this.maxSegmentsPerSlice = maxSegmentsPerSlice;
+        }
+
+        List<List<PartialLeafReaderContext>> partition(List<LeafReaderContext> leaves) {
+            List<LeafReaderContext> smallSegments = new ArrayList<>();
+            List<LeafReaderContext> largeSegments = new ArrayList<>();
+            List<List<PartialLeafReaderContext>> results = new ArrayList<>();
+            for (LeafReaderContext leaf : leaves) {
+                if (leaf.reader().maxDoc() >= 5 * desiredDocsPerSlice) {
+                    largeSegments.add(leaf);
+                } else {
+                    smallSegments.add(leaf);
+                }
+            }
+            largeSegments.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));
+            for (LeafReaderContext segment : largeSegments) {
+                results.addAll(partitionOneLargeSegment(segment));
+            }
+            results.addAll(partitionSmallSegments(smallSegments));
+            return results;
+        }
+
+        List<List<PartialLeafReaderContext>> partitionOneLargeSegment(LeafReaderContext leaf) {
+            int numDocsInLeaf = leaf.reader().maxDoc();
+            int numSlices = Math.max(1, numDocsInLeaf / desiredDocsPerSlice);
+            while (Math.ceilDiv(numDocsInLeaf, numSlices) > maxDocsPerSlice) {
+                numSlices++;
+            }
+            int docPerSlice = numDocsInLeaf / numSlices;
+            int leftoverDocs = numDocsInLeaf % numSlices;
+            int minDoc = 0;
+            List<List<PartialLeafReaderContext>> results = new ArrayList<>();
+            while (minDoc < numDocsInLeaf) {
+                int docsToUse = docPerSlice;
+                if (leftoverDocs > 0) {
+                    --leftoverDocs;
+                    docsToUse++;
+                }
+                int maxDoc = Math.min(minDoc + docsToUse, numDocsInLeaf);
+                results.add(List.of(new PartialLeafReaderContext(leaf, minDoc, maxDoc)));
+                minDoc = maxDoc;
+            }
+            assert leftoverDocs == 0 : leftoverDocs;
+            assert results.stream().allMatch(s -> s.size() == 1) : "must have one partial leaf per slice";
+            assert results.stream().flatMapToInt(ss -> ss.stream().mapToInt(s -> s.maxDoc() - s.minDoc())).sum() == numDocsInLeaf;
+            return results;
+        }
+
+        List<List<PartialLeafReaderContext>> partitionSmallSegments(List<LeafReaderContext> leaves) {
+            var slices = IndexSearcher.slices(leaves, maxDocsPerSlice, maxSegmentsPerSlice, true);
+            return Arrays.stream(slices).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList();
+        }
+    }
+
 }
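
A worked sketch (not part of this commit) of partitionOneLargeSegment's arithmetic with made-up numbers: a 1,000,000-doc segment and a desired slice size of 300,000, so maxDocsPerSlice is 375,000 (desired * 5 / 4, as in the constructor above).

// Illustration only: doc counts are assumptions; the arithmetic mirrors partitionOneLargeSegment above.
public class LargeSegmentSplitSketch {
    public static void main(String[] args) {
        int numDocsInLeaf = 1_000_000;
        int desiredDocsPerSlice = 300_000;
        int maxDocsPerSlice = desiredDocsPerSlice * 5 / 4; // 375_000

        int numSlices = Math.max(1, numDocsInLeaf / desiredDocsPerSlice); // 3
        while (Math.ceilDiv(numDocsInLeaf, numSlices) > maxDocsPerSlice) { // 333_334 <= 375_000, so no change
            numSlices++;
        }
        int docPerSlice = numDocsInLeaf / numSlices;  // 333_333
        int leftoverDocs = numDocsInLeaf % numSlices; // 1 extra doc, given to the first slice
        int minDoc = 0;
        for (int i = 0; minDoc < numDocsInLeaf; i++) {
            int docsToUse = docPerSlice + (leftoverDocs-- > 0 ? 1 : 0);
            int maxDoc = Math.min(minDoc + docsToUse, numDocsInLeaf);
            System.out.printf("slice %d: [%d, %d) -> %d docs%n", i, minDoc, maxDoc, maxDoc - minDoc);
            minDoc = maxDoc;
        }
        // Prints slices of 333_334, 333_333, and 333_333 docs, summing to 1_000_000.
    }
}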

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 1 addition & 1 deletion

@@ -97,7 +97,7 @@ public Page getCheckedOutput() throws IOException {
         long startInNanos = System.nanoTime();
         try {
             if (iterator == null) {
-                var slice = sliceQueue.nextSlice();
+                var slice = sliceQueue.nextSlice(null);
                 if (slice == null) {
                     doneCollecting = true;
                     return null;
