Skip to content

Commit 6270833

Browse files
committed
Move shardRefCounter logic to the super class
1 parent aab0407 commit 6270833

File tree

8 files changed

+49
-47
lines changed

8 files changed

+49
-47
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public class LuceneCountOperator extends LuceneOperator {
3535

3636
private static final int PAGE_SIZE = 1;
3737

38-
private final List<? extends RefCounted> shardRefCounters;
3938
private int totalHits = 0;
4039
private int remainingDocs;
4140

@@ -81,9 +80,7 @@ public LuceneCountOperator(
8180
LuceneSliceQueue sliceQueue,
8281
int limit
8382
) {
84-
super(blockFactory, PAGE_SIZE, sliceQueue);
85-
this.shardRefCounters = shardRefCounters;
86-
shardRefCounters.forEach(RefCounted::mustIncRef);
83+
super(shardRefCounters, blockFactory, PAGE_SIZE, sliceQueue);
8784
this.remainingDocs = limit;
8885
this.leafCollector = new LeafCollector() {
8986
@Override
@@ -182,9 +179,4 @@ protected Page getCheckedOutput() throws IOException {
182179
protected void describe(StringBuilder sb) {
183180
sb.append(", remainingDocs=").append(remainingDocs);
184181
}
185-
186-
@Override
187-
public void close() {
188-
shardRefCounters.forEach(RefCounted::decRef);
189-
}
190182
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public final long evaluate(long value1, long value2) {
108108
abstract long bytesToLong(byte[] bytes);
109109
}
110110

111+
private final List<? extends ShardContext> contexts;
111112
private final String fieldName;
112113
private final NumberType numberType;
113114

@@ -130,13 +131,14 @@ public LuceneMaxFactory(
130131
false,
131132
ScoreMode.COMPLETE_NO_SCORES
132133
);
134+
this.contexts = contexts;
133135
this.fieldName = fieldName;
134136
this.numberType = numberType;
135137
}
136138

137139
@Override
138140
public SourceOperator get(DriverContext driverContext) {
139-
return new LuceneMinMaxOperator(driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MIN_VALUE);
141+
return new LuceneMinMaxOperator(contexts, driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MIN_VALUE);
140142
}
141143

142144
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.compute.data.BlockFactory;
1717
import org.elasticsearch.compute.operator.DriverContext;
1818
import org.elasticsearch.compute.operator.SourceOperator;
19+
import org.elasticsearch.core.RefCounted;
1920
import org.elasticsearch.search.MultiValueMode;
2021

2122
import java.io.IOException;
@@ -108,6 +109,7 @@ public final long evaluate(long value1, long value2) {
108109
abstract long bytesToLong(byte[] bytes);
109110
}
110111

112+
private final List<? extends RefCounted> shardRefCounters;
111113
private final String fieldName;
112114
private final NumberType numberType;
113115

@@ -130,13 +132,22 @@ public LuceneMinFactory(
130132
false,
131133
ScoreMode.COMPLETE_NO_SCORES
132134
);
135+
this.shardRefCounters = contexts;
133136
this.fieldName = fieldName;
134137
this.numberType = numberType;
135138
}
136139

137140
@Override
138141
public SourceOperator get(DriverContext driverContext) {
139-
return new LuceneMinMaxOperator(driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MAX_VALUE);
142+
return new LuceneMinMaxOperator(
143+
shardRefCounters,
144+
driverContext.blockFactory(),
145+
sliceQueue,
146+
fieldName,
147+
numberType,
148+
limit,
149+
Long.MAX_VALUE
150+
);
140151
}
141152

142153
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import org.elasticsearch.compute.data.BlockFactory;
2121
import org.elasticsearch.compute.data.BooleanBlock;
2222
import org.elasticsearch.compute.data.Page;
23+
import org.elasticsearch.core.RefCounted;
2324
import org.elasticsearch.core.Releasables;
2425
import org.elasticsearch.search.MultiValueMode;
2526

2627
import java.io.IOException;
28+
import java.util.List;
2729

2830
/**
2931
* Operator that finds the min or max value of a field using Lucene searches
@@ -65,14 +67,15 @@ sealed interface NumberType permits LuceneMinFactory.NumberType, LuceneMaxFactor
6567
private final String fieldName;
6668

6769
LuceneMinMaxOperator(
70+
List<? extends RefCounted> shardRefCounters,
6871
BlockFactory blockFactory,
6972
LuceneSliceQueue sliceQueue,
7073
String fieldName,
7174
NumberType numberType,
7275
int limit,
7376
long initialResult
7477
) {
75-
super(blockFactory, PAGE_SIZE, sliceQueue);
78+
super(shardRefCounters, blockFactory, PAGE_SIZE, sliceQueue);
7679
this.remainingDocs = limit;
7780
this.numberType = numberType;
7881
this.fieldName = fieldName;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.compute.data.Page;
2626
import org.elasticsearch.compute.operator.Operator;
2727
import org.elasticsearch.compute.operator.SourceOperator;
28+
import org.elasticsearch.core.RefCounted;
2829
import org.elasticsearch.core.TimeValue;
2930
import org.elasticsearch.logging.LogManager;
3031
import org.elasticsearch.logging.Logger;
@@ -52,6 +53,7 @@ public abstract class LuceneOperator extends SourceOperator {
5253

5354
public static final int NO_LIMIT = Integer.MAX_VALUE;
5455

56+
protected final List<? extends RefCounted> shardContextCounters;
5557
protected final BlockFactory blockFactory;
5658

5759
/**
@@ -77,7 +79,14 @@ public abstract class LuceneOperator extends SourceOperator {
7779
*/
7880
long rowsEmitted;
7981

80-
protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
82+
protected LuceneOperator(
83+
List<? extends RefCounted> shardContextCounters,
84+
BlockFactory blockFactory,
85+
int maxPageSize,
86+
LuceneSliceQueue sliceQueue
87+
) {
88+
this.shardContextCounters = shardContextCounters;
89+
shardContextCounters.forEach(RefCounted::incRef);
8190
this.blockFactory = blockFactory;
8291
this.maxPageSize = maxPageSize;
8392
this.sliceQueue = sliceQueue;
@@ -138,7 +147,12 @@ public final Page getOutput() {
138147
protected abstract Page getCheckedOutput() throws IOException;
139148

140149
@Override
141-
public void close() {}
150+
public final void close() {
151+
shardContextCounters.forEach(RefCounted::decRef);
152+
additionalClose();
153+
}
154+
155+
protected void additionalClose() { /* Override this method to add any additional cleanup logic if needed */ }
142156

143157
LuceneScorer getCurrentOrLoadNextScorer() {
144158
while (currentScorer == null || currentScorer.isDone()) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
*/
5050
public class LuceneSourceOperator extends LuceneOperator {
5151
private static final Logger log = LogManager.getLogger(LuceneSourceOperator.class);
52-
private final List<? extends RefCounted> shardContextCounters;
5352

5453
private int currentPagePos = 0;
5554
private int remainingDocs;
@@ -227,8 +226,7 @@ public LuceneSourceOperator(
227226
Limiter limiter,
228227
boolean needsScore
229228
) {
230-
super(blockFactory, maxPageSize, sliceQueue);
231-
this.shardContextCounters = shardContextCounters;
229+
super(shardContextCounters, blockFactory, maxPageSize, sliceQueue);
232230
shardContextCounters.forEach(RefCounted::mustIncRef);
233231
this.minPageSize = Math.max(1, maxPageSize / 2);
234232
this.remainingDocs = limit;
@@ -330,12 +328,14 @@ public Page getCheckedOutput() throws IOException {
330328
Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
331329
currentPagePos -= discardedDocs;
332330
try {
333-
shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
331+
int shardId = scorer.shardContext().index();
332+
shard = blockFactory.newConstantIntVector(shardId, currentPagePos);
334333
leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
335334
docs = buildDocsVector(currentPagePos);
336335
docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
337336
int b = 0;
338-
blocks[b++] = new DocVector(ShardRefCounted.fromList(shardContextCounters), shard, leaf, docs, true).asBlock();
337+
ShardRefCounted refCounted = ShardRefCounted.single(shardId, shardContextCounters.get(shardId));
338+
blocks[b++] = new DocVector(refCounted, shard, leaf, docs, true).asBlock();
339339
shard = null;
340340
leaf = null;
341341
docs = null;
@@ -393,12 +393,8 @@ private DoubleVector buildScoresVector(int upToPositions) {
393393
}
394394

395395
@Override
396-
public void close() {
397-
Releasables.close(
398-
docsBuilder,
399-
scoreBuilder,
400-
Releasables.wrap(shardContextCounters.stream().map(Releasables::fromRefCounted).toList())
401-
);
396+
public void additionalClose() {
397+
Releasables.close(docsBuilder, scoreBuilder);
402398
}
403399

404400
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,8 @@ public LuceneTopNSourceOperator(
132132
LuceneSliceQueue sliceQueue,
133133
boolean needsScore
134134
) {
135-
super(blockFactory, maxPageSize, sliceQueue);
135+
super(contexts, blockFactory, maxPageSize, sliceQueue);
136136
this.contexts = contexts;
137-
contexts.forEach(ShardContext::mustIncRef);
138137
this.sorts = sorts;
139138
this.limit = limit;
140139
this.needsScore = needsScore;
@@ -370,10 +369,4 @@ private static PerShardCollector newPerShardCollector(ShardContext context, List
370369
sort = new Sort(l.toArray(SortField[]::new));
371370
return new ScoringPerShardCollector(context, new TopFieldCollectorManager(sort, limit, null, 0).newCollector());
372371
}
373-
374-
@Override
375-
public void close() {
376-
super.close();
377-
contexts.forEach(ShardContext::decRef);
378-
}
379372
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.Set;
4444

4545
public final class TimeSeriesSourceOperator extends LuceneOperator {
46-
private final List<? extends ShardContext> contexts;
4746
private final int maxPageSize;
4847
private final BlockFactory blockFactory;
4948
private final LuceneSliceQueue sliceQueue;
@@ -64,9 +63,8 @@ public final class TimeSeriesSourceOperator extends LuceneOperator {
6463
int maxPageSize,
6564
int limit
6665
) {
67-
super(blockFactory, maxPageSize, sliceQueue);
66+
super(contexts, blockFactory, maxPageSize, sliceQueue);
6867
contexts.forEach(RefCounted::mustIncRef);
69-
this.contexts = contexts;
7068
this.maxPageSize = maxPageSize;
7169
this.blockFactory = blockFactory;
7270
this.remainingDocs = limit;
@@ -110,7 +108,7 @@ public Page getCheckedOutput() throws IOException {
110108
throw new UnsupportedOperationException("tags not supported by " + getClass());
111109
}
112110
iterator = new SegmentsIterator(slice);
113-
docCollector = new DocIdCollector(contexts, blockFactory, slice.shardContext());
111+
docCollector = new DocIdCollector(blockFactory, slice.shardContext());
114112
}
115113
iterator.readDocsForNextPage();
116114
if (currentPagePos > 0) {
@@ -141,13 +139,8 @@ public Page getCheckedOutput() throws IOException {
141139
}
142140

143141
@Override
144-
public void close() {
145-
Releasables.closeExpectNoException(
146-
timestampsBuilder,
147-
tsHashesBuilder,
148-
docCollector,
149-
Releasables.wrap(contexts.stream().map(Releasables::fromRefCounted).toList())
150-
);
142+
public void additionalClose() {
143+
Releasables.closeExpectNoException(timestampsBuilder, tsHashesBuilder, docCollector);
151144
}
152145

153146
class SegmentsIterator {
@@ -365,14 +358,12 @@ OrdinalBytesRefVector build() throws IOException {
365358
}
366359

367360
static final class DocIdCollector implements Releasable {
368-
private final List<? extends ShardContext> contexts;
369361
private final BlockFactory blockFactory;
370362
private final ShardContext shardContext;
371363
private IntVector.Builder docsBuilder;
372364
private IntVector.Builder segmentsBuilder;
373365

374-
DocIdCollector(List<? extends ShardContext> contexts, BlockFactory blockFactory, ShardContext shardContext) {
375-
this.contexts = contexts;
366+
DocIdCollector(BlockFactory blockFactory, ShardContext shardContext) {
376367
this.blockFactory = blockFactory;
377368
this.shardContext = shardContext;
378369
}

0 commit comments

Comments
 (0)