diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java index 05f60c1b6834d..9eb00adb58146 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java @@ -11,6 +11,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; /** * {@link Block#lookup Looks up} values from a provided {@link Block} and @@ -44,8 +45,19 @@ public String describe() { private final int positionsOrd; public ColumnLoadOperator(Values values, int positionsOrd) { - this.values = values; this.positionsOrd = positionsOrd; + this.values = clone(values); + } + + // FIXME: Since we don't have a thread-safe RefCounted for blocks/vectors, we have to clone the values block to avoid + // data races of reference when sharing blocks/vectors across threads. Remove this when we have a thread-safe RefCounted + // for blocks/vectors. + static Values clone(Values values) { + final Block block = values.block; + try (var builder = block.elementType().newBlockBuilder(block.getPositionCount(), block.blockFactory())) { + builder.copyFrom(block, 0, block.getPositionCount()); + return new Values(values.name, builder.build()); + } } /** @@ -67,6 +79,11 @@ protected ReleasableIterator receive(Page page) { return appendBlocks(page, values.block.lookup(page.getBlock(positionsOrd), TARGET_BLOCK_SIZE)); } + @Override + public void close() { + Releasables.closeExpectNoException(values.block, super::close); + } + @Override public String toString() { return "ColumnLoad[values=" + values + ", positions=" + positionsOrd + "]";