Skip to content

Commit b3f46c6

Browse files
committed
Minimize segment switching in LuceneSliceQueue
1 parent df5cd30 commit b3f46c6

File tree

5 files changed, with 359 additions and 12 deletions.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -165,7 +165,7 @@ LuceneScorer getCurrentOrLoadNextScorer() {
165165
while (currentScorer == null || currentScorer.isDone()) {
166166
if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
167167
sliceIndex = 0;
168-
currentSlice = sliceQueue.nextSlice();
168+
currentSlice = sliceQueue.nextSlice(currentSlice);
169169
if (currentSlice == null) {
170170
doneCollecting = true;
171171
return null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,13 @@
1414
/**
1515
* Holds a list of multiple partial Lucene segments
1616
*/
17-
public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
17+
public record LuceneSlice(
18+
int slicePosition,
19+
ShardContext shardContext,
20+
List<PartialLeafReaderContext> leaves,
21+
Weight weight,
22+
List<Object> tags
23+
) {
1824
int numLeaves() {
1925
return leaves.size();
2026
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 63 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.io.stream.StreamInput;
1717
import org.elasticsearch.common.io.stream.StreamOutput;
1818
import org.elasticsearch.common.io.stream.Writeable;
19+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1920
import org.elasticsearch.core.Nullable;
2021

2122
import java.io.IOException;
@@ -27,7 +28,7 @@
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Queue;
30-
import java.util.concurrent.ConcurrentLinkedQueue;
31+
import java.util.concurrent.atomic.AtomicReferenceArray;
3132
import java.util.function.Function;
3233

3334
/**
@@ -77,18 +78,63 @@ public record QueryAndTags(Query query, List<Object> tags) {}
7778
public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
7879

7980
private final int totalSlices;
80-
private final Queue<LuceneSlice> slices;
81+
private final AtomicReferenceArray<LuceneSlice> slices;
82+
private final Queue<Integer> startedPositions;
83+
private final Queue<Integer> followedPositions;
8184
private final Map<String, PartitioningStrategy> partitioningStrategies;
8285

83-
private LuceneSliceQueue(List<LuceneSlice> slices, Map<String, PartitioningStrategy> partitioningStrategies) {
84-
this.totalSlices = slices.size();
85-
this.slices = new ConcurrentLinkedQueue<>(slices);
86+
LuceneSliceQueue(List<LuceneSlice> sliceList, Map<String, PartitioningStrategy> partitioningStrategies) {
87+
this.totalSlices = sliceList.size();
88+
this.slices = new AtomicReferenceArray<>(sliceList.size());
89+
for (int i = 0; i < sliceList.size(); i++) {
90+
slices.set(i, sliceList.get(i));
91+
}
8692
this.partitioningStrategies = partitioningStrategies;
93+
this.startedPositions = ConcurrentCollections.newQueue();
94+
this.followedPositions = ConcurrentCollections.newQueue();
95+
for (LuceneSlice slice : sliceList) {
96+
var leaf = slice.leaves().getFirst();
97+
if (leaf.minDoc() == 0) {
98+
startedPositions.add(slice.slicePosition());
99+
} else {
100+
followedPositions.add(slice.slicePosition());
101+
}
102+
}
87103
}
88104

105+
/**
106+
* Retrieves the next available {@link LuceneSlice} for processing.
107+
* If a previous slice is provided, this method first attempts to return the next sequential slice to maintain segment affinity
108+
* and minimize the cost of switching between segments.
109+
* <p>
110+
* If no sequential slice is available, it returns the next slice from the {@code startedPositions} queue, which starts a new
111+
* group of segments. If all started positions are exhausted, it retrieves a slice from the {@code followedPositions} queue,
112+
* enabling work stealing.
113+
*
114+
* @param prev the previously returned {@link LuceneSlice}, or {@code null} if starting
115+
* @return the next available {@link LuceneSlice}, or {@code null} if exhausted
116+
*/
89117
@Nullable
90-
public LuceneSlice nextSlice() {
91-
return slices.poll();
118+
public LuceneSlice nextSlice(LuceneSlice prev) {
119+
if (prev != null) {
120+
final int nextId = prev.slicePosition() + 1;
121+
if (nextId < totalSlices) {
122+
var slice = slices.getAndSet(nextId, null);
123+
if (slice != null) {
124+
return slice;
125+
}
126+
}
127+
}
128+
for (var preferredIndices : List.of(startedPositions, followedPositions)) {
129+
Integer nextId;
130+
while ((nextId = preferredIndices.poll()) != null) {
131+
var slice = slices.getAndSet(nextId, null);
132+
if (slice != null) {
133+
return slice;
134+
}
135+
}
136+
}
137+
return null;
92138
}
93139

94140
public int totalSlices() {
@@ -103,7 +149,14 @@ public Map<String, PartitioningStrategy> partitioningStrategies() {
103149
}
104150

105151
public Collection<String> remainingShardsIdentifiers() {
106-
return slices.stream().map(slice -> slice.shardContext().shardIdentifier()).toList();
152+
List<String> remaining = new ArrayList<>(slices.length());
153+
for (int i = 0; i < slices.length(); i++) {
154+
LuceneSlice slice = slices.get(i);
155+
if (slice != null) {
156+
remaining.add(slice.shardContext().shardIdentifier());
157+
}
158+
}
159+
return remaining;
107160
}
108161

109162
public static LuceneSliceQueue create(
@@ -117,6 +170,7 @@ public static LuceneSliceQueue create(
117170
List<LuceneSlice> slices = new ArrayList<>();
118171
Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
119172

173+
int nextSliceId = 0;
120174
for (ShardContext ctx : contexts) {
121175
for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
122176
var scoreMode = scoreModeFunction.apply(ctx);
@@ -140,7 +194,7 @@ public static LuceneSliceQueue create(
140194
Weight weight = weight(ctx, query, scoreMode);
141195
for (List<PartialLeafReaderContext> group : groups) {
142196
if (group.isEmpty() == false) {
143-
slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
197+
slices.add(new LuceneSlice(nextSliceId++, ctx, group, weight, queryAndExtra.tags));
144198
}
145199
}
146200
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,7 @@ public Page getCheckedOutput() throws IOException {
9797
long startInNanos = System.nanoTime();
9898
try {
9999
if (iterator == null) {
100-
var slice = sliceQueue.nextSlice();
100+
var slice = sliceQueue.nextSlice(null);
101101
if (slice == null) {
102102
doneCollecting = true;
103103
return null;

0 commit comments

Comments
 (0)