Skip to content

Commit 9a8a756

Browse files
authored
Double close on shuffle during field loading (elastic#130838) (elastic#130848)
Fixes a bug during field loading where we could double-close blocks if we failed to allocate memory during the un-shuffling portion of field loading from single segments. Unit test incoming in the followup. Closes elastic#130426 Closes elastic#130790 Closes elastic#130791 Closes elastic#130792 Closes elastic#130793 Closes elastic#130270 Closes elastic#130788 Closes elastic#130122 Closes elastic#130827
1 parent b6b13d5 commit 9a8a756

File tree

5 files changed

+32
-26
lines changed

5 files changed

+32
-26
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,27 @@ public int get(int i) {
6565
return;
6666
}
6767
int[] forwards = docs.shardSegmentDocMapForwards();
68-
loadFromSingleLeaf(target, new BlockLoader.Docs() {
69-
@Override
70-
public int count() {
71-
return docs.getPositionCount();
72-
}
68+
Block[] unshuffled = new Block[target.length];
69+
try {
70+
loadFromSingleLeaf(unshuffled, new BlockLoader.Docs() {
71+
@Override
72+
public int count() {
73+
return docs.getPositionCount();
74+
}
7375

74-
@Override
75-
public int get(int i) {
76-
return docs.docs().getInt(forwards[i]);
77-
}
78-
});
79-
final int[] backwards = docs.shardSegmentDocMapBackwards();
80-
for (int i = 0; i < target.length; i++) {
81-
try (Block in = target[i]) {
82-
target[i] = in.filter(backwards);
76+
@Override
77+
public int get(int i) {
78+
return docs.docs().getInt(forwards[i]);
79+
}
80+
});
81+
final int[] backwards = docs.shardSegmentDocMapBackwards();
82+
for (int i = 0; i < unshuffled.length; i++) {
83+
target[i] = unshuffled[i].filter(backwards);
84+
unshuffled[i].close();
85+
unshuffled[i] = null;
8386
}
87+
} finally {
88+
Releasables.closeExpectNoException(unshuffled);
8489
}
8590
}
8691

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public Block[] next() {
3636
boolean success = false;
3737
try {
3838
load(target, offset);
39+
if (target[0].getPositionCount() != docs.getPositionCount()) {
40+
throw new IllegalStateException("partial pages not yet supported");
41+
}
3942
success = true;
4043
for (Block b : target) {
4144
operator.valuesLoaded += b.getTotalValueCount();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.common.bytes.BytesReference;
3030
import org.elasticsearch.common.lucene.Lucene;
3131
import org.elasticsearch.common.settings.Settings;
32-
import org.elasticsearch.common.unit.ByteSizeValue;
3332
import org.elasticsearch.compute.data.Block;
3433
import org.elasticsearch.compute.data.BlockFactory;
3534
import org.elasticsearch.compute.data.BooleanBlock;
@@ -446,12 +445,6 @@ protected void assertSimpleOutput(List<Page> input, List<Page> results) {
446445
assertThat(sum, equalTo(expectedSum));
447446
}
448447

449-
@Override
450-
protected ByteSizeValue enoughMemoryForSimple() {
451-
assumeFalse("strange exception in the test, fix soon", true);
452-
return ByteSizeValue.ofKb(1);
453-
}
454-
455448
public void testLoadAll() {
456449
DriverContext driverContext = driverContext();
457450
loadSimpleAndAssert(

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,16 @@ protected ByteSizeValue enoughMemoryForSimple() {
9898
* all pages.
9999
*/
100100
public final void testSimpleCircuitBreaking() {
101-
ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple();
102-
Operator.OperatorFactory simple = simple(new SimpleOptions(true));
101+
/*
102+
* Build the input before building `simple` to handle the rare
103+
* cases where `simple` need some state from the input - mostly
104+
* this is ValuesSourceReaderOperator.
105+
*/
103106
DriverContext inputFactoryContext = driverContext();
104107
List<Page> input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000)));
108+
109+
ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple();
110+
Operator.OperatorFactory simple = simple(new SimpleOptions(true));
105111
try {
106112
ByteSizeValue limit = BreakerTestUtil.findBreakerLimit(memoryLimitForSimple, l -> runWithLimit(simple, input, l));
107113
ByteSizeValue testWithSize = ByteSizeValue.ofBytes(randomLongBetween(0, limit.getBytes()));

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.elasticsearch.xpack.esql.core.type.DataType;
5151
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
5252
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
53-
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
5453
import org.junit.After;
5554
import org.junit.Before;
5655

@@ -81,8 +80,8 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
8180

8281
@Override
8382
protected Collection<Class<? extends Plugin>> nodePlugins() {
84-
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
85-
plugins.add(EsqlPlugin.class);
83+
List<Class<? extends Plugin>> plugins = new ArrayList<>();
84+
plugins.add(EsqlActionBreakerIT.EsqlTestPluginWithMockBlockFactory.class);
8685
plugins.add(InternalExchangePlugin.class);
8786
plugins.add(LocalStateEnrich.class);
8887
plugins.add(IngestCommonPlugin.class);

0 commit comments

Comments
 (0)