Skip to content

Commit 4d2c7ba

Browse files
committed
Keep outstanding pages when finish buffer early
1 parent 0cf42f2 commit 4d2c7ba

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,6 @@ tests:
209209
issue: https://github.com/elastic/elasticsearch/issues/120668
210210
- class: org.elasticsearch.xpack.security.authc.ldap.ADLdapUserSearchSessionFactoryTests
211211
issue: https://github.com/elastic/elasticsearch/issues/119882
212-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT
213-
method: testEnrichAfterStop
214-
issue: https://github.com/elastic/elasticsearch/issues/120757
215212
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
216213
method: test {p0=ml/3rd_party_deployment/Test start deployment fails while model download in progress}
217214
issue: https://github.com/elastic/elasticsearch/issues/120810

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,16 @@ void addPage(Page page) {
4747
notifyNotEmpty();
4848
}
4949
if (noMoreInputs) {
50-
discardPages();
50+
if (queue.removeIf(p -> p == page)) {
51+
page.releaseBlocks();
52+
final int size = queueSize.decrementAndGet();
53+
if (size == maxSize - 1) {
54+
notifyNotFull();
55+
}
56+
if (size == 0) {
57+
completionFuture.onResponse(null);
58+
}
59+
}
5160
}
5261
}
5362

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,25 @@ public void testDrainPages() throws Exception {
6666
blockFactory.ensureAllBlocksAreReleased();
6767
}
6868

69+
public void testOutstandingPages() throws Exception {
70+
ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(1000, 10000));
71+
var blockFactory = blockFactory();
72+
Page p1 = randomPage(blockFactory);
73+
Page p2 = randomPage(blockFactory);
74+
buffer.addPage(p1);
75+
buffer.addPage(p2);
76+
buffer.finish(false);
77+
buffer.addPage(randomPage(blockFactory));
78+
assertThat(buffer.size(), equalTo(2));
79+
assertSame(buffer.pollPage(), p1);
80+
p1.releaseBlocks();
81+
assertSame(buffer.pollPage(), p2);
82+
p2.releaseBlocks();
83+
assertNull(buffer.pollPage());
84+
assertTrue(buffer.isFinished());
85+
blockFactory.ensureAllBlocksAreReleased();
86+
}
87+
6988
private static MockBlockFactory blockFactory() {
7089
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
7190
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);

0 commit comments

Comments
 (0)