
Commit 6970bd2

ESQL: Aggressive release of shard contexts (#129454)
Keep better track of shard contexts using RefCounted so they can be released more aggressively during operator processing. For example, during TopN we can release some contexts early if their documents do not pass the limit filter.

This is done in preparation for the TopN fetch optimization, which will delay fetching additional columns until the data-node coordinator instead of doing it in each individual worker, thereby reducing IO. Since the node coordinator would need to hold the shard contexts for a potentially longer duration, it is important to release whatever we can earlier. An even more advanced optimization would be to delay fetching until the main cluster coordinator, but that is more involved, since we would first need to figure out how to transport shard contexts between nodes.

Summary of main changes:

- DocVector now maintains a RefCounted instance per shard.
- Components that build or release DocVectors (e.g., LuceneSourceOperator, TopNOperator) can also hold RefCounted instances, so they can pass them to DocVector and ensure contexts are not released while they might still be used later.
- Driver's main loop iteration (runSingleLoopIteration) now closes its operators even between different operator processing steps. This is extra aggressive and was mostly done to improve testability.
- Added a couple of tests to TopNOperator and a new integration test, EsqlTopNShardManagementIT, which uses the pausable plugin framework to check that TopNOperator releases shard contexts as early as possible.
1 parent e6cf223 commit 6970bd2
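The ref-counting scheme the commit message describes can be pictured with a small standalone sketch. This is a hypothetical illustration, not the actual `DocVector`/`ShardRefCounted` classes: a page pins the shard contexts it points into when it is built and unpins them when it is closed, so a shard context can be released the moment the last page referencing it goes away.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy ref-counted handle for a shard context (illustration only). */
class ShardContextRef {
    private final AtomicInteger refs = new AtomicInteger(1); // one reference held by the owner
    private final Runnable onRelease;

    ShardContextRef(Runnable onRelease) {
        this.onRelease = onRelease;
    }

    void incRef() {
        refs.incrementAndGet();
    }

    void decRef() {
        if (refs.decrementAndGet() == 0) {
            onRelease.run(); // last reference gone: release the shard context now
        }
    }
}

/** A page of doc ids that pins the shard contexts it points into while it is alive. */
class DocPage implements AutoCloseable {
    private final List<ShardContextRef> shards;

    DocPage(List<ShardContextRef> shards) {
        this.shards = shards;
        shards.forEach(ShardContextRef::incRef); // pin on construction, like DocVector does
    }

    @Override
    public void close() {
        shards.forEach(ShardContextRef::decRef); // unpin; contexts may be released here
    }
}
```

In the real change, TopNOperator closes incoming pages as rows fall out of the top-N window, which is what lets shard contexts be released before the whole query finishes.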

File tree

68 files changed: +1021 −230 lines changed


benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java
Lines changed: 14 additions & 1 deletion

@@ -40,6 +40,7 @@
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
+import org.elasticsearch.compute.lucene.ShardRefCounted;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.topn.TopNOperator;
 import org.elasticsearch.core.IOUtils;
@@ -477,6 +478,7 @@ private void setupPages() {
             pages.add(
                 new Page(
                     new DocVector(
+                        ShardRefCounted.ALWAYS_REFERENCED,
                         blockFactory.newConstantIntBlockWith(0, end - begin).asVector(),
                         blockFactory.newConstantIntBlockWith(ctx.ord, end - begin).asVector(),
                         docs.build(),
@@ -512,7 +514,14 @@ record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {}
                 if (size >= BLOCK_LENGTH) {
                     pages.add(
                         new Page(
-                            new DocVector(blockFactory.newConstantIntVector(0, size), leafs.build(), docs.build(), null).asBlock()
+                            new DocVector(
+                                ShardRefCounted.ALWAYS_REFERENCED,
+                                blockFactory.newConstantIntVector(0, size),
+                                leafs.build(),
+                                docs.build(),
+                                null
+                            ).asBlock()
                         )
                     );
                     docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH);
@@ -525,6 +534,8 @@ record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {}
                 pages.add(
                     new Page(
                         new DocVector(
+                            ShardRefCounted.ALWAYS_REFERENCED,
                             blockFactory.newConstantIntBlockWith(0, size).asVector(),
                             leafs.build().asBlock().asVector(),
                             docs.build(),
@@ -551,6 +562,8 @@ record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {}
             pages.add(
                 new Page(
                     new DocVector(
+                        ShardRefCounted.ALWAYS_REFERENCED,
                         blockFactory.newConstantIntVector(0, 1),
                         blockFactory.newConstantIntVector(next.ord, 1),
                         blockFactory.newConstantIntVector(next.itr.nextInt(), 1),

docs/changelog/129454.yaml
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 129454
+summary: Aggressive release of shard contexts
+area: ES|QL
+type: enhancement
+issues: []

libs/core/src/main/java/org/elasticsearch/core/Releasables.java
Lines changed: 5 additions & 0 deletions

@@ -202,6 +202,11 @@ public boolean equals(Object obj) {
         }
     }

+    /** Creates a {@link Releasable} that calls {@link RefCounted#decRef()} when closed. */
+    public static Releasable fromRefCounted(RefCounted refCounted) {
+        return () -> refCounted.decRef();
+    }
+
     private static class ReleaseOnce extends AtomicReference<Releasable> implements Releasable {
         ReleaseOnce(Releasable releasable) {
            super(releasable);
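The new `Releasables.fromRefCounted` helper bridges `RefCounted` resources into code that expects a `Releasable`, e.g. a try-with-resources block. A hedged usage sketch follows; the `withShardContext` method and its `shardContext` parameter are hypothetical stand-ins.

```java
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

// Hypothetical usage: hold one reference for the duration of a block of work.
void withShardContext(RefCounted shardContext) {
    shardContext.mustIncRef(); // take a reference up front
    try (Releasable ref = Releasables.fromRefCounted(shardContext)) {
        // ... use the shard context; decRef() runs when the try block exits
    }
}
```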

server/src/main/java/org/elasticsearch/search/internal/SearchContext.java
Lines changed: 4 additions & 0 deletions

@@ -115,6 +115,10 @@ public final void close() {
         closeFuture.onResponse(null);
     }

+    public final boolean isClosed() {
+        return closeFuture.isDone();
+    }
+
     /**
      * Should be called before executing the main query and after all other parameters have been set.
      */

test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java
Lines changed: 6 additions & 1 deletion

@@ -152,7 +152,12 @@ protected SearchContext createContext(
     @Override
     public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
         SearchContext searchContext = super.createSearchContext(request, timeout);
-        onPutContext.accept(searchContext.readerContext());
+        try {
+            onCreateSearchContext.accept(searchContext);
+        } catch (Exception e) {
+            searchContext.close();
+            throw e;
+        }
         searchContext.addReleasable(() -> onRemoveContext.accept(searchContext.readerContext()));
         return searchContext;
     }

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java
Lines changed: 10 additions & 2 deletions

@@ -9,6 +9,8 @@

 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.compute.lucene.ShardRefCounted;
+import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;

@@ -17,7 +19,7 @@
 /**
  * Wrapper around {@link DocVector} to make a valid {@link Block}.
  */
-public class DocBlock extends AbstractVectorBlock implements Block {
+public class DocBlock extends AbstractVectorBlock implements Block, RefCounted {

     private final DocVector vector;

@@ -96,6 +98,12 @@ public static class Builder implements Block.Builder {
         private final IntVector.Builder shards;
         private final IntVector.Builder segments;
         private final IntVector.Builder docs;
+        private ShardRefCounted shardRefCounters = ShardRefCounted.ALWAYS_REFERENCED;
+
+        public Builder setShardRefCounted(ShardRefCounted shardRefCounters) {
+            this.shardRefCounters = shardRefCounters;
+            return this;
+        }

         private Builder(BlockFactory blockFactory, int estimatedSize) {
             IntVector.Builder shards = null;
@@ -183,7 +191,7 @@ public DocBlock build() {
             shards = this.shards.build();
             segments = this.segments.build();
             docs = this.docs.build();
-            result = new DocVector(shards, segments, docs, null);
+            result = new DocVector(shardRefCounters, shards, segments, docs, null);
             return result.asBlock();
         } finally {
             if (result == null) {
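A sketch of how a producer might wire the new builder option. The builder-acquisition and append calls (`newBlockBuilder`, `appendShard`, `appendSegment`, `appendDoc`) are assumed names used only for illustration; the point shown is that `setShardRefCounted` is set before `build()`, with `ShardRefCounted.ALWAYS_REFERENCED` remaining the default for callers that do not track shard lifetimes.

```java
// Sketch only: blockFactory, estimatedSize and shardRefCounted are assumed to be in scope,
// and the builder methods below are illustrative names rather than confirmed API.
try (DocBlock.Builder builder = DocBlock.newBlockBuilder(blockFactory, estimatedSize)) {
    builder.setShardRefCounted(shardRefCounted); // default would be ALWAYS_REFERENCED
    builder.appendShard(0).appendSegment(3).appendDoc(42);
    DocBlock block = builder.build(); // build() hands shardRefCounted to the new DocVector
}
```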

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java
Lines changed: 44 additions & 4 deletions

@@ -10,10 +10,13 @@
 import org.apache.lucene.util.IntroSorter;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.compute.lucene.ShardRefCounted;
+import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;

 import java.util.Objects;
+import java.util.function.Consumer;

 /**
  * {@link Vector} where each entry references a lucene document.
@@ -48,8 +51,21 @@ public final class DocVector extends AbstractVector implements Vector {
      */
     private int[] shardSegmentDocMapBackwards;

-    public DocVector(IntVector shards, IntVector segments, IntVector docs, Boolean singleSegmentNonDecreasing) {
+    private final ShardRefCounted shardRefCounters;
+
+    public ShardRefCounted shardRefCounted() {
+        return shardRefCounters;
+    }
+
+    public DocVector(
+        ShardRefCounted shardRefCounters,
+        IntVector shards,
+        IntVector segments,
+        IntVector docs,
+        Boolean singleSegmentNonDecreasing
+    ) {
         super(shards.getPositionCount(), shards.blockFactory());
+        this.shardRefCounters = shardRefCounters;
         this.shards = shards;
         this.segments = segments;
         this.docs = docs;
@@ -65,10 +81,19 @@ public DocVector(IntVector shards, IntVector segments, IntVector docs, Boolean s
             );
         }
         blockFactory().adjustBreaker(BASE_RAM_BYTES_USED);
+
+        forEachShardRefCounter(RefCounted::mustIncRef);
     }

-    public DocVector(IntVector shards, IntVector segments, IntVector docs, int[] docMapForwards, int[] docMapBackwards) {
-        this(shards, segments, docs, null);
+    public DocVector(
+        ShardRefCounted shardRefCounters,
+        IntVector shards,
+        IntVector segments,
+        IntVector docs,
+        int[] docMapForwards,
+        int[] docMapBackwards
+    ) {
+        this(shardRefCounters, shards, segments, docs, null);
         this.shardSegmentDocMapForwards = docMapForwards;
         this.shardSegmentDocMapBackwards = docMapBackwards;
     }
@@ -238,7 +263,7 @@ public DocVector filter(int... positions) {
             filteredShards = shards.filter(positions);
             filteredSegments = segments.filter(positions);
             filteredDocs = docs.filter(positions);
-            result = new DocVector(filteredShards, filteredSegments, filteredDocs, null);
+            result = new DocVector(shardRefCounters, filteredShards, filteredSegments, filteredDocs, null);
             return result;
         } finally {
             if (result == null) {
@@ -317,5 +342,20 @@ public void closeInternal() {
             segments,
             docs
         );
+        forEachShardRefCounter(RefCounted::decRef);
+    }
+
+    private void forEachShardRefCounter(Consumer<RefCounted> consumer) {
+        switch (shards) {
+            case ConstantIntVector constantIntVector -> consumer.accept(shardRefCounters.get(constantIntVector.getInt(0)));
+            case ConstantNullVector ignored -> {
+                // Noop
+            }
+            default -> {
+                for (int i = 0; i < shards.getPositionCount(); i++) {
+                    consumer.accept(shardRefCounters.get(shards.getInt(i)));
+                }
+            }
+        }
     }
 }
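For operators that hold a list of per-shard `RefCounted` contexts (as the Lucene operator factories below now do), an adapter along these lines can satisfy the `ShardRefCounted` lookup that `DocVector` consults. This is a sketch under the assumption that `get(int shardIndex)` is the interface's lookup method, as the `forEachShardRefCounter` code above implies; the actual PR may ship its own factory helpers for this.

```java
import java.util.List;

import org.elasticsearch.compute.lucene.ShardRefCounted;
import org.elasticsearch.core.RefCounted;

// Sketch: map a shard ordinal (as stored in DocVector's shards vector) to the
// RefCounted context that DocVector incRefs on construction and decRefs on close.
final class ListBackedShardRefCounted implements ShardRefCounted {
    private final List<? extends RefCounted> shards;

    ListBackedShardRefCounted(List<? extends RefCounted> shards) {
        this.shards = shards;
    }

    @Override
    public RefCounted get(int shardIndex) {
        return shards.get(shardIndex);
    }
}
```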

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java
Lines changed: 11 additions & 3 deletions

@@ -18,6 +18,7 @@
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.Releasables;

 import java.io.IOException;
@@ -40,6 +41,7 @@ public class LuceneCountOperator extends LuceneOperator {
     private final LeafCollector leafCollector;

     public static class Factory extends LuceneOperator.Factory {
+        private final List<? extends RefCounted> shardRefCounters;

         public Factory(
             List<? extends ShardContext> contexts,
@@ -58,11 +60,12 @@ public Factory(
                 false,
                 ScoreMode.COMPLETE_NO_SCORES
             );
+            this.shardRefCounters = contexts;
         }

         @Override
         public SourceOperator get(DriverContext driverContext) {
-            return new LuceneCountOperator(driverContext.blockFactory(), sliceQueue, limit);
+            return new LuceneCountOperator(shardRefCounters, driverContext.blockFactory(), sliceQueue, limit);
         }

         @Override
@@ -71,8 +74,13 @@ public String describe() {
         }
     }

-    public LuceneCountOperator(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int limit) {
-        super(blockFactory, PAGE_SIZE, sliceQueue);
+    public LuceneCountOperator(
+        List<? extends RefCounted> shardRefCounters,
+        BlockFactory blockFactory,
+        LuceneSliceQueue sliceQueue,
+        int limit
+    ) {
+        super(shardRefCounters, blockFactory, PAGE_SIZE, sliceQueue);
         this.remainingDocs = limit;
         this.leafCollector = new LeafCollector() {
             @Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java
Lines changed: 3 additions & 1 deletion

@@ -108,6 +108,7 @@ public final long evaluate(long value1, long value2) {
         abstract long bytesToLong(byte[] bytes);
     }

+    private final List<? extends ShardContext> contexts;
     private final String fieldName;
     private final NumberType numberType;

@@ -130,13 +131,14 @@ public LuceneMaxFactory(
             false,
             ScoreMode.COMPLETE_NO_SCORES
         );
+        this.contexts = contexts;
         this.fieldName = fieldName;
         this.numberType = numberType;
     }

     @Override
     public SourceOperator get(DriverContext driverContext) {
-        return new LuceneMinMaxOperator(driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MIN_VALUE);
+        return new LuceneMinMaxOperator(contexts, driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MIN_VALUE);
     }

     @Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java
Lines changed: 12 additions & 1 deletion

@@ -16,6 +16,7 @@
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.search.MultiValueMode;

 import java.io.IOException;
@@ -108,6 +109,7 @@ public final long evaluate(long value1, long value2) {
         abstract long bytesToLong(byte[] bytes);
     }

+    private final List<? extends RefCounted> shardRefCounters;
     private final String fieldName;
     private final NumberType numberType;

@@ -130,13 +132,22 @@ public LuceneMinFactory(
             false,
             ScoreMode.COMPLETE_NO_SCORES
         );
+        this.shardRefCounters = contexts;
         this.fieldName = fieldName;
         this.numberType = numberType;
     }

     @Override
     public SourceOperator get(DriverContext driverContext) {
-        return new LuceneMinMaxOperator(driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MAX_VALUE);
+        return new LuceneMinMaxOperator(
+            shardRefCounters,
+            driverContext.blockFactory(),
+            sliceQueue,
+            fieldName,
+            numberType,
+            limit,
+            Long.MAX_VALUE
+        );
     }

     @Override
