Commit 22d8230

javadoc

1 parent 50ebdd3 commit 22d8230

2 files changed: 40 additions & 24 deletions

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java

Lines changed: 11 additions & 11 deletions
@@ -42,17 +42,17 @@ public enum DataPartitioning {
      * Partitions into dynamic-sized slices to improve CPU utilization while keeping overhead low.
      * This approach is more flexible than {@link #SEGMENT} and works as follows:
      *
-     * <p>1. The slice size starts from a desired size based on {@code task_concurrency} but is capped
-     * at around {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}. This prevents poor CPU usage when
-     * matching documents are clustered together.
-     *
-     * <p>2. For small and medium segments (less than five times the desired slice size), it uses a
-     * slightly different {@link #SEGMENT} strategy, which also splits segments that are larger
-     * than the desired size. See {@link org.apache.lucene.search.IndexSearcher#slices(List, int, int, boolean)}.
-     *
-     * <p>3. For very large segments, multiple segments are not combined into a single slice. This allows
-     * one driver to process an entire large segment until other drivers steal the work after finishing
-     * their own tasks. See {@link LuceneSliceQueue#nextSlice(LuceneSlice)}.
+     * <ol>
+     * <li>The slice size starts from a desired size based on {@code task_concurrency} but is capped
+     * at around {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}. This prevents poor CPU usage when
+     * matching documents are clustered together.</li>
+     * <li>For small and medium segments (less than five times the desired slice size), it uses a
+     * slightly different {@link #SEGMENT} strategy, which also splits segments that are larger
+     * than the desired size. See {@link org.apache.lucene.search.IndexSearcher#slices(List, int, int, boolean)}.</li>
+     * <li>For very large segments, multiple segments are not combined into a single slice. This allows
+     * one driver to process an entire large segment until other drivers steal the work after finishing
+     * their own tasks. See {@link LuceneSliceQueue#nextSlice(LuceneSlice)}.</li>
+     * </ol>
      */
     DOC
 }
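
To make the numbered sizing rules in that javadoc concrete, here is a minimal Java sketch. It is an illustration only: the class and method names are hypothetical, and the real computation lives in the DOC partitioning code and in IndexSearcher#slices, not in this snippet.

// Hypothetical sketch of the DOC sizing rules; not the actual implementation.
final class DocPartitioningSketch {

    // Rule 1: start from an even split of documents per driver, capped near
    // MAX_DOCS_PER_SLICE so clustered matches cannot starve other drivers.
    static int desiredSliceSize(int totalDocs, int taskConcurrency, int maxDocsPerSlice) {
        int desired = Math.max(1, totalDocs / taskConcurrency);
        return Math.min(desired, maxDocsPerSlice);
    }

    // Rules 2 and 3: segments under five times the desired size are split with
    // the SEGMENT-style strategy; larger segments stay whole so one driver can
    // own the segment until other drivers steal work after finishing their own.
    static boolean splitLikeSegment(int segmentDocs, int desiredSliceSize) {
        return segmentDocs < 5 * desiredSliceSize;
    }
}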

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 29 additions & 13 deletions
@@ -80,37 +80,53 @@ public record QueryAndTags(Query query, List<Object> tags) {}
     public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher

     private final int totalSlices;
-    private final AtomicReferenceArray<LuceneSlice> slices;
-    private final Queue<Integer> startedPositions;
-    private final Queue<Integer> followedPositions;
     private final Map<String, PartitioningStrategy> partitioningStrategies;

+    private final AtomicReferenceArray<LuceneSlice> slices;
+    /**
+     * Queue of slice IDs that are the primary entry point for a new group of segments.
+     * A driver should prioritize polling from this queue after failing to get a sequential
+     * slice (the segment affinity). This ensures that threads start work on fresh,
+     * independent segment groups before resorting to work stealing.
+     */
+    private final Queue<Integer> sliceHeads;
+
+    /**
+     * Queue of slice IDs that are not the primary entry point for a segment group.
+     * This queue serves as a fallback pool for work stealing. When a thread has no more independent work,
+     * it will "steal" a slice from this queue to keep itself utilized. A driver should pull tasks from
+     * this queue only when {@code sliceHeads} has been exhausted.
+     */
+    private final Queue<Integer> stealableSlices;
+
     LuceneSliceQueue(List<LuceneSlice> sliceList, Map<String, PartitioningStrategy> partitioningStrategies) {
         this.totalSlices = sliceList.size();
         this.slices = new AtomicReferenceArray<>(sliceList.size());
         for (int i = 0; i < sliceList.size(); i++) {
             slices.set(i, sliceList.get(i));
         }
         this.partitioningStrategies = partitioningStrategies;
-        this.startedPositions = ConcurrentCollections.newQueue();
-        this.followedPositions = ConcurrentCollections.newQueue();
+        this.sliceHeads = ConcurrentCollections.newQueue();
+        this.stealableSlices = ConcurrentCollections.newQueue();
         for (LuceneSlice slice : sliceList) {
             if (slice.getLeaf(0).minDoc() == 0) {
-                startedPositions.add(slice.slicePosition());
+                sliceHeads.add(slice.slicePosition());
             } else {
-                followedPositions.add(slice.slicePosition());
+                stealableSlices.add(slice.slicePosition());
             }
         }
     }

     /**
      * Retrieves the next available {@link LuceneSlice} for processing.
-     * If a previous slice is provided, this method first attempts to return the next sequential slice to maintain segment affinity
-     * and minimize the cost of switching between segments.
      * <p>
-     * If no sequential slice is available, it returns the next slice from the {@code startedPositions} queue, which starts a new
-     * group of segments. If all started positions are exhausted, it steals a slice from the {@code followedPositions} queue,
-     * enabling work stealing.
+     * This method implements a three-tiered strategy to minimize the overhead of switching between segments:
+     * 1. If a previous slice is provided, it first attempts to return the next sequential slice.
+     * This keeps a thread working on the same segments, minimizing the overhead of segment switching.
+     * 2. If affinity fails, it returns a slice from the {@link #sliceHeads} queue, which is an entry point for
+     * a new, independent group of segments, allowing the calling Driver to work on a fresh set of segments.
+     * 3. If the {@link #sliceHeads} queue is exhausted, it "steals" a slice
+     * from the {@link #stealableSlices} queue. This fallback ensures all threads remain utilized.
      *
      * @param prev the previously returned {@link LuceneSlice}, or {@code null} if starting
      * @return the next available {@link LuceneSlice}, or {@code null} if exhausted
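
The three tiers read more directly as code. Below is a condensed sketch assembled from the pieces visible in this diff; the fields (slices, totalSlices, sliceHeads, stealableSlices) are the ones declared above, but the affinity check is simplified and the real method may include checks not shown here.

// Condensed sketch of the three-tiered strategy; not the exact method body.
LuceneSlice nextSlice(LuceneSlice prev) {
    // Tier 1: segment affinity - prefer the slice that directly follows prev,
    // so a driver keeps reading the same group of segments.
    if (prev != null) {
        int nextId = prev.slicePosition() + 1;
        if (nextId < totalSlices) {
            LuceneSlice slice = slices.getAndSet(nextId, null);
            if (slice != null) {
                return slice;
            }
        }
    }
    // Tier 2: start a fresh, independent segment group from sliceHeads.
    // Tier 3: only once sliceHeads is empty, steal from stealableSlices.
    for (var ids : List.of(sliceHeads, stealableSlices)) {
        Integer nextId;
        while ((nextId = ids.poll()) != null) {
            LuceneSlice slice = slices.getAndSet(nextId, null);
            if (slice != null) {
                return slice;
            }
        }
    }
    return null; // all slices exhausted
}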
@@ -126,7 +142,7 @@ public LuceneSlice nextSlice(LuceneSlice prev) {
                 }
             }
         }
-        for (var ids : List.of(startedPositions, followedPositions)) {
+        for (var ids : List.of(sliceHeads, stealableSlices)) {
             Integer nextId;
             while ((nextId = ids.poll()) != null) {
                 var slice = slices.getAndSet(nextId, null);
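
As a usage note, a driver would typically feed each returned slice back in as prev, which is what keeps tier 1 (segment affinity) active across calls. A hypothetical caller loop, where queue and process are placeholders rather than names from this commit:

LuceneSlice slice = queue.nextSlice(null);   // start without affinity
while (slice != null) {
    process(slice);                          // placeholder for the driver's work
    slice = queue.nextSlice(slice);          // pass prev to keep segment affinity
}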
