Skip to content

Commit f7e1d9c

Browse files
dnhatnsmalyshev
authored andcommitted
Fix tests
1 parent beb7b02 commit f7e1d9c

File tree

3 files changed

+13
-11
lines changed

3 files changed

+13
-11
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,13 +366,13 @@ public void close(ActionListener<Void> listener) {
366366
);
367367
actual.addListener(listener);
368368
if (candidate == actual) {
369-
doFetchPageAsync(true, candidate.delegateFailure((l, r) -> {
369+
doFetchPageAsync(true, ActionListener.wrap(r -> {
370370
final Page page = r.takePage();
371371
if (page != null) {
372372
page.releaseBlocks();
373373
}
374-
l.onResponse(null);
375-
}));
374+
candidate.onResponse(null);
375+
}, e -> candidate.onResponse(null)));
376376
}
377377
}
378378
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,7 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
6060
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
6161
final PendingInstances closingSinks = new PendingInstances(() -> {});
6262
closingSinks.trackNewInstance();
63-
this.outstandingSources = new PendingInstances(() -> {
64-
buffer.finish(true);
65-
finishEarly(ActionListener.running(closingSinks::finishInstance));
66-
});
63+
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
6764
buffer.addCompletionListener(ActionListener.running(() -> {
6865
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
6966
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
@@ -309,9 +306,12 @@ public Releasable addEmptySink() {
309306
* Gracefully terminates the exchange source early by instructing all remote exchange sinks to stop their computations.
310307
* This can happen when the exchange source has accumulated enough data (e.g., reaching the LIMIT) or when users want to
311308
* see the current result immediately.
309+
*
310+
* @param drainingPages whether to discard pages already fetched in the exchange
312311
*/
313-
public void finishEarly(ActionListener<Void> listener) {
314-
try (RefCountingListener refs = new RefCountingListener(listener)) {
312+
public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
313+
buffer.finish(drainingPages);
314+
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) {
315315
for (RemoteSink remoteSink : remoteSinks.values()) {
316316
remoteSink.close(refs.acquire());
317317
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ public void testFinishEarly() throws Exception {
477477
}
478478
}, false, between(1, 3), sinkCompleted);
479479
threadPool.schedule(
480-
() -> sourceHandler.finishEarly(ActionListener.noop()),
480+
() -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()),
481481
TimeValue.timeValueMillis(between(0, 10)),
482482
threadPool.generic()
483483
);
@@ -487,8 +487,10 @@ public void testFinishEarly() throws Exception {
487487
assertSame(p, pages.poll());
488488
p.releaseBlocks();
489489
}
490+
while ((p = pages.poll()) != null) {
491+
p.releaseBlocks();
492+
}
490493
assertTrue(exchangeSource.isFinished());
491-
assertNull(pages.poll());
492494
exchangeSource.finish();
493495
}
494496

0 commit comments

Comments
 (0)