Skip to content

Commit 78f6133

Browse files
authored
Fix data race in InlineStats (#131370)
Although blocks/vectors are immutable and safe to share between threads, their references are currently not thread-safe, which can lead to data races. Previously, blocks/vectors were exclusively owned by a single thread, but this is no longer always the case with InlineJoin. We should consider switching to AbstractRefCounted, which is thread-safe, and benchmark it with many-fields use cases to ensure there is no performance regression. As a temporary solution, this change clones the values block in InlineJoin until thread-safe blocks/vectors are available.
1 parent 280793d commit 78f6133

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.compute.data.Block;
1212
import org.elasticsearch.compute.data.Page;
1313
import org.elasticsearch.core.ReleasableIterator;
14+
import org.elasticsearch.core.Releasables;
1415

1516
/**
1617
* {@link Block#lookup Looks up} values from a provided {@link Block} and
@@ -44,8 +45,19 @@ public String describe() {
4445
private final int positionsOrd;
4546

4647
public ColumnLoadOperator(Values values, int positionsOrd) {
47-
this.values = values;
4848
this.positionsOrd = positionsOrd;
49+
this.values = clone(values);
50+
}
51+
52+
// FIXME: Since we don't have a thread-safe RefCounted for blocks/vectors, we have to clone the values block to avoid
53+
// data races of reference when sharing blocks/vectors across threads. Remove this when we have a thread-safe RefCounted
54+
// for blocks/vectors.
55+
static Values clone(Values values) {
56+
final Block block = values.block;
57+
try (var builder = block.elementType().newBlockBuilder(block.getPositionCount(), block.blockFactory())) {
58+
builder.copyFrom(block, 0, block.getPositionCount());
59+
return new Values(values.name, builder.build());
60+
}
4961
}
5062

5163
/**
@@ -67,6 +79,11 @@ protected ReleasableIterator<Page> receive(Page page) {
6779
return appendBlocks(page, values.block.lookup(page.getBlock(positionsOrd), TARGET_BLOCK_SIZE));
6880
}
6981

82+
@Override
83+
public void close() {
84+
Releasables.closeExpectNoException(values.block, super::close);
85+
}
86+
7087
@Override
7188
public String toString() {
7289
return "ColumnLoad[values=" + values + ", positions=" + positionsOrd + "]";

0 commit comments

Comments
 (0)