Skip to content

Commit b8973c1

Browse files
authored
Remove completion listener from ExchangeSourceHandler (elastic#122446) (elastic#122465)
With elastic#117410, each remote sink now has its own listener, and the main query won't finish until all remote sink listeners have completed. As a result, we no longer need to wait for the exchange source to finish. This change removes the completion listener to simplify the exchange service. The completion listener could previously return prematurely while remote sinks were still being registered. Closes elastic#122408
1 parent 3b66e6f commit b8973c1

File tree

9 files changed

+119
-145
lines changed

9 files changed

+119
-145
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
package org.elasticsearch.compute.operator.exchange;
99

1010
import org.elasticsearch.action.ActionListener;
11-
import org.elasticsearch.action.ActionRunnable;
12-
import org.elasticsearch.action.support.RefCountingRunnable;
1311
import org.elasticsearch.action.support.SubscribableListener;
1412
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1513
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -19,7 +17,6 @@
1917
import org.elasticsearch.core.Releasable;
2018
import org.elasticsearch.tasks.TaskCancelledException;
2119

22-
import java.util.List;
2320
import java.util.Map;
2421
import java.util.concurrent.Executor;
2522
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,28 +48,12 @@ public final class ExchangeSourceHandler {
5148
* @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
5249
* which could otherwise be allocated for other purposes.
5350
* @param fetchExecutor the executor used to fetch pages.
54-
* @param completionListener a listener that will be notified when the exchange source handler completes
5551
*/
56-
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener<Void> completionListener) {
52+
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
5753
this.buffer = new ExchangeBuffer(maxBufferSize);
5854
this.fetchExecutor = fetchExecutor;
5955
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
60-
final PendingInstances closingSinks = new PendingInstances(() -> {});
61-
closingSinks.trackNewInstance();
62-
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
63-
buffer.addCompletionListener(ActionListener.running(() -> {
64-
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
65-
try (RefCountingRunnable refs = new RefCountingRunnable(ActionRunnable.run(listener, this::checkFailure))) {
66-
closingSinks.completion.addListener(refs.acquireListener());
67-
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
68-
// Create an outstanding instance and then finish to complete the completionListener
69-
// if we haven't registered any instances of exchange sinks or exchange sources before.
70-
pending.trackNewInstance();
71-
pending.completion.addListener(refs.acquireListener());
72-
pending.finishInstance();
73-
}
74-
}
75-
}));
56+
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.noop()));
7657
}
7758

7859
private void checkFailure() {
@@ -271,7 +252,13 @@ public void addRemoteSink(
271252
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
272253
ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId)))
273254
);
255+
final Releasable emptySink = addEmptySink();
274256
fetchExecutor.execute(new AbstractRunnable() {
257+
@Override
258+
public void onAfter() {
259+
emptySink.close();
260+
}
261+
275262
@Override
276263
public void onFailure(Exception e) {
277264
if (failFast) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,7 @@ public void testResumeOnEarlyFinish() throws Exception {
334334
DriverContext driverContext = driverContext();
335335
ThreadPool threadPool = threadPool();
336336
try {
337-
PlainActionFuture<Void> sourceFuture = new PlainActionFuture<>();
338-
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
337+
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
339338
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
340339
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
341340
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
@@ -351,7 +350,6 @@ public void testResumeOnEarlyFinish() throws Exception {
351350
sinkHandler.fetchPageAsync(true, ActionListener.noop());
352351
future.actionGet(5, TimeUnit.SECONDS);
353352
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
354-
sourceFuture.actionGet(5, TimeUnit.SECONDS);
355353
} finally {
356354
terminate(threadPool);
357355
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
216216
randomIntBetween(2, 10),
217217
threadPool.relativeTimeInMillisSupplier()
218218
);
219-
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(
220-
randomIntBetween(1, 4),
221-
threadPool.executor(ESQL_TEST_EXECUTOR),
222-
ActionListener.noop()
223-
);
219+
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(randomIntBetween(1, 4), threadPool.executor(ESQL_TEST_EXECUTOR));
224220
sourceExchanger.addRemoteSink(
225221
sinkExchanger::fetchPageAsync,
226222
randomBoolean(),

0 commit comments

Comments
 (0)