Skip to content

Commit ed915d3

Browse files
authored
Remove completion listener from ExchangeSourceHandler (elastic#122446) (elastic#122464)
With elastic#117410, each remote sink now has its own listener, and the main query won't finish until all remote sink listeners have completed. As a result, we no longer need to wait for the exchange source to finish. This change removes the completion listener to simplify the exchange service. The completion listener could previously return prematurely while remote sinks were still being registered. Closes elastic#122408
1 parent 5842b3d commit ed915d3

File tree

9 files changed

+119
-145
lines changed

9 files changed

+119
-145
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
package org.elasticsearch.compute.operator.exchange;
99

1010
import org.elasticsearch.action.ActionListener;
11-
import org.elasticsearch.action.ActionRunnable;
12-
import org.elasticsearch.action.support.RefCountingRunnable;
1311
import org.elasticsearch.action.support.SubscribableListener;
1412
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1513
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -19,7 +17,6 @@
1917
import org.elasticsearch.core.Releasable;
2018
import org.elasticsearch.tasks.TaskCancelledException;
2119

22-
import java.util.List;
2320
import java.util.Map;
2421
import java.util.concurrent.Executor;
2522
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,28 +48,12 @@ public final class ExchangeSourceHandler {
5148
* @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
5249
* which could otherwise be allocated for other purposes.
5350
* @param fetchExecutor the executor used to fetch pages.
54-
* @param completionListener a listener that will be notified when the exchange source handler completes
5551
*/
56-
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener<Void> completionListener) {
52+
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
5753
this.buffer = new ExchangeBuffer(maxBufferSize);
5854
this.fetchExecutor = fetchExecutor;
5955
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
60-
final PendingInstances closingSinks = new PendingInstances(() -> {});
61-
closingSinks.trackNewInstance();
62-
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
63-
buffer.addCompletionListener(ActionListener.running(() -> {
64-
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
65-
try (RefCountingRunnable refs = new RefCountingRunnable(ActionRunnable.run(listener, this::checkFailure))) {
66-
closingSinks.completion.addListener(refs.acquireListener());
67-
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
68-
// Create an outstanding instance and then finish to complete the completionListener
69-
// if we haven't registered any instances of exchange sinks or exchange sources before.
70-
pending.trackNewInstance();
71-
pending.completion.addListener(refs.acquireListener());
72-
pending.finishInstance();
73-
}
74-
}
75-
}));
56+
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.noop()));
7657
}
7758

7859
private void checkFailure() {
@@ -271,7 +252,13 @@ public void addRemoteSink(
271252
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
272253
ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId)))
273254
);
255+
final Releasable emptySink = addEmptySink();
274256
fetchExecutor.execute(new AbstractRunnable() {
257+
@Override
258+
public void onAfter() {
259+
emptySink.close();
260+
}
261+
275262
@Override
276263
public void onFailure(Exception e) {
277264
if (failFast) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,7 @@ public void testResumeOnEarlyFinish() throws Exception {
331331
DriverContext driverContext = driverContext();
332332
ThreadPool threadPool = threadPool();
333333
try {
334-
PlainActionFuture<Void> sourceFuture = new PlainActionFuture<>();
335-
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
334+
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
336335
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
337336
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
338337
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
@@ -348,7 +347,6 @@ public void testResumeOnEarlyFinish() throws Exception {
348347
sinkHandler.fetchPageAsync(true, ActionListener.noop());
349348
future.actionGet(5, TimeUnit.SECONDS);
350349
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
351-
sourceFuture.actionGet(5, TimeUnit.SECONDS);
352350
} finally {
353351
terminate(threadPool);
354352
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,11 +212,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
212212
randomIntBetween(2, 10),
213213
threadPool.relativeTimeInMillisSupplier()
214214
);
215-
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(
216-
randomIntBetween(1, 4),
217-
threadPool.executor(ESQL_TEST_EXECUTOR),
218-
ActionListener.noop()
219-
);
215+
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(randomIntBetween(1, 4), threadPool.executor(ESQL_TEST_EXECUTOR));
220216
sourceExchanger.addRemoteSink(
221217
sinkExchanger::fetchPageAsync,
222218
randomBoolean(),

0 commit comments

Comments
 (0)