Skip to content

Commit 78e47b4

Browse files
committed
Make ComputeBlockLoaderFactory Releasable
1 parent 040fd39 commit 78e47b4

File tree

1 file changed

+32
-31
lines changed

1 file changed

+32
-31
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,8 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
220220
positionFieldWork(shard, segment, firstDoc);
221221
StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
222222
List<RowStrideReaderWork> rowStrideReaders = new ArrayList<>(fields.length);
223-
ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.count());
224223
LeafReaderContext ctx = ctx(shard, segment);
225-
try {
224+
try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.count())) {
226225
for (int f = 0; f < fields.length; f++) {
227226
FieldWork field = fields[f];
228227
BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx);
@@ -345,27 +344,28 @@ void run() throws IOException {
345344
builders[f] = new Block.Builder[shardContexts.size()];
346345
converters[f] = new BlockLoader[shardContexts.size()];
347346
}
348-
ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.getPositionCount());
349-
int p = forwards[0];
350-
int shard = shards.getInt(p);
351-
int segment = segments.getInt(p);
352-
int firstDoc = docs.getInt(p);
353-
positionFieldWork(shard, segment, firstDoc);
354-
LeafReaderContext ctx = ctx(shard, segment);
355-
fieldsMoved(ctx, shard);
356-
verifyBuilders(loaderBlockFactory, shard);
357-
read(firstDoc, shard);
358-
for (int i = 1; i < forwards.length; i++) {
359-
p = forwards[i];
360-
shard = shards.getInt(p);
361-
segment = segments.getInt(p);
362-
boolean changedSegment = positionFieldWorkDocGuarteedAscending(shard, segment);
363-
if (changedSegment) {
364-
ctx = ctx(shard, segment);
365-
fieldsMoved(ctx, shard);
366-
}
347+
try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.getPositionCount())) {
348+
int p = forwards[0];
349+
int shard = shards.getInt(p);
350+
int segment = segments.getInt(p);
351+
int firstDoc = docs.getInt(p);
352+
positionFieldWork(shard, segment, firstDoc);
353+
LeafReaderContext ctx = ctx(shard, segment);
354+
fieldsMoved(ctx, shard);
367355
verifyBuilders(loaderBlockFactory, shard);
368-
read(docs.getInt(p), shard);
356+
read(firstDoc, shard);
357+
for (int i = 1; i < forwards.length; i++) {
358+
p = forwards[i];
359+
shard = shards.getInt(p);
360+
segment = segments.getInt(p);
361+
boolean changedSegment = positionFieldWorkDocGuarteedAscending(shard, segment);
362+
if (changedSegment) {
363+
ctx = ctx(shard, segment);
364+
fieldsMoved(ctx, shard);
365+
}
366+
verifyBuilders(loaderBlockFactory, shard);
367+
read(docs.getInt(p), shard);
368+
}
369369
}
370370
for (int f = 0; f < target.length; f++) {
371371
for (int s = 0; s < shardContexts.size(); s++) {
@@ -614,7 +614,7 @@ public String toString() {
614614
}
615615
}
616616

617-
private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory {
617+
private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory, Releasable {
618618
private final BlockFactory factory;
619619
private final int pageSize;
620620
private Block nullBlock;
@@ -681,19 +681,20 @@ public BlockLoader.Builder nulls(int expectedCount) {
681681

682682
@Override
683683
public Block constantNulls() {
684-
// Avoid creating a new null block if we already have one.
685-
// However, downstream operators take ownership of the null block we hand to them and may close it, forcing us to create a new
686-
// one.
687-
// This could be avoided altogether if this factory itself kept a reference to the null block, but we'd have to also ensure
688-
// to release this block once the factory is not used anymore.
689-
if (nullBlock == null || nullBlock.isReleased()) {
684+
if (nullBlock == null) {
690685
nullBlock = factory.newConstantNullBlock(pageSize);
691-
} else {
692-
nullBlock.incRef();
693686
}
687+
nullBlock.incRef();
694688
return nullBlock;
695689
}
696690

691+
@Override
692+
public void close() {
693+
if (nullBlock != null) {
694+
nullBlock.close();
695+
}
696+
}
697+
697698
@Override
698699
public BytesRefBlock constantBytes(BytesRef value) {
699700
return factory.newConstantBytesRefBlockWith(value, pageSize);

0 commit comments

Comments
 (0)