Skip to content

Commit 839d511

Browse files
committed
Keep outstanding pages when finish buffer early (elastic#121857)
Today, the exchange buffer of an exchange source is finished in two cases: (1) when the downstream pipeline has received enough data and (2) when all remote sinks have completed. In the first case, outstanding pages could be safely discarded. In the second case, no new pages should be received after finishing. In both scenarios, discarding all outstanding pages was safe if noMoreInputs was switched while adding pages. However, with the stop API, the buffer may now finish while keeping outstanding pages, and new pages may still be received. This change updates the exchange buffer to discard only the incoming page when noMoreInputs is switched, rather than all pages in the buffer. Closes elastic#120757
1 parent 4e6e0d4 commit 839d511

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -429,9 +429,6 @@ tests:
429429
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
430430
method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform}
431431
issue: https://github.com/elastic/elasticsearch/issues/120720
432-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT
433-
method: testEnrichAfterStop
434-
issue: https://github.com/elastic/elasticsearch/issues/120757
435432
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
436433
method: testStalledShardMigrationProperlyDetected
437434
issue: https://github.com/elastic/elasticsearch/issues/115697

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,17 @@ void addPage(Page page) {
4747
notifyNotEmpty();
4848
}
4949
if (noMoreInputs) {
50-
discardPages();
50+
// O(N) but acceptable because it only occurs with the stop API, and the queue size should be very small.
51+
if (queue.removeIf(p -> p == page)) {
52+
page.releaseBlocks();
53+
final int size = queueSize.decrementAndGet();
54+
if (size == maxSize - 1) {
55+
notifyNotFull();
56+
}
57+
if (size == 0) {
58+
completionFuture.onResponse(null);
59+
}
60+
}
5161
}
5262
}
5363

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,25 @@ public void testDrainPages() throws Exception {
6666
blockFactory.ensureAllBlocksAreReleased();
6767
}
6868

69+
public void testOutstandingPages() throws Exception {
70+
ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(1000, 10000));
71+
var blockFactory = blockFactory();
72+
Page p1 = randomPage(blockFactory);
73+
Page p2 = randomPage(blockFactory);
74+
buffer.addPage(p1);
75+
buffer.addPage(p2);
76+
buffer.finish(false);
77+
buffer.addPage(randomPage(blockFactory));
78+
assertThat(buffer.size(), equalTo(2));
79+
assertSame(buffer.pollPage(), p1);
80+
p1.releaseBlocks();
81+
assertSame(buffer.pollPage(), p2);
82+
p2.releaseBlocks();
83+
assertNull(buffer.pollPage());
84+
assertTrue(buffer.isFinished());
85+
blockFactory.ensureAllBlocksAreReleased();
86+
}
87+
6988
private static MockBlockFactory blockFactory() {
7089
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
7190
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);

0 commit comments

Comments
 (0)