Skip to content

Commit beb7b02

Browse files
dnhatn authored and smalyshev committed
Allow close exchange early
1 parent 932d0f9 commit beb7b02

File tree

4 files changed

+108
-16
lines changed

4 files changed

+108
-16
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 22 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.ActionListenerResponseHandler;
1616
import org.elasticsearch.action.support.ChannelActionListener;
17+
import org.elasticsearch.action.support.SubscribableListener;
1718
import org.elasticsearch.common.component.AbstractLifecycleComponent;
1819
import org.elasticsearch.common.io.stream.StreamInput;
1920
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -23,6 +24,7 @@
2324
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2425
import org.elasticsearch.compute.data.BlockFactory;
2526
import org.elasticsearch.compute.data.BlockStreamInput;
27+
import org.elasticsearch.compute.data.Page;
2628
import org.elasticsearch.core.Nullable;
2729
import org.elasticsearch.core.TimeValue;
2830
import org.elasticsearch.tasks.CancellableTask;
@@ -40,10 +42,11 @@
4042

4143
import java.io.IOException;
4244
import java.util.Map;
45+
import java.util.Objects;
4346
import java.util.Set;
4447
import java.util.concurrent.Executor;
45-
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.concurrent.atomic.AtomicLong;
49+
import java.util.concurrent.atomic.AtomicReference;
4750

4851
/**
4952
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
@@ -293,7 +296,7 @@ static final class TransportRemoteSink implements RemoteSink {
293296
final Executor responseExecutor;
294297

295298
final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
296-
final AtomicBoolean finished = new AtomicBoolean(false);
299+
final AtomicReference<SubscribableListener<Void>> completionListenerRef = new AtomicReference<>(null);
297300

298301
TransportRemoteSink(
299302
TransportService transportService,
@@ -318,13 +321,14 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
318321
return;
319322
}
320323
// already finished
321-
if (finished.get()) {
322-
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
324+
SubscribableListener<Void> completionListener = completionListenerRef.get();
325+
if (completionListener != null) {
326+
completionListener.addListener(listener.map(unused -> new ExchangeResponse(blockFactory, null, true)));
323327
return;
324328
}
325329
doFetchPageAsync(false, ActionListener.wrap(r -> {
326330
if (r.finished()) {
327-
finished.set(true);
331+
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
328332
}
329333
listener.onResponse(r);
330334
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
@@ -356,10 +360,19 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang
356360

357361
@Override
358362
public void close(ActionListener<Void> listener) {
359-
if (finished.compareAndSet(false, true)) {
360-
doFetchPageAsync(true, listener.delegateFailure((l, unused) -> l.onResponse(null)));
361-
} else {
362-
listener.onResponse(null);
363+
final SubscribableListener<Void> candidate = new SubscribableListener<>();
364+
final SubscribableListener<Void> actual = completionListenerRef.updateAndGet(
365+
curr -> Objects.requireNonNullElse(curr, candidate)
366+
);
367+
actual.addListener(listener);
368+
if (candidate == actual) {
369+
doFetchPageAsync(true, candidate.delegateFailure((l, r) -> {
370+
final Page page = r.takePage();
371+
if (page != null) {
372+
page.releaseBlocks();
373+
}
374+
l.onResponse(null);
375+
}));
363376
}
364377
}
365378
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
import org.elasticsearch.action.support.SubscribableListener;
1414
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1515
import org.elasticsearch.compute.EsqlRefCountingListener;
16+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1617
import org.elasticsearch.compute.data.Page;
1718
import org.elasticsearch.compute.operator.FailureCollector;
1819
import org.elasticsearch.compute.operator.IsBlockedResult;
1920
import org.elasticsearch.core.Releasable;
2021

2122
import java.util.List;
23+
import java.util.Map;
2224
import java.util.concurrent.Executor;
2325
import java.util.concurrent.atomic.AtomicInteger;
2426

@@ -41,6 +43,9 @@ public final class ExchangeSourceHandler {
4143
// The final failure collected will be notified to callers via the {@code completionListener}.
4244
private final FailureCollector failure = new FailureCollector();
4345

46+
private final AtomicInteger nextSinkId = new AtomicInteger();
47+
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();
48+
4449
/**
4550
* Creates a new ExchangeSourceHandler.
4651
*
@@ -53,7 +58,12 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
5358
this.buffer = new ExchangeBuffer(maxBufferSize);
5459
this.fetchExecutor = fetchExecutor;
5560
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
56-
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
61+
final PendingInstances closingSinks = new PendingInstances(() -> {});
62+
closingSinks.trackNewInstance();
63+
this.outstandingSources = new PendingInstances(() -> {
64+
buffer.finish(true);
65+
finishEarly(ActionListener.running(closingSinks::finishInstance));
66+
});
5767
buffer.addCompletionListener(ActionListener.running(() -> {
5868
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
5969
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
@@ -257,7 +267,11 @@ void onSinkComplete() {
257267
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
258268
*/
259269
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
260-
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(ActionListener.notifyOnce(listener));
270+
final int sinkId = nextSinkId.incrementAndGet();
271+
remoteSinks.put(sinkId, remoteSink);
272+
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
273+
ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId)))
274+
);
261275
fetchExecutor.execute(new AbstractRunnable() {
262276
@Override
263277
public void onFailure(Exception e) {
@@ -291,6 +305,19 @@ public Releasable addEmptySink() {
291305
return outstandingSinks::finishInstance;
292306
}
293307

308+
/**
309+
* Gracefully terminates the exchange source early by instructing all remote exchange sinks to stop their computations.
310+
* This can happen when the exchange source has accumulated enough data (e.g., reaching the LIMIT) or when users want to
311+
* see the current result immediately.
312+
*/
313+
public void finishEarly(ActionListener<Void> listener) {
314+
try (RefCountingListener refs = new RefCountingListener(listener)) {
315+
for (RemoteSink remoteSink : remoteSinks.values()) {
316+
remoteSink.close(refs.acquire());
317+
}
318+
}
319+
}
320+
294321
private static class PendingInstances {
295322
private final AtomicInteger instances = new AtomicInteger();
296323
private final SubscribableListener<Void> completion = new SubscribableListener<>();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,19 @@
88
package org.elasticsearch.compute.operator.exchange;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.compute.data.Page;
1112

1213
public interface RemoteSink {
1314

1415
void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);
1516

1617
default void close(ActionListener<Void> listener) {
1718
fetchPageAsync(true, listener.delegateFailure((l, r) -> {
18-
try {
19-
r.close();
20-
} finally {
21-
l.onResponse(null);
19+
final Page page = r.takePage();
20+
if (page != null) {
21+
page.releaseBlocks();
2222
}
23+
l.onResponse(null);
2324
}));
2425
}
2526
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@
5555
import java.util.ArrayList;
5656
import java.util.Collections;
5757
import java.util.List;
58+
import java.util.Queue;
5859
import java.util.Set;
60+
import java.util.concurrent.Semaphore;
5961
import java.util.concurrent.TimeUnit;
6062
import java.util.concurrent.atomic.AtomicBoolean;
6163
import java.util.concurrent.atomic.AtomicInteger;
@@ -421,7 +423,7 @@ public void testExchangeSourceContinueOnFailure() {
421423
}
422424
}
423425

424-
public void testEarlyTerminate() {
426+
public void testClosingSinks() {
425427
BlockFactory blockFactory = blockFactory();
426428
IntBlock block1 = blockFactory.newConstantIntBlockWith(1, 2);
427429
IntBlock block2 = blockFactory.newConstantIntBlockWith(1, 2);
@@ -441,6 +443,55 @@ public void testEarlyTerminate() {
441443
assertTrue(sink.isFinished());
442444
}
443445

446+
public void testFinishEarly() throws Exception {
447+
ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(20, threadPool.generic(), ActionListener.noop());
448+
Semaphore permits = new Semaphore(between(1, 5));
449+
BlockFactory blockFactory = blockFactory();
450+
Queue<Page> pages = ConcurrentCollections.newQueue();
451+
ExchangeSource exchangeSource = sourceHandler.createExchangeSource();
452+
AtomicBoolean sinkClosed = new AtomicBoolean();
453+
PlainActionFuture<Void> sinkCompleted = new PlainActionFuture<>();
454+
sourceHandler.addRemoteSink((allSourcesFinished, listener) -> {
455+
if (allSourcesFinished) {
456+
sinkClosed.set(true);
457+
permits.release(10);
458+
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
459+
} else {
460+
try {
461+
if (permits.tryAcquire(between(0, 100), TimeUnit.MICROSECONDS)) {
462+
boolean closed = sinkClosed.get();
463+
final Page page;
464+
if (closed) {
465+
page = new Page(blockFactory.newConstantIntBlockWith(1, 1));
466+
pages.add(page);
467+
} else {
468+
page = null;
469+
}
470+
listener.onResponse(new ExchangeResponse(blockFactory, page, closed));
471+
} else {
472+
listener.onResponse(new ExchangeResponse(blockFactory, null, sinkClosed.get()));
473+
}
474+
} catch (Exception e) {
475+
throw new AssertionError(e);
476+
}
477+
}
478+
}, false, between(1, 3), sinkCompleted);
479+
threadPool.schedule(
480+
() -> sourceHandler.finishEarly(ActionListener.noop()),
481+
TimeValue.timeValueMillis(between(0, 10)),
482+
threadPool.generic()
483+
);
484+
sinkCompleted.actionGet();
485+
Page p;
486+
while ((p = exchangeSource.pollPage()) != null) {
487+
assertSame(p, pages.poll());
488+
p.releaseBlocks();
489+
}
490+
assertTrue(exchangeSource.isFinished());
491+
assertNull(pages.poll());
492+
exchangeSource.finish();
493+
}
494+
444495
public void testConcurrentWithTransportActions() {
445496
MockTransportService node0 = newTransportService();
446497
ExchangeService exchange0 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());

0 commit comments

Comments
 (0)