Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -555,9 +555,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.EsqlRemoteErrorWrapIT
method: testThatRemoteErrorsAreWrapped
issue: https://github.com/elastic/elasticsearch/issues/130794
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {ints.InCast SYNC}
issue: https://github.com/elastic/elasticsearch/issues/130796
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item}
issue: https://github.com/elastic/elasticsearch/issues/122414
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ public TransportVersion getMinimalSupportedVersion() {
private static class AppendBlocksIterator implements ReleasableIterator<Page> {
private final Page page;
private final ReleasableIterator<Block[]> next;
private boolean closed = false;

private int positionOffset;

Expand Down Expand Up @@ -348,7 +349,15 @@ public final Page next() {
for (int b = 0; b < page.getBlockCount(); b++) {
page.getBlock(b).incRef();
}
return page.appendBlocks(read);
final Page result = page.appendBlocks(read);
// We need to release the blocks of the page in this iteration instead of delaying to the next,
// because the blocks of this page are now shared with the output page. The output page can be
// passed to a separate driver, which may run concurrently with this driver, leading to data races
// of references in AbstractNonThreadSafeRefCounted, which is not thread-safe.
// An alternative would be to make RefCounted for Vectors/Blocks thread-safe when they are about
// to be shared with other drivers via #allowPassingToDifferentDriver.
close();
return result;
}
Block[] newBlocks = new Block[page.getBlockCount() + read.length];
System.arraycopy(read, 0, newBlocks, page.getBlockCount(), read.length);
Expand All @@ -368,7 +377,10 @@ public final Page next() {

@Override
public void close() {
Releasables.closeExpectNoException(page::releaseBlocks, next);
if (closed == false) {
closed = true;
Releasables.closeExpectNoException(page::releaseBlocks, next);
}
}
}
}