Skip to content

Commit a635834

Browse files
authored
ESQL: Use a little less memory in DocVector (#133319)
Prevously we made a unique `ShardRefCounted` for each `DocVector`. This shares the `ShardRefCounted`s between all `DocVector`s that are in the same shard.
1 parent 818670d commit a635834

File tree

6 files changed

+41
-11
lines changed

6 files changed

+41
-11
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ public abstract class LuceneOperator extends SourceOperator {
7070
private int sliceIndex;
7171

7272
private LuceneScorer currentScorer;
73+
/**
74+
* The {@link ShardRefCounted} for the current scorer.
75+
*/
76+
private ShardRefCounted.Single currentScorerShardRefCounted;
7377

7478
long processingNanos;
7579
int pagesEmitted;
@@ -172,6 +176,10 @@ LuceneScorer getCurrentOrLoadNextScorer() {
172176
}
173177
processedSlices++;
174178
processedShards.add(currentSlice.shardContext().shardIdentifier());
179+
int shardId = currentSlice.shardContext().index();
180+
if (currentScorerShardRefCounted == null || currentScorerShardRefCounted.index() != shardId) {
181+
currentScorerShardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId));
182+
}
175183
}
176184
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
177185
logger.trace("Starting {}", partialLeaf);
@@ -194,6 +202,13 @@ LuceneScorer getCurrentOrLoadNextScorer() {
194202
return currentScorer;
195203
}
196204

205+
/**
206+
* The {@link ShardRefCounted} for the current scorer.
207+
*/
208+
ShardRefCounted currentScorerShardRefCounted() {
209+
return currentScorerShardRefCounted;
210+
}
211+
197212
/**
198213
* Wraps a {@link BulkScorer} with shard information
199214
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,7 @@ public Page getCheckedOutput() throws IOException {
334334
docs = buildDocsVector(currentPagePos);
335335
docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
336336
int b = 0;
337-
ShardRefCounted refCounted = ShardRefCounted.single(shardId, shardContextCounters.get(shardId));
338-
blocks[b++] = new DocVector(refCounted, shard, leaf, docs, true).asBlock();
337+
blocks[b++] = new DocVector(currentScorerShardRefCounted(), shard, leaf, docs, true).asBlock();
339338
shard = null;
340339
leaf = null;
341340
docs = null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ public String describe() {
114114
private int offset = 0;
115115

116116
private PerShardCollector perShardCollector;
117-
private final List<? extends ShardContext> contexts;
118117
private final List<SortBuilder<?>> sorts;
119118
private final int limit;
120119
private final boolean needsScore;
@@ -129,7 +128,6 @@ public LuceneTopNSourceOperator(
129128
boolean needsScore
130129
) {
131130
super(contexts, blockFactory, maxPageSize, sliceQueue);
132-
this.contexts = contexts;
133131
this.sorts = sorts;
134132
this.limit = limit;
135133
this.needsScore = needsScore;
@@ -241,7 +239,7 @@ private Page emit(boolean startEmitting) {
241239
shard = blockFactory.newConstantIntBlockWith(shardId, size);
242240
segments = currentSegmentBuilder.build();
243241
docs = currentDocsBuilder.build();
244-
ShardRefCounted shardRefCounted = ShardRefCounted.single(shardId, contexts.get(shardId));
242+
ShardRefCounted shardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId));
245243
docBlock = new DocVector(shardRefCounted, shard.asVector(), segments, docs, null).asBlock();
246244
shard = null;
247245
segments = null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardRefCounted.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,18 @@ static ShardRefCounted fromList(List<? extends RefCounted> refCounters) {
2424
}
2525

2626
static ShardRefCounted fromShardContext(ShardContext shardContext) {
27-
return single(shardContext.index(), shardContext);
27+
return new Single(shardContext.index(), shardContext);
2828
}
2929

30-
static ShardRefCounted single(int index, RefCounted refCounted) {
31-
return shardId -> {
30+
ShardRefCounted ALWAYS_REFERENCED = shardId -> RefCounted.ALWAYS_REFERENCED;
31+
32+
record Single(int index, RefCounted refCounted) implements ShardRefCounted {
33+
@Override
34+
public RefCounted get(int shardId) {
3235
if (shardId != index) {
3336
throw new IllegalArgumentException("Invalid shardId: " + shardId + ", expected: " + index);
3437
}
3538
return refCounted;
36-
};
39+
}
3740
}
38-
39-
ShardRefCounted ALWAYS_REFERENCED = shardId -> RefCounted.ALWAYS_REFERENCED;
4041
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import static org.hamcrest.Matchers.lessThan;
6464
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6565
import static org.hamcrest.Matchers.matchesRegex;
66+
import static org.hamcrest.Matchers.sameInstance;
6667

6768
public class LuceneSourceOperatorTests extends SourceOperatorTestCase {
6869
private static final MappedFieldType S_FIELD = new NumberFieldMapper.NumberFieldType("s", NumberFieldMapper.NumberType.LONG);
@@ -373,6 +374,7 @@ private void testSimple(DriverContext ctx, DataPartitioning partitioning, int nu
373374
for (Page page : results) {
374375
assertThat(page.getPositionCount(), lessThanOrEqualTo(factory.maxPageSize()));
375376
}
377+
assertAllRefCountedSameInstance(results);
376378

377379
for (Page page : results) {
378380
LongBlock sBlock = page.getBlock(initialBlockIndex(page));
@@ -483,4 +485,17 @@ public boolean hasReferences() {
483485
return true;
484486
}
485487
}
488+
489+
static void assertAllRefCountedSameInstance(List<Page> results) {
490+
ShardRefCounted firstRefCounted = null;
491+
for (Page page : results) {
492+
DocBlock docs = page.getBlock(0);
493+
ShardRefCounted refCounted = docs.asVector().shardRefCounted();
494+
if (firstRefCounted == null) {
495+
firstRefCounted = refCounted;
496+
} else {
497+
assertThat(refCounted, sameInstance(firstRefCounted));
498+
}
499+
}
500+
}
486501
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Optional;
4949
import java.util.function.Function;
5050

51+
import static org.elasticsearch.compute.lucene.LuceneSourceOperatorTests.assertAllRefCountedSameInstance;
5152
import static org.hamcrest.Matchers.equalTo;
5253
import static org.hamcrest.Matchers.hasSize;
5354
import static org.hamcrest.Matchers.matchesRegex;
@@ -207,6 +208,7 @@ protected void testSimple(DriverContext ctx, int size, int limit) {
207208
assertThat(sBlock.getLong(sBlock.getFirstValueIndex(p)), equalTo(expectedS++));
208209
}
209210
}
211+
assertAllRefCountedSameInstance(results);
210212
int pages = (int) Math.ceil((float) Math.min(size, limit) / factory.maxPageSize());
211213
assertThat(results, hasSize(pages));
212214
}

0 commit comments

Comments
 (0)