Skip to content

Commit af14de5

Browse files
authored
Allow early termination of exchange source (#118129) (#118191)
This change introduces the ability to gracefully terminate the exchange source early by instructing all remote exchange sinks to stop their computations. This is useful in two cases: 1. When sufficient data has been accumulated (e.g., reaching the LIMIT), the exchange source signals remote sinks to stop generating new pages, allowing the query to finish sooner. 2. When users request immediate results, the exchange source can be finished right away, even if the results are partial, incomplete, or potentially inaccurate.
1 parent fe4f510 commit af14de5

File tree

4 files changed

+111
-16
lines changed

4 files changed

+111
-16
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.ActionListenerResponseHandler;
1616
import org.elasticsearch.action.support.ChannelActionListener;
17+
import org.elasticsearch.action.support.SubscribableListener;
1718
import org.elasticsearch.common.component.AbstractLifecycleComponent;
1819
import org.elasticsearch.common.io.stream.StreamInput;
1920
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -23,6 +24,7 @@
2324
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2425
import org.elasticsearch.compute.data.BlockFactory;
2526
import org.elasticsearch.compute.data.BlockStreamInput;
27+
import org.elasticsearch.compute.data.Page;
2628
import org.elasticsearch.core.Nullable;
2729
import org.elasticsearch.core.TimeValue;
2830
import org.elasticsearch.tasks.CancellableTask;
@@ -40,10 +42,11 @@
4042

4143
import java.io.IOException;
4244
import java.util.Map;
45+
import java.util.Objects;
4346
import java.util.Set;
4447
import java.util.concurrent.Executor;
45-
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.concurrent.atomic.AtomicLong;
49+
import java.util.concurrent.atomic.AtomicReference;
4750

4851
/**
4952
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
@@ -293,7 +296,7 @@ static final class TransportRemoteSink implements RemoteSink {
293296
final Executor responseExecutor;
294297

295298
final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
296-
final AtomicBoolean finished = new AtomicBoolean(false);
299+
final AtomicReference<SubscribableListener<Void>> completionListenerRef = new AtomicReference<>(null);
297300

298301
TransportRemoteSink(
299302
TransportService transportService,
@@ -318,13 +321,14 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
318321
return;
319322
}
320323
// already finished
321-
if (finished.get()) {
322-
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
324+
SubscribableListener<Void> completionListener = completionListenerRef.get();
325+
if (completionListener != null) {
326+
completionListener.addListener(listener.map(unused -> new ExchangeResponse(blockFactory, null, true)));
323327
return;
324328
}
325329
doFetchPageAsync(false, ActionListener.wrap(r -> {
326330
if (r.finished()) {
327-
finished.set(true);
331+
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
328332
}
329333
listener.onResponse(r);
330334
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
@@ -356,10 +360,19 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang
356360

357361
@Override
358362
public void close(ActionListener<Void> listener) {
359-
if (finished.compareAndSet(false, true)) {
360-
doFetchPageAsync(true, listener.delegateFailure((l, unused) -> l.onResponse(null)));
361-
} else {
362-
listener.onResponse(null);
363+
final SubscribableListener<Void> candidate = new SubscribableListener<>();
364+
final SubscribableListener<Void> actual = completionListenerRef.updateAndGet(
365+
curr -> Objects.requireNonNullElse(curr, candidate)
366+
);
367+
actual.addListener(listener);
368+
if (candidate == actual) {
369+
doFetchPageAsync(true, ActionListener.wrap(r -> {
370+
final Page page = r.takePage();
371+
if (page != null) {
372+
page.releaseBlocks();
373+
}
374+
candidate.onResponse(null);
375+
}, e -> candidate.onResponse(null)));
363376
}
364377
}
365378
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import org.elasticsearch.action.support.RefCountingRunnable;
1313
import org.elasticsearch.action.support.SubscribableListener;
1414
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
15+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1516
import org.elasticsearch.compute.EsqlRefCountingListener;
1617
import org.elasticsearch.compute.data.Page;
1718
import org.elasticsearch.compute.operator.FailureCollector;
1819
import org.elasticsearch.compute.operator.IsBlockedResult;
1920
import org.elasticsearch.core.Releasable;
2021

2122
import java.util.List;
23+
import java.util.Map;
2224
import java.util.concurrent.Executor;
2325
import java.util.concurrent.atomic.AtomicInteger;
2426

@@ -41,6 +43,9 @@ public final class ExchangeSourceHandler {
4143
// The final failure collected will be notified to callers via the {@code completionListener}.
4244
private final FailureCollector failure = new FailureCollector();
4345

46+
private final AtomicInteger nextSinkId = new AtomicInteger();
47+
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();
48+
4449
/**
4550
* Creates a new ExchangeSourceHandler.
4651
*
@@ -53,7 +58,9 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
5358
this.buffer = new ExchangeBuffer(maxBufferSize);
5459
this.fetchExecutor = fetchExecutor;
5560
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
56-
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
61+
final PendingInstances closingSinks = new PendingInstances(() -> {});
62+
closingSinks.trackNewInstance();
63+
this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance)));
5764
buffer.addCompletionListener(ActionListener.running(() -> {
5865
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
5966
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
@@ -64,6 +71,7 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
6471
listener.onResponse(null);
6572
}
6673
})) {
74+
closingSinks.completion.addListener(refs.acquireListener());
6775
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
6876
// Create an outstanding instance and then finish to complete the completionListener
6977
// if we haven't registered any instances of exchange sinks or exchange sources before.
@@ -257,7 +265,11 @@ void onSinkComplete() {
257265
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
258266
*/
259267
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
260-
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(ActionListener.notifyOnce(listener));
268+
final int sinkId = nextSinkId.incrementAndGet();
269+
remoteSinks.put(sinkId, remoteSink);
270+
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
271+
ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId)))
272+
);
261273
fetchExecutor.execute(new AbstractRunnable() {
262274
@Override
263275
public void onFailure(Exception e) {
@@ -291,6 +303,22 @@ public Releasable addEmptySink() {
291303
return outstandingSinks::finishInstance;
292304
}
293305

306+
/**
307+
* Gracefully terminates the exchange source early by instructing all remote exchange sinks to stop their computations.
308+
* This can happen when the exchange source has accumulated enough data (e.g., reaching the LIMIT) or when users want to
309+
* see the current result immediately.
310+
*
311+
* @param drainingPages whether to discard pages already fetched in the exchange
312+
*/
313+
public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
314+
buffer.finish(drainingPages);
315+
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) {
316+
for (RemoteSink remoteSink : remoteSinks.values()) {
317+
remoteSink.close(refs.acquire());
318+
}
319+
}
320+
}
321+
294322
private static class PendingInstances {
295323
private final AtomicInteger instances = new AtomicInteger();
296324
private final SubscribableListener<Void> completion = new SubscribableListener<>();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,19 @@
88
package org.elasticsearch.compute.operator.exchange;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.compute.data.Page;
1112

1213
public interface RemoteSink {
1314

1415
void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);
1516

1617
default void close(ActionListener<Void> listener) {
1718
fetchPageAsync(true, listener.delegateFailure((l, r) -> {
18-
try {
19-
r.close();
20-
} finally {
21-
l.onResponse(null);
19+
final Page page = r.takePage();
20+
if (page != null) {
21+
page.releaseBlocks();
2222
}
23+
l.onResponse(null);
2324
}));
2425
}
2526
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@
5555
import java.util.ArrayList;
5656
import java.util.Collections;
5757
import java.util.List;
58+
import java.util.Queue;
5859
import java.util.Set;
60+
import java.util.concurrent.Semaphore;
5961
import java.util.concurrent.TimeUnit;
6062
import java.util.concurrent.atomic.AtomicBoolean;
6163
import java.util.concurrent.atomic.AtomicInteger;
@@ -421,7 +423,7 @@ public void testExchangeSourceContinueOnFailure() {
421423
}
422424
}
423425

424-
public void testEarlyTerminate() {
426+
public void testClosingSinks() {
425427
BlockFactory blockFactory = blockFactory();
426428
IntBlock block1 = blockFactory.newConstantIntBlockWith(1, 2);
427429
IntBlock block2 = blockFactory.newConstantIntBlockWith(1, 2);
@@ -441,6 +443,57 @@ public void testEarlyTerminate() {
441443
assertTrue(sink.isFinished());
442444
}
443445

446+
public void testFinishEarly() throws Exception {
447+
ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(20, threadPool.generic(), ActionListener.noop());
448+
Semaphore permits = new Semaphore(between(1, 5));
449+
BlockFactory blockFactory = blockFactory();
450+
Queue<Page> pages = ConcurrentCollections.newQueue();
451+
ExchangeSource exchangeSource = sourceHandler.createExchangeSource();
452+
AtomicBoolean sinkClosed = new AtomicBoolean();
453+
PlainActionFuture<Void> sinkCompleted = new PlainActionFuture<>();
454+
sourceHandler.addRemoteSink((allSourcesFinished, listener) -> {
455+
if (allSourcesFinished) {
456+
sinkClosed.set(true);
457+
permits.release(10);
458+
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
459+
} else {
460+
try {
461+
if (permits.tryAcquire(between(0, 100), TimeUnit.MICROSECONDS)) {
462+
boolean closed = sinkClosed.get();
463+
final Page page;
464+
if (closed) {
465+
page = new Page(blockFactory.newConstantIntBlockWith(1, 1));
466+
pages.add(page);
467+
} else {
468+
page = null;
469+
}
470+
listener.onResponse(new ExchangeResponse(blockFactory, page, closed));
471+
} else {
472+
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
473+
}
474+
} catch (Exception e) {
475+
throw new AssertionError(e);
476+
}
477+
}
478+
}, false, between(1, 3), sinkCompleted);
479+
threadPool.schedule(
480+
() -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()),
481+
TimeValue.timeValueMillis(between(0, 10)),
482+
threadPool.generic()
483+
);
484+
sinkCompleted.actionGet();
485+
Page p;
486+
while ((p = exchangeSource.pollPage()) != null) {
487+
assertSame(p, pages.poll());
488+
p.releaseBlocks();
489+
}
490+
while ((p = pages.poll()) != null) {
491+
p.releaseBlocks();
492+
}
493+
assertTrue(exchangeSource.isFinished());
494+
exchangeSource.finish();
495+
}
496+
444497
public void testConcurrentWithTransportActions() {
445498
MockTransportService node0 = newTransportService();
446499
ExchangeService exchange0 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());

0 commit comments

Comments
 (0)