Skip to content

Commit 924eb89

Browse files
committed
Keep outstanding pages when finish buffer early (elastic#121857)
Today, the exchange buffer of an exchange source is finished in two cases: (1) when the downstream pipeline has received enough data and (2) when all remote sinks have completed. In the first case, outstanding pages could be safely discarded. In the second case, no new pages should be received after finishing. In both scenarios, discarding all outstanding pages was safe if noMoreInputs was switched while adding pages. However, with the stop API, the buffer may now finish while keeping outstanding pages, and new pages may still be received. This change updates the exchange buffer to discard only the incoming page when noMoreInputs is switched, rather than all pages in the buffer. Closes elastic#120757
1 parent cbcdd0a commit 924eb89

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,6 @@ tests:
209209
issue: https://github.com/elastic/elasticsearch/issues/120668
210210
- class: org.elasticsearch.xpack.security.authc.ldap.ADLdapUserSearchSessionFactoryTests
211211
issue: https://github.com/elastic/elasticsearch/issues/119882
212-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT
213-
method: testEnrichAfterStop
214-
issue: https://github.com/elastic/elasticsearch/issues/120757
215212
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
216213
method: test {p0=ml/3rd_party_deployment/Test start deployment fails while model download in progress}
217214
issue: https://github.com/elastic/elasticsearch/issues/120810

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,17 @@ void addPage(Page page) {
4747
notifyNotEmpty();
4848
}
4949
if (noMoreInputs) {
50-
discardPages();
50+
// O(N) but acceptable because it only occurs with the stop API, and the queue size should be very small.
51+
if (queue.removeIf(p -> p == page)) {
52+
page.releaseBlocks();
53+
final int size = queueSize.decrementAndGet();
54+
if (size == maxSize - 1) {
55+
notifyNotFull();
56+
}
57+
if (size == 0) {
58+
completionFuture.onResponse(null);
59+
}
60+
}
5161
}
5262
}
5363

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,25 @@ public void testDrainPages() throws Exception {
6666
blockFactory.ensureAllBlocksAreReleased();
6767
}
6868

69+
public void testOutstandingPages() throws Exception {
70+
ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(1000, 10000));
71+
var blockFactory = blockFactory();
72+
Page p1 = randomPage(blockFactory);
73+
Page p2 = randomPage(blockFactory);
74+
buffer.addPage(p1);
75+
buffer.addPage(p2);
76+
buffer.finish(false);
77+
buffer.addPage(randomPage(blockFactory));
78+
assertThat(buffer.size(), equalTo(2));
79+
assertSame(buffer.pollPage(), p1);
80+
p1.releaseBlocks();
81+
assertSame(buffer.pollPage(), p2);
82+
p2.releaseBlocks();
83+
assertNull(buffer.pollPage());
84+
assertTrue(buffer.isFinished());
85+
blockFactory.ensureAllBlocksAreReleased();
86+
}
87+
6988
private static MockBlockFactory blockFactory() {
7089
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
7190
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);

0 commit comments

Comments
 (0)