File tree Expand file tree Collapse file tree 3 files changed +30
-4
lines changed
x-pack/plugin/esql/compute/src
main/java/org/elasticsearch/compute/operator/exchange
test/java/org/elasticsearch/compute/operator/exchange Expand file tree Collapse file tree 3 files changed +30
-4
lines changed Original file line number Diff line number Diff line change @@ -209,9 +209,6 @@ tests:
209209 issue : https://github.com/elastic/elasticsearch/issues/120668
210210- class : org.elasticsearch.xpack.security.authc.ldap.ADLdapUserSearchSessionFactoryTests
211211 issue : https://github.com/elastic/elasticsearch/issues/119882
212- - class : org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT
213- method : testEnrichAfterStop
214- issue : https://github.com/elastic/elasticsearch/issues/120757
215212- class : org.elasticsearch.xpack.test.rest.XPackRestIT
216213 method : test {p0=ml/3rd_party_deployment/Test start deployment fails while model download in progress}
217214 issue : https://github.com/elastic/elasticsearch/issues/120810
Original file line number Diff line number Diff line change @@ -47,7 +47,17 @@ void addPage(Page page) {
4747 notifyNotEmpty ();
4848 }
4949 if (noMoreInputs ) {
50- discardPages ();
50+ // O(N) but acceptable because it only occurs with the stop API, and the queue size should be very small.
51+ if (queue .removeIf (p -> p == page )) {
52+ page .releaseBlocks ();
53+ final int size = queueSize .decrementAndGet ();
54+ if (size == maxSize - 1 ) {
55+ notifyNotFull ();
56+ }
57+ if (size == 0 ) {
58+ completionFuture .onResponse (null );
59+ }
60+ }
5161 }
5262 }
5363
Original file line number Diff line number Diff line change @@ -66,6 +66,25 @@ public void testDrainPages() throws Exception {
6666 blockFactory .ensureAllBlocksAreReleased ();
6767 }
6868
69+ public void testOutstandingPages () throws Exception {
70+ ExchangeBuffer buffer = new ExchangeBuffer (randomIntBetween (1000 , 10000 ));
71+ var blockFactory = blockFactory ();
72+ Page p1 = randomPage (blockFactory );
73+ Page p2 = randomPage (blockFactory );
74+ buffer .addPage (p1 );
75+ buffer .addPage (p2 );
76+ buffer .finish (false );
77+ buffer .addPage (randomPage (blockFactory ));
78+ assertThat (buffer .size (), equalTo (2 ));
79+ assertSame (buffer .pollPage (), p1 );
80+ p1 .releaseBlocks ();
81+ assertSame (buffer .pollPage (), p2 );
82+ p2 .releaseBlocks ();
83+ assertNull (buffer .pollPage ());
84+ assertTrue (buffer .isFinished ());
85+ blockFactory .ensureAllBlocksAreReleased ();
86+ }
87+
6988 private static MockBlockFactory blockFactory () {
7089 BigArrays bigArrays = new MockBigArrays (PageCacheRecycler .NON_RECYCLING_INSTANCE , ByteSizeValue .ofGb (1 )).withCircuitBreaking ();
7190 CircuitBreaker breaker = bigArrays .breakerService ().getBreaker (CircuitBreaker .REQUEST );
You can’t perform that action at this time.
0 commit comments