Skip to content

Commit e621964

Browse files
committed
CR comments
1 parent 2c25218 commit e621964

File tree

6 files changed

+32
-17
lines changed

6 files changed

+32
-17
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,7 @@ public final Page getOutput() {
138138
protected abstract Page getCheckedOutput() throws IOException;
139139

140140
@Override
141-
public void close() {
142-
if (currentScorer != null) {
143-
currentScorer.shardContext().decRef();
144-
}
145-
}
141+
public void close() {}
146142

147143
LuceneScorer getCurrentOrLoadNextScorer() {
148144
while (currentScorer == null || currentScorer.isDone()) {
@@ -165,11 +161,7 @@ LuceneScorer getCurrentOrLoadNextScorer() {
165161
) {
166162
final Weight weight = currentSlice.weight();
167163
processedQueries.add(weight.getQuery());
168-
var previousScorer = currentScorer;
169164
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
170-
if (previousScorer != null) {
171-
previousScorer.shardContext().decRef();
172-
}
173165
}
174166
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
175167
currentScorer.maxPosition = partialLeaf.maxDoc();
@@ -196,7 +188,6 @@ static final class LuceneScorer {
196188
private Thread executingThread;
197189

198190
LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
199-
shardContext.incRef();
200191
this.shardContext = shardContext;
201192
this.weight = weight;
202193
this.tags = tags;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.compute.data.Block;
1212
import org.elasticsearch.compute.data.BlockFactory;
1313
import org.elasticsearch.compute.data.ElementType;
14+
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.core.RefCounted;
1416
import org.elasticsearch.core.Releasable;
1517

1618
/**
@@ -33,6 +35,12 @@ interface ResultBuilder extends Releasable {
3335
*/
3436
void decodeValue(BytesRef values);
3537

38+
/**
39+
* Sets the RefCounted value, which was extracted by {@link ValueExtractor#getRefCountedForShard(int)}. By default, this is a no-op,
40+
* since most builders do not the shard ref counter.
41+
*/
42+
default void setNextRefCounted(@Nullable RefCounted nextRefCounted) { /* no-op */ }
43+
3644
/**
3745
* Build the result block.
3846
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForDoc.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public void decodeKey(BytesRef keys) {
4242
throw new AssertionError("_doc can't be a key");
4343
}
4444

45-
void setNextRefCounted(RefCounted nextRefCounted) {
45+
@Override
46+
public void setNextRefCounted(RefCounted nextRefCounted) {
4647
this.nextRefCounted = nextRefCounted;
4748
// Since rows can be closed before build is called, we need to increment the ref count to ensure the shard context isn't closed.
4849
this.nextRefCounted.mustIncRef();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,9 @@ private void writeKey(int position, Row row) {
215215

216216
private void writeValues(int position, Row destination) {
217217
for (ValueExtractor e : valueExtractors) {
218-
if (e instanceof ValueExtractorForDoc fd) {
219-
destination.setShardRefCountersAndShard(fd.vector().shardRefCounted().get(fd.vector().shards().getInt(position)));
218+
var refCounted = e.getRefCountedForShard(position);
219+
if (refCounted != null) {
220+
destination.setShardRefCountersAndShard(refCounted);
220221
}
221222
e.writeValue(destination.values, position);
222223
}
@@ -486,10 +487,7 @@ private Iterator<Page> toPages() {
486487

487488
BytesRef values = row.values.bytesRefView();
488489
for (ResultBuilder builder : builders) {
489-
if (builder instanceof ResultBuilderForDoc fd) {
490-
assert row.shardRefCounter != null : "shardRefCounter must be set for ResultBuilderForDoc";
491-
fd.setNextRefCounted(row.shardRefCounter);
492-
}
490+
builder.setNextRefCounted(row.shardRefCounter);
493491
builder.decodeValue(values);
494492
}
495493
if (values.length != 0) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,24 @@
1818
import org.elasticsearch.compute.data.IntBlock;
1919
import org.elasticsearch.compute.data.LongBlock;
2020
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
21+
import org.elasticsearch.core.Nullable;
22+
import org.elasticsearch.core.RefCounted;
2123

2224
/**
2325
* Extracts values into a {@link BreakingBytesRefBuilder}.
2426
*/
2527
interface ValueExtractor {
2628
void writeValue(BreakingBytesRefBuilder values, int position);
2729

30+
/**
31+
* This should return a non-null value if the row is supposed to hold a temporary reference to a shard (including incrementing and
32+
* decrementing it) in between encoding and decoding the row values.
33+
*/
34+
@Nullable
35+
default RefCounted getRefCountedForShard(int position) {
36+
return null;
37+
}
38+
2839
static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, boolean inKey, Block block) {
2940
if (false == (elementType == block.elementType() || ElementType.NULL == block.elementType())) {
3041
// While this maybe should be an IllegalArgumentException, it's important to throw an exception that causes a 500 response.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForDoc.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,16 @@
99

1010
import org.elasticsearch.compute.data.DocVector;
1111
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
12+
import org.elasticsearch.core.RefCounted;
1213

1314
class ValueExtractorForDoc implements ValueExtractor {
1415
private final DocVector vector;
1516

17+
@Override
18+
public RefCounted getRefCountedForShard(int position) {
19+
return vector().shardRefCounted().get(vector().shards().getInt(position));
20+
}
21+
1622
ValueExtractorForDoc(TopNEncoder encoder, DocVector vector) {
1723
assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
1824
this.vector = vector;

0 commit comments

Comments
 (0)