Skip to content

Commit cc8068d

Browse files
committed
Keep outstanding pages when finish buffer early (elastic#121857)
Today, the exchange buffer of an exchange source is finished in two cases: (1) when the downstream pipeline has received enough data and (2) when all remote sinks have completed. In the first case, outstanding pages could be safely discarded. In the second case, no new pages should be received after finishing. In both scenarios, discarding all outstanding pages was safe if noMoreInputs was switched while adding pages. However, with the stop API, the buffer may now finish while keeping outstanding pages, and new pages may still be received. This change updates the exchange buffer to discard only the incoming page when noMoreInputs is switched, rather than all pages in the buffer. Closes elastic#120757
1 parent 4ea396a commit cc8068d

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -420,9 +420,6 @@ tests:
420420
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
421421
method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform}
422422
issue: https://github.com/elastic/elasticsearch/issues/120720
423-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT
424-
method: testEnrichAfterStop
425-
issue: https://github.com/elastic/elasticsearch/issues/120757
426423
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
427424
method: testStalledShardMigrationProperlyDetected
428425
issue: https://github.com/elastic/elasticsearch/issues/115697

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,17 @@ void addPage(Page page) {
4747
notifyNotEmpty();
4848
}
4949
if (noMoreInputs) {
50-
discardPages();
50+
// O(N) but acceptable because it only occurs with the stop API, and the queue size should be very small.
51+
if (queue.removeIf(p -> p == page)) {
52+
page.releaseBlocks();
53+
final int size = queueSize.decrementAndGet();
54+
if (size == maxSize - 1) {
55+
notifyNotFull();
56+
}
57+
if (size == 0) {
58+
completionFuture.onResponse(null);
59+
}
60+
}
5161
}
5262
}
5363

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,25 @@ public void testDrainPages() throws Exception {
6666
blockFactory.ensureAllBlocksAreReleased();
6767
}
6868

69+
public void testOutstandingPages() throws Exception {
70+
ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(1000, 10000));
71+
var blockFactory = blockFactory();
72+
Page p1 = randomPage(blockFactory);
73+
Page p2 = randomPage(blockFactory);
74+
buffer.addPage(p1);
75+
buffer.addPage(p2);
76+
buffer.finish(false);
77+
buffer.addPage(randomPage(blockFactory));
78+
assertThat(buffer.size(), equalTo(2));
79+
assertSame(buffer.pollPage(), p1);
80+
p1.releaseBlocks();
81+
assertSame(buffer.pollPage(), p2);
82+
p2.releaseBlocks();
83+
assertNull(buffer.pollPage());
84+
assertTrue(buffer.isFinished());
85+
blockFactory.ensureAllBlocksAreReleased();
86+
}
87+
6988
private static MockBlockFactory blockFactory() {
7089
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
7190
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);

0 commit comments

Comments
 (0)