Skip to content

Commit b5a352e

Browse files
authored
Try to finish remote sink once (#117592) (#117670)
Currently, we have three clients fetching pages by default, each with its own lifecycle. This can result in scenarios where more than one request is sent to complete the remote sink. While this does not cause correctness issues, it is inefficient, especially for cross-cluster requests. This change tracks the status of the remote sink and tries to send only one finish request per remote sink.
1 parent 303d93a commit b5a352e

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Map;
4343
import java.util.Set;
4444
import java.util.concurrent.Executor;
45+
import java.util.concurrent.atomic.AtomicBoolean;
4546
import java.util.concurrent.atomic.AtomicLong;
4647

4748
/**
@@ -292,6 +293,7 @@ static final class TransportRemoteSink implements RemoteSink {
292293
final Executor responseExecutor;
293294

294295
final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
296+
final AtomicBoolean finished = new AtomicBoolean(false);
295297

296298
TransportRemoteSink(
297299
TransportService transportService,
@@ -311,7 +313,33 @@ static final class TransportRemoteSink implements RemoteSink {
311313

312314
@Override
313315
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
314-
final long reservedBytes = estimatedPageSizeInBytes.get();
316+
if (allSourcesFinished) {
317+
if (finished.compareAndSet(false, true)) {
318+
doFetchPageAsync(true, listener);
319+
} else {
320+
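// Another caller has already claimed the single finish request (it may still be in flight), or the sink was
// already marked finished by an earlier response or failure; answer locally instead of sending a second one.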
321+
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
322+
}
323+
} else {
324+
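// The sink is already marked finished, or a finish request has been issued and may still be in flight:
// skip the fetch and answer locally with a finished response.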
325+
if (finished.get()) {
326+
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
327+
return;
328+
}
329+
doFetchPageAsync(false, ActionListener.wrap(r -> {
330+
if (r.finished()) {
331+
finished.set(true);
332+
}
333+
listener.onResponse(r);
334+
}, e -> {
335+
finished.set(true);
336+
listener.onFailure(e);
337+
}));
338+
}
339+
}
340+
341+
private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
342+
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
315343
if (reservedBytes > 0) {
316344
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.
317345
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,15 @@ public void testConcurrentWithTransportActions() {
374374
ExchangeService exchange1 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
375375
exchange1.registerTransportHandler(node1);
376376
AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode());
377+
Set<String> finishingRequests = ConcurrentCollections.newConcurrentSet();
378+
node1.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
379+
final ExchangeRequest exchangeRequest = (ExchangeRequest) request;
380+
if (exchangeRequest.sourcesFinished()) {
381+
String exchangeId = exchangeRequest.exchangeId();
382+
assertTrue("tried to finish [" + exchangeId + "] twice", finishingRequests.add(exchangeId));
383+
}
384+
handler.messageReceived(request, channel, task);
385+
});
377386

378387
try (exchange0; exchange1; node0; node1) {
379388
String exchangeId = "exchange";

0 commit comments

Comments
 (0)