Commit 7d678a3

Add query heads priority to SliceQueue (elastic#133245)
With queries and tags, SliceQueue will contain more slices (see elastic#132512). This change introduces an additional priority for query heads, allowing Drivers to pull slices from the same query and segment first, which minimizes the overhead of switching between queries and segments. Relates elastic#132774
1 parent 2b65b0f commit 7d678a3
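In short, every slice now lands in one of three buckets when the queue is built: the first slice of a query (its "query head"), the first slice of any other segment group, or a plain slice that can only be stolen. The following is a self-contained sketch of that bucketing; the class, record, and variable names here are illustrative rather than Elasticsearch code, and only the ordering of the checks mirrors the constructor change in the diff below.

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/**
 * Self-contained sketch (not Elasticsearch code) of how slices are bucketed: the first slice
 * emitted for a query is its "query head", the first slice of any other segment group
 * (minDoc == 0) is a "segment head", and everything else can only be stolen.
 */
class SliceClassificationSketch {
    record Slice(int position, boolean queryHead, int minDoc) {}

    public static void main(String[] args) {
        List<Slice> slices = List.of(
            new Slice(0, true, 0),    // query A, segment 1: head of the whole query
            new Slice(1, false, 10),  // query A, segment 1: follow-on slice
            new Slice(2, false, 0),   // query A, segment 2: head of a new segment group
            new Slice(3, true, 0),    // query B, segment 1: head of the second query
            new Slice(4, false, 10)   // query B, segment 1: follow-on slice
        );

        Queue<Integer> queryHeads = new ArrayDeque<>();
        Queue<Integer> segmentHeads = new ArrayDeque<>();
        Queue<Integer> stealableSlices = new ArrayDeque<>();

        // Same ordering of checks as the constructor change in the diff below.
        for (Slice slice : slices) {
            if (slice.queryHead()) {
                queryHeads.add(slice.position());
            } else if (slice.minDoc() == 0) {
                segmentHeads.add(slice.position());
            } else {
                stealableSlices.add(slice.position());
            }
        }

        System.out.println("query heads:   " + queryHeads);      // [0, 3]
        System.out.println("segment heads: " + segmentHeads);    // [2]
        System.out.println("stealable:     " + stealableSlices); // [1, 4]
    }
}

Running the sketch prints query heads [0, 3], segment heads [2], and stealable slices [1, 4]; Drivers drain those buckets in that order once their own query/segment affinity runs out.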

File tree

4 files changed: +83, -39 lines


docs/changelog/133245.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 133245
+summary: Add query heads priority to `SliceQueue`
+area: ES|QL
+type: enhancement
+issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 1 addition & 0 deletions

@@ -16,6 +16,7 @@
  */
 public record LuceneSlice(
     int slicePosition,
+    boolean queryHead,
     ShardContext shardContext,
     List<PartialLeafReaderContext> leaves,
     Weight weight,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 27 additions & 11 deletions

@@ -83,13 +83,21 @@ public record QueryAndTags(Query query, List<Object> tags) {}
     private final Map<String, PartitioningStrategy> partitioningStrategies;

     private final AtomicReferenceArray<LuceneSlice> slices;
+    /**
+     * Queue of slice IDs that are the primary entry point for a new query.
+     * A driver should prioritize polling from this queue after failing to get a sequential
+     * slice (the query/segment affinity). This ensures that threads start work on fresh,
+     * independent query before stealing segments from other queries.
+     */
+    private final Queue<Integer> queryHeads;
+
     /**
      * Queue of slice IDs that are the primary entry point for a new group of segments.
      * A driver should prioritize polling from this queue after failing to get a sequential
      * slice (the segment affinity). This ensures that threads start work on fresh,
      * independent segment groups before resorting to work stealing.
      */
-    private final Queue<Integer> sliceHeads;
+    private final Queue<Integer> segmentHeads;

     /**
      * Queue of slice IDs that are not the primary entry point for a segment group.
@@ -106,11 +114,14 @@ public record QueryAndTags(Query query, List<Object> tags) {}
             slices.set(i, sliceList.get(i));
         }
         this.partitioningStrategies = partitioningStrategies;
-        this.sliceHeads = ConcurrentCollections.newQueue();
+        this.queryHeads = ConcurrentCollections.newQueue();
+        this.segmentHeads = ConcurrentCollections.newQueue();
         this.stealableSlices = ConcurrentCollections.newQueue();
         for (LuceneSlice slice : sliceList) {
-            if (slice.getLeaf(0).minDoc() == 0) {
-                sliceHeads.add(slice.slicePosition());
+            if (slice.queryHead()) {
+                queryHeads.add(slice.slicePosition());
+            } else if (slice.getLeaf(0).minDoc() == 0) {
+                segmentHeads.add(slice.slicePosition());
             } else {
                 stealableSlices.add(slice.slicePosition());
             }
@@ -120,12 +131,14 @@ public record QueryAndTags(Query query, List<Object> tags) {}
     /**
      * Retrieves the next available {@link LuceneSlice} for processing.
      * <p>
-     * This method implements a three-tiered strategy to minimize the overhead of switching between segments:
+     * This method implements a four-tiered strategy to minimize the overhead of switching between queries/segments:
      * 1. If a previous slice is provided, it first attempts to return the next sequential slice.
-     * This keeps a thread working on the same segments, minimizing the overhead of segment switching.
-     * 2. If affinity fails, it returns a slice from the {@link #sliceHeads} queue, which is an entry point for
-     * a new, independent group of segments, allowing the calling Driver to work on a fresh set of segments.
-     * 3. If the {@link #sliceHeads} queue is exhausted, it "steals" a slice
+     * This keeps a thread working on the same query and same segment, minimizing the overhead of query/segment switching.
+     * 2. If affinity fails, it returns a slice from the {@link #queryHeads} queue, which is an entry point for
+     * a new query, allowing the calling Driver to work on a fresh query with a new set of segments.
+     * 3. If the {@link #queryHeads} queue is exhausted, it returns a slice from the {@link #segmentHeads} queue of other queries,
+     * which is an entry point for a new, independent group of segments, allowing the calling Driver to work on a fresh set of segments.
+     * 4. If the {@link #segmentHeads} queue is exhausted, it "steals" a slice
      * from the {@link #stealableSlices} queue. This fallback ensures all threads remain utilized.
      *
      * @param prev the previously returned {@link LuceneSlice}, or {@code null} if starting
@@ -142,7 +155,7 @@ public LuceneSlice nextSlice(LuceneSlice prev) {
                 }
             }
         }
-        for (var ids : List.of(sliceHeads, stealableSlices)) {
+        for (var ids : List.of(queryHeads, segmentHeads, stealableSlices)) {
             Integer nextId;
             while ((nextId = ids.poll()) != null) {
                 var slice = slices.getAndSet(nextId, null);
@@ -209,9 +222,12 @@ public static LuceneSliceQueue create(
             partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
             List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
             Weight weight = weight(ctx, query, scoreMode);
+            boolean queryHead = true;
             for (List<PartialLeafReaderContext> group : groups) {
                 if (group.isEmpty() == false) {
-                    slices.add(new LuceneSlice(nextSliceId++, ctx, group, weight, queryAndExtra.tags));
+                    final int slicePosition = nextSliceId++;
+                    slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, weight, queryAndExtra.tags));
+                    queryHead = false;
                 }
             }
         }
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSliceQueueTests.java

Lines changed: 50 additions & 28 deletions

@@ -50,18 +50,28 @@ public void testBasics() {
         LeafReaderContext leaf2 = new MockLeafReader(1000).getContext();
         LeafReaderContext leaf3 = new MockLeafReader(1000).getContext();
         LeafReaderContext leaf4 = new MockLeafReader(1000).getContext();
-        var slice1 = new LuceneSlice(0, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), null, null);
-
-        var slice2 = new LuceneSlice(1, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, null);
-        var slice3 = new LuceneSlice(2, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, null);
-
-        var slice4 = new LuceneSlice(3, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, null);
-        var slice5 = new LuceneSlice(4, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, null);
-        var slice6 = new LuceneSlice(5, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, null);
-
-        var slice7 = new LuceneSlice(6, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), null, null);
-        var slice8 = new LuceneSlice(7, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), null, null);
-        List<LuceneSlice> sliceList = List.of(slice1, slice2, slice3, slice4, slice5, slice6, slice7, slice8);
+        List<Object> query1 = List.of("1");
+        List<Object> query2 = List.of("q2");
+        List<LuceneSlice> sliceList = List.of(
+            // query1: new segment
+            new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), null, query1),
+            new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query1),
+            new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query1),
+            // query1: new segment
+            new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query1),
+            new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query1),
+            new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query1),
+            // query1: new segment
+            new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), null, query1),
+            new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), null, query1),
+            // query2: new segment
+            new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query2),
+            new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query2),
+            // query2: new segment
+            new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query2),
+            new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query2),
+            new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query2)
+        );
         // single driver
         {
             LuceneSliceQueue queue = new LuceneSliceQueue(sliceList, Map.of());
@@ -72,32 +82,43 @@ public void testBasics() {
             }
             assertNull(queue.nextSlice(randomBoolean() ? last : null));
         }
-        // two drivers
+        // three drivers
        {
             LuceneSliceQueue queue = new LuceneSliceQueue(sliceList, Map.of());
+
             LuceneSlice first = null;
             LuceneSlice second = null;
+            LuceneSlice third = null;
             first = queue.nextSlice(first);
-            assertEquals(slice1, first);
+            assertEquals(sliceList.get(0), first);
             first = queue.nextSlice(first);
-            assertEquals(slice2, first);
+            assertEquals(sliceList.get(1), first);

             second = queue.nextSlice(second);
-            assertEquals(slice4, second);
+            assertEquals(sliceList.get(8), second);
             second = queue.nextSlice(second);
-            assertEquals(slice5, second);
+            assertEquals(sliceList.get(9), second);

             first = queue.nextSlice(first);
-            assertEquals(slice3, first);
-            second = queue.nextSlice(second);
-            assertEquals(slice6, second);
+            assertEquals(sliceList.get(2), first);
+            third = queue.nextSlice(third);
+            assertEquals(sliceList.get(3), third);
             first = queue.nextSlice(first);
-            assertEquals(slice7, first);
-
-            assertEquals(slice8, queue.nextSlice(randomFrom(first, second)));
+            assertEquals(sliceList.get(6), first);

-            assertNull(queue.nextSlice(first));
-            assertNull(queue.nextSlice(second));
+            first = queue.nextSlice(first);
+            assertEquals(sliceList.get(7), first);
+            third = queue.nextSlice(third);
+            assertEquals(sliceList.get(4), third);
+            first = queue.nextSlice(first);
+            assertEquals(sliceList.get(10), first);
+            first = queue.nextSlice(first);
+            assertEquals(sliceList.get(11), first);
+            second = queue.nextSlice(second);
+            assertEquals(sliceList.get(5), second);
+            second = queue.nextSlice(second);
+            assertEquals(sliceList.get(12), second);
+            assertNull(null, queue.nextSlice(randomFrom(sliceList)));
         }
     }

@@ -108,13 +129,14 @@ public void testRandom() throws Exception {
         for (int shard = 0; shard < numShards; shard++) {
             int numSegments = randomIntBetween(1, 10);
             for (int segment = 0; segment < numSegments; segment++) {
-                int numSlices = randomBoolean() ? 1 : between(2, 5);
+                int numSlices = between(10, 50);
                 LeafReaderContext leafContext = new MockLeafReader(randomIntBetween(1000, 2000)).getContext();
                 for (int i = 0; i < numSlices; i++) {
                     final int minDoc = i * 10;
                     final int maxDoc = minDoc + 10;
                     LuceneSlice slice = new LuceneSlice(
                         slicePosition++,
+                        false,
                         mock(ShardContext.class),
                         List.of(new PartialLeafReaderContext(leafContext, minDoc, maxDoc)),
                         null,
@@ -147,8 +169,8 @@ public void testRandom() throws Exception {
             var currentLeaf = processedSlices.get(i).getLeaf(0);
             for (int p = 0; p < i; p++) {
                 PartialLeafReaderContext prevLeaf = processedSlices.get(p).getLeaf(0);
-                if (prevLeaf == currentLeaf) {
-                    assertThat(prevLeaf.minDoc(), Matchers.lessThanOrEqualTo(processedSlices.get(i).leaves().getFirst().maxDoc()));
+                if (prevLeaf.leafReaderContext() == currentLeaf.leafReaderContext()) {
+                    assertThat(prevLeaf.maxDoc(), Matchers.lessThanOrEqualTo(currentLeaf.minDoc()));
                 }
             }
         }
