Skip to content

Commit 8fb564a

Browse files
committed
Add ref counter to OrdinalsGroupingOperator
1 parent d6ebed2 commit 8fb564a

File tree

3 files changed

+14
-5
lines changed

3 files changed

+14
-5
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public final class DocVector extends AbstractVector implements Vector {
5151

5252
private final ShardRefCounted shardRefCounters;
5353

54-
public ShardRefCounted shardRefCounters() {
54+
public ShardRefCounted shardRefCounted() {
5555
return shardRefCounters;
5656
}
5757

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.compute.data.IntVector;
3434
import org.elasticsearch.compute.data.Page;
3535
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
36+
import org.elasticsearch.core.RefCounted;
3637
import org.elasticsearch.core.Releasable;
3738
import org.elasticsearch.core.Releasables;
3839
import org.elasticsearch.index.mapper.BlockLoader;
@@ -136,6 +137,7 @@ public void addInput(Page page) {
136137
requireNonNull(page, "page is null");
137138
DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
138139
final int shardIndex = docVector.shards().getInt(0);
140+
RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex);
139141
final var blockLoader = blockLoaders.apply(shardIndex);
140142
boolean pagePassed = false;
141143
try {
@@ -150,7 +152,8 @@ public void addInput(Page page) {
150152
driverContext.blockFactory(),
151153
this::createGroupingAggregators,
152154
() -> blockLoader.ordinals(shardContexts.get(k.shardIndex).reader().leaves().get(k.segmentIndex)),
153-
driverContext.bigArrays()
155+
driverContext.bigArrays(),
156+
shardRefCounter
154157
);
155158
} catch (IOException e) {
156159
throw new UncheckedIOException(e);
@@ -343,15 +346,19 @@ static final class OrdinalSegmentAggregator implements Releasable, SeenGroupIds
343346
private final List<GroupingAggregator> aggregators;
344347
private final CheckedSupplier<SortedSetDocValues, IOException> docValuesSupplier;
345348
private final BitArray visitedOrds;
349+
private final RefCounted shardRefCounted;
346350
private BlockOrdinalsReader currentReader;
347351

348352
OrdinalSegmentAggregator(
349353
BlockFactory blockFactory,
350354
Supplier<List<GroupingAggregator>> aggregatorsSupplier,
351355
CheckedSupplier<SortedSetDocValues, IOException> docValuesSupplier,
352-
BigArrays bigArrays
356+
BigArrays bigArrays,
357+
RefCounted shardRefCounted
353358
) throws IOException {
354359
boolean success = false;
360+
shardRefCounted.mustIncRef();
361+
this.shardRefCounted = shardRefCounted;
355362
List<GroupingAggregator> groupingAggregators = null;
356363
BitArray bitArray = null;
357364
try {
@@ -363,11 +370,13 @@ static final class OrdinalSegmentAggregator implements Releasable, SeenGroupIds
363370
this.docValuesSupplier = docValuesSupplier;
364371
this.aggregators = groupingAggregators;
365372
this.visitedOrds = bitArray;
373+
this.shardRefCounted.mustIncRef();
366374
success = true;
367375
} finally {
368376
if (success == false) {
369377
if (bitArray != null) Releasables.close(bitArray);
370378
if (groupingAggregators != null) Releasables.close(groupingAggregators);
379+
shardRefCounted.decRef();
371380
}
372381
}
373382
}
@@ -447,7 +456,7 @@ public BitArray seenGroupIds(BigArrays bigArrays) {
447456

448457
@Override
449458
public void close() {
450-
Releasables.close(visitedOrds, () -> Releasables.close(aggregators));
459+
Releasables.close(visitedOrds, () -> Releasables.close(aggregators), Releasables.fromRefCounted(shardRefCounted));
451460
}
452461
}
453462

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ private void writeKey(int position, Row row) {
216216
private void writeValues(int position, Row destination) {
217217
for (ValueExtractor e : valueExtractors) {
218218
if (e instanceof ValueExtractorForDoc fd) {
219-
destination.setShardRefCountersAndShard(fd.vector().shardRefCounters().get(fd.vector().shards().getInt(position)));
219+
destination.setShardRefCountersAndShard(fd.vector().shardRefCounted().get(fd.vector().shards().getInt(position)));
220220
}
221221
e.writeValue(destination.values, position);
222222
}

0 commit comments

Comments
 (0)