Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand All @@ -19,7 +17,6 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -51,28 +48,12 @@ public final class ExchangeSourceHandler {
* @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory,
* which could otherwise be allocated for other purposes.
* @param fetchExecutor the executor used to fetch pages.
* @param completionListener a listener that will be notified when the exchange source handler completes
*/
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener<Void> completionListener) {
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
this.buffer = new ExchangeBuffer(maxBufferSize);
this.fetchExecutor = fetchExecutor;
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
final PendingInstances closingSinks = new PendingInstances(() -> {});
closingSinks.trackNewInstance();
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
buffer.addCompletionListener(ActionListener.running(() -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
try (RefCountingRunnable refs = new RefCountingRunnable(ActionRunnable.run(listener, this::checkFailure))) {
closingSinks.completion.addListener(refs.acquireListener());
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
// Create an outstanding instance and then finish to complete the completionListener
// if we haven't registered any instances of exchange sinks or exchange sources before.
pending.trackNewInstance();
pending.completion.addListener(refs.acquireListener());
pending.finishInstance();
}
}
}));
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.noop()));
}

private void checkFailure() {
Expand Down Expand Up @@ -271,7 +252,13 @@ public void addRemoteSink(
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId)))
);
final Releasable emptySink = addEmptySink();
fetchExecutor.execute(new AbstractRunnable() {
@Override
public void onAfter() {
emptySink.close();
}

@Override
public void onFailure(Exception e) {
if (failFast) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ public void testResumeOnEarlyFinish() throws Exception {
DriverContext driverContext = driverContext();
ThreadPool threadPool = threadPool();
try {
PlainActionFuture<Void> sourceFuture = new PlainActionFuture<>();
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
Expand All @@ -351,7 +350,6 @@ public void testResumeOnEarlyFinish() throws Exception {
sinkHandler.fetchPageAsync(true, ActionListener.noop());
future.actionGet(5, TimeUnit.SECONDS);
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
sourceFuture.actionGet(5, TimeUnit.SECONDS);
} finally {
terminate(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
randomIntBetween(2, 10),
threadPool.relativeTimeInMillisSupplier()
);
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(
randomIntBetween(1, 4),
threadPool.executor(ESQL_TEST_EXECUTOR),
ActionListener.noop()
);
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(randomIntBetween(1, 4), threadPool.executor(ESQL_TEST_EXECUTOR));
sourceExchanger.addRemoteSink(
sinkExchanger::fetchPageAsync,
randomBoolean(),
Expand Down
Loading