Skip to content

Commit cddcfab

Browse files
committed
fix mem.leak
1 parent c2c5911 commit cddcfab

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,6 @@ public static void assertFitsIn(ByteSizeValue max, Function<BigArrays, Releasabl
100100
private static final ConcurrentMap<Object, Object> ACQUIRED_ARRAYS = new ConcurrentHashMap<>();
101101

102102
public static void ensureAllArraysAreReleased() throws Exception {
103-
// TODO: reenable + fix mem.leaks
104-
if (true) return;
105-
106103
final Map<Object, Object> masterCopy = new HashMap<>(ACQUIRED_ARRAYS);
107104
if (masterCopy.isEmpty() == false) {
108105
// not empty, we might be executing on a shared cluster that keeps on obtaining

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ChangePointOperator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
public class ChangePointOperator implements Operator {
2525

26+
// TODO: close upon failure / interrupt
27+
2628
public record Factory(int inputChannel) implements OperatorFactory {
2729
@Override
2830
public Operator get(DriverContext driverContext) {
@@ -70,15 +72,16 @@ public void finish() {
7072

7173
@Override
7274
public boolean isFinished() {
73-
return finished && outputPageIndex == inputPages.size();
75+
return finished && outputPages.isEmpty();
7476
}
7577

7678
@Override
7779
public Page getOutput() {
7880
if (finished == false) {
7981
return null;
8082
}
81-
if (outputPageIndex == inputPages.size()) {
83+
if (outputPageIndex == outputPages.size()) {
84+
outputPages.clear();
8285
return null;
8386
}
8487
return outputPages.get(outputPageIndex++);
@@ -90,6 +93,7 @@ private void createOutputPages() {
9093
valuesCount += page.getPositionCount();
9194
}
9295

96+
// TODO: account for this memory?
9397
double[] values = new double[valuesCount];
9498
int valuesIndex = 0;
9599
for (Page inputPage : inputPages) {
@@ -131,11 +135,13 @@ private void createOutputPages() {
131135
changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
132136
}
133137

138+
// TODO: what about duplicate names??
134139
Page outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
135140
outputPages.add(outputPage);
136-
137141
pageStartIndex += inputPage.getPositionCount();
138142
}
143+
144+
inputPages.clear();
139145
}
140146

141147
@Override

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,16 +327,13 @@ private void doTest() throws Exception {
327327
} finally {
328328
Releasables.close(() -> Iterators.map(actualResults.pages().iterator(), p -> p::releaseBlocks));
329329
// Give the breaker service some time to clear in case we got results before the rest of the driver had cleaned up
330-
// TODO: reenable + fix mem.leaks
331-
/*
332330
assertBusy(
333331
() -> assertThat(
334332
"Not all circuits were cleaned up",
335333
bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(),
336334
equalTo(0L)
337335
)
338336
);
339-
*/
340337
}
341338
}
342339

0 commit comments

Comments
 (0)