Merged (changes from 5 commits)
@@ -29,6 +29,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockFactoryProvider;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
@@ -94,6 +95,7 @@
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
@@ -430,7 +432,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
mock(ProjectResolver.class),
mock(IndexNameExpressionResolver.class),
null,
new InferenceService(mock(Client.class))
new InferenceService(mock(Client.class)),
new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY)
);

private static TransportService createMockTransportService() {
@@ -10,6 +10,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.compute.data.BlockFactoryProvider;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.transport.TransportService;
@@ -24,5 +25,6 @@ public record TransportActionServices(
ProjectResolver projectResolver,
IndexNameExpressionResolver indexNameExpressionResolver,
UsageService usageService,
InferenceService inferenceService
InferenceService inferenceService,
BlockFactoryProvider blockFactoryProvider
) {}
@@ -166,7 +166,8 @@ public TransportEsqlQueryAction(
projectResolver,
indexNameExpressionResolver,
usageService,
new InferenceService(client)
new InferenceService(client),
blockFactoryProvider
);

this.computeService = new ComputeService(
@@ -16,6 +16,7 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
@@ -83,6 +84,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -124,6 +126,7 @@ public interface PlanRunner {
private final IndicesExpressionGrouper indicesExpressionGrouper;
private final InferenceService inferenceService;
private final RemoteClusterService remoteClusterService;
private final BlockFactory blockFactory;

private boolean explainMode;
private String parsedPlanString;
@@ -160,6 +163,7 @@ public EsqlSession(
this.inferenceService = services.inferenceService();
this.preMapper = new PreMapper(services);
this.remoteClusterService = services.transportService().getRemoteClusterService();
this.blockFactory = services.blockFactoryProvider().blockFactory();
}

public String sessionId() {
@@ -269,10 +273,13 @@ private void executeSubPlan(
var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request);

runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> {
AtomicReference<Block[]> localRelationBlocks = new AtomicReference<>();
try {
// Translate the subquery into a separate, coordinator-based plan, with the results 'broadcast' as a local relation
completionInfoAccumulator.accumulate(result.completionInfo());
LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan(), result);
localRelationBlocks.set(resultWrapper.supplier().get());

Contributor:
There is the row command, which creates a special CopyingLocalSupplier (see https://github.com/elastic/elasticsearch/pull/128917/files#diff-23897d0bd181d50370709de01c3ab3acc2f91be91d15db03f5dcdf5f27cf7866R32). I don't think it has anything to do with this PR, but I'm mentioning it in case you notice something that might be related. Before I added that supplier, the blocks created as part of row were being double-released with inlinestats queries, resulting in exceptions.

Contributor Author (bpintea):
> Before I added that supplier, the blocks created as part of row were being double-released with inlinestats queries, resulting in exceptions.

I think that's to do with how inlinestats is executed: the "source" (ES or a row supplier) is accessed twice when executing INLINE STATS, once for the agg and once for the join. The operator for the agg will get the blocks from the local relation and, when done, release them. Then the join operator will try to do the same, but those blocks are already released. These blocks are concerned with the left-hand side of the join.

The blocks tracked with this PR are concerned with the right-hand side. The plan looks something like:

LimitExec[1000[INTEGER],12]
\_HashJoinExec[[b{r}#6],[b{r}#6],[b{r}#6],[c{r}#9]]
  |_LocalSourceExec[[x{r}#2, a{r}#4, b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@10d9bd]
  \_LocalSourceExec[[c{r}#9, b{r}#6],[IntArrayBlock[positions=3, mvOrdering=UNORDERED, vector=IntArrayVector[positions=3, values=[0, 0, 0]]], IntVectorBlock[vector=IntArrayVector[positions=3, values=[2, 3, 4]]]]]
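
To illustrate the copying-supplier idea discussed here, a minimal, hypothetical sketch follows. It is not the actual CopyingLocalSupplier from the linked PR; it assumes ElementType.newBlockBuilder(size, factory) as used elsewhere in this diff and a Block.Builder.copyFrom(block, from, to) helper, and the names are illustrative. Each get() hands out independently owned copies, so the agg consumer and the join consumer can each release what they received without a double release.

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;

// Hypothetical sketch only; not the real CopyingLocalSupplier.
final class CopyingBlockSupplierSketch {
    private final Block[] original;
    private final BlockFactory blockFactory;

    CopyingBlockSupplierSketch(Block[] original, BlockFactory blockFactory) {
        this.original = original;
        this.blockFactory = blockFactory;
    }

    // Returns a fresh copy of the blocks on every call, so two consumers
    // (e.g. the agg operator and the join operator) can release their copies independently.
    Block[] get() {
        Block[] copies = new Block[original.length];
        for (int i = 0; i < original.length; i++) {
            Block block = original[i];
            try (Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount(), blockFactory)) {
                builder.copyFrom(block, 0, block.getPositionCount());
                copies[i] = builder.build();
            }
        }
        return copies;
    }
}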

var releasingNext = ActionListener.runAfter(next, () -> releaseLocalRelationBlocks(localRelationBlocks));

Contributor:
Why is this needed? So that LocalRelation-generated blocks are not kept in memory unnecessarily? For example, in case there are multiple inline stats commands: the first one creates Blocks in memory, we release them, then the second inline stats creates some more Blocks, we release them, and so on.

Contributor Author (@bpintea, Sep 11, 2025):
> So that LocalRelation-generated blocks are not kept in memory unnecessarily?

Yes. Not so much kept in memory (we're not referencing them from the circuit breaker), but left unaccounted for within the circuit breaker.

> the first one creates Blocks in memory, we release them, then the second inline stats creates some more Blocks, we release them, and so on

Right. Ideally, we'd just pass the blocks from the produced result along into the LocalRelation, but they can come fragmented over many pages, and SessionUtils#fromPages consolidates them into contiguous blocks (now, i.e. with this PR, also with memory accounting).
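
As a recap of the pattern this PR adopts, here is a small self-contained sketch under the same assumptions as the diff: pages are consolidated into blocks through a breaker-accounted BlockFactory, and the blocks are released via ActionListener.runAfter once the wrapped listener completes. The class and method names are illustrative, not part of the PR.

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.session.SessionUtils;

// Illustrative sketch: consolidate the sub-plan's pages into breaker-accounted blocks
// and tie their release to the completion of the listener that runs the final plan,
// whether it succeeds or fails.
final class LocalRelationBlockTracking {

    static <T> ActionListener<T> consolidateAndTrack(
        List<Attribute> schema,
        List<Page> pages,
        BlockFactory blockFactory,
        ActionListener<T> delegate,
        AtomicReference<Block[]> tracker
    ) {
        // fromPages now charges the request circuit breaker for the consolidated blocks
        tracker.set(SessionUtils.fromPages(schema, pages, blockFactory));
        // once the delegate completes, release the blocks and return the bytes to the breaker
        return ActionListener.runAfter(delegate, () -> {
            Block[] blocks = tracker.getAndSet(null);
            if (blocks != null) {
                Releasables.closeExpectNoException(blocks);
            }
        });
    }
}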


// replace the original logical plan with the backing result
LogicalPlan newLogicalPlan = optimizedPlan.transformUp(
@@ -290,29 +297,40 @@ private void executeSubPlan(
if (newSubPlan == null) {// run the final "main" plan
LOGGER.debug("Executing final plan:\n{}", newLogicalPlan);
var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
runner.run(newPhysicalPlan, releasingNext.delegateFailureAndWrap((finalListener, finalResult) -> {
completionInfoAccumulator.accumulate(finalResult.completionInfo());
finalListener.onResponse(
new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo)
);
}));
} else {// continue executing the subplans
executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener);
executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, releasingNext);
}
} catch (Exception e) {
// safely release the blocks in case an exception occurs either before the "final" runner.run() forks off
// the current thread, or after it does so while the blocks are still referenced
releaseLocalRelationBlocks(localRelationBlocks);
throw e;
} finally {
Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));
}
}));
}

private static LocalRelation resultToPlan(LogicalPlan plan, Result result) {
private LocalRelation resultToPlan(LogicalPlan plan, Result result) {
List<Page> pages = result.pages();
List<Attribute> schema = result.schema();
// if (pages.size() > 1) {
Block[] blocks = SessionUtils.fromPages(schema, pages);
Block[] blocks = SessionUtils.fromPages(schema, pages, blockFactory);
return new LocalRelation(plan.source(), schema, LocalSupplier.of(blocks));
}

private static void releaseLocalRelationBlocks(AtomicReference<Block[]> localRelationBlocks) {
Block[] relationBlocks = localRelationBlocks.getAndSet(null);
if (relationBlocks != null) {
Releasables.closeExpectNoException(relationBlocks);
}
}

private EsqlStatement parse(String query, QueryParams params) {
var parsed = new EsqlParser().createQuery(query, params, planTelemetry, configuration);
if (LOGGER.isDebugEnabled()) {
@@ -7,8 +7,8 @@

package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
@@ -22,19 +22,13 @@ public class SessionUtils {

private SessionUtils() {}

public static Block[] fromPages(List<Attribute> schema, List<Page> pages) {
// Limit ourselves to 1mb of results similar to LOOKUP for now.
long bytesUsed = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) {
throw new IllegalArgumentException("sub-plan execution results too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb");
}
public static Block[] fromPages(List<Attribute> schema, List<Page> pages, BlockFactory blockFactory) {
int positionCount = pages.stream().mapToInt(Page::getPositionCount).sum();
Block.Builder[] builders = new Block.Builder[schema.size()];
Block[] blocks;
try {
for (int b = 0; b < builders.length; b++) {
builders[b] = PlannerUtils.toElementType(schema.get(b).dataType())
.newBlockBuilder(positionCount, PlannerUtils.NON_BREAKING_BLOCK_FACTORY);
builders[b] = PlannerUtils.toElementType(schema.get(b).dataType()).newBlockBuilder(positionCount, blockFactory);
}
for (Page p : pages) {
for (int b = 0; b < builders.length; b++) {
@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.session;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.type.DataType;

import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.common.unit.ByteSizeUnit.GB;
import static org.elasticsearch.xpack.esql.plan.AbstractNodeSerializationTests.randomSource;
import static org.elasticsearch.xpack.esql.session.SessionUtils.fromPages;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SessionUtilsTests extends ESTestCase {

/*
* 1. Generate a list of Pages with one BytesRef block, each of different positions, filled with random bytes.
* 2. Convert the list of Pages into a single BytesRefBlock Page using `fromPages()`.
* 3. Verify that the resulting BytesRefBlock contains the same bytes from the input Pages.
* 4. Verify that a CircuitBreakingException is thrown when the memory limit is too low.
*/
public void testFromPages() {
final int minBytes = 500;
final int maxBytes = randomIntBetween(minBytes, minBytes * 1_000);
byte[] inBuffer = new byte[maxBytes];
BlockFactory blockFactory = blockFactory((int) GB.toBytes(1));

BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(maxBytes);
List<Page> pages = new ArrayList<>();
int producedBytes = 0;
int producedRows = 0;
int rowsPerPage = randomIntBetween(1, 100);
int rows = 0;
while (producedBytes < maxBytes) {
int rowBytes = Math.min(randomIntBetween(1, maxBytes / minBytes), maxBytes - producedBytes);
byte[] rowValue = randomByteArrayOfLength(rowBytes);

builder.appendBytesRef(new BytesRef(rowValue));
System.arraycopy(rowValue, 0, inBuffer, producedBytes, rowBytes);

producedBytes += rowBytes;
rows++;

if (rows > rowsPerPage) {
producedRows += rows;
rows = 0;
enqueueBlock(builder, pages);
builder = blockFactory.newBytesRefBlockBuilder(maxBytes);
rowsPerPage = randomIntBetween(1, 100);
}
}
if (rows > 0) {
producedRows += rows;
enqueueBlock(builder, pages);
}

Attribute attr = new ReferenceAttribute(randomSource(), randomAlphaOfLengthOrNull(10), randomAlphaOfLength(10), DataType.KEYWORD);

Block[] outBlocks = fromPages(List.of(attr), pages, blockFactory);
assertThat(outBlocks.length, is(1));
BytesRefBlock bytesBlock = (BytesRefBlock) outBlocks[0];
assertThat(bytesBlock.getPositionCount(), is(producedRows));

byte[] outBuffer = new byte[producedBytes];
for (int i = 0, posCount = bytesBlock.getPositionCount(), outOffset = 0; i < posCount; i++) {
BytesRef ref = bytesBlock.getBytesRef(i, new BytesRef());
System.arraycopy(ref.bytes, ref.offset, outBuffer, outOffset, ref.length);
outOffset += ref.length;
}
assertThat(outBuffer, is(inBuffer));

Releasables.close(outBlocks);

BlockFactory convertBlockFactory = blockFactory(minBytes);
assertThrows(CircuitBreakingException.class, () -> fromPages(List.of(attr), pages, convertBlockFactory));

Releasables.close(pages);
}

private BlockFactory blockFactory(long maxBytes) {
CircuitBreaker breaker = new MockBigArrays.LimitedBreaker(this.getClass().getName(), ByteSizeValue.ofBytes(maxBytes));
CircuitBreakerService breakerService = mock(CircuitBreakerService.class);
when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(breaker);
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, breakerService);
return new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays);
}

private void enqueueBlock(BytesRefBlock.Builder builder, List<Page> pages) {
Block block = builder.build();
pages.add(new Page(block));
Releasables.close(builder);
}
}