Skip to content

Commit 4c937cb

Browse files
authored
Try to finish remote sink once (#117592) (#117666)
Currently, we have three clients fetching pages by default, each with its own lifecycle. This can result in scenarios where more than one request is sent to complete the remote sink. While this does not cause correctness issues, it is inefficient, especially for cross-cluster requests. This change tracks the status of the remote sink and tries to send only one finish request per remote sink.
1 parent 0bd1219 commit 4c937cb

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Map;
4343
import java.util.Set;
4444
import java.util.concurrent.Executor;
45+
import java.util.concurrent.atomic.AtomicBoolean;
4546
import java.util.concurrent.atomic.AtomicLong;
4647

4748
/**
@@ -292,6 +293,7 @@ static final class TransportRemoteSink implements RemoteSink {
292293
final Executor responseExecutor;
293294

294295
final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
296+
final AtomicBoolean finished = new AtomicBoolean(false);
295297

296298
TransportRemoteSink(
297299
TransportService transportService,
@@ -311,6 +313,32 @@ static final class TransportRemoteSink implements RemoteSink {
311313

312314
@Override
313315
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
316+
if (allSourcesFinished) {
317+
if (finished.compareAndSet(false, true)) {
318+
doFetchPageAsync(true, listener);
319+
} else {
320+
// already finished or promised
321+
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
322+
}
323+
} else {
324+
// already finished
325+
if (finished.get()) {
326+
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
327+
return;
328+
}
329+
doFetchPageAsync(false, ActionListener.wrap(r -> {
330+
if (r.finished()) {
331+
finished.set(true);
332+
}
333+
listener.onResponse(r);
334+
}, e -> {
335+
finished.set(true);
336+
listener.onFailure(e);
337+
}));
338+
}
339+
}
340+
341+
private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
314342
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
315343
if (reservedBytes > 0) {
316344
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,15 @@ public void testConcurrentWithTransportActions() {
449449
ExchangeService exchange1 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
450450
exchange1.registerTransportHandler(node1);
451451
AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode());
452+
Set<String> finishingRequests = ConcurrentCollections.newConcurrentSet();
453+
node1.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
454+
final ExchangeRequest exchangeRequest = (ExchangeRequest) request;
455+
if (exchangeRequest.sourcesFinished()) {
456+
String exchangeId = exchangeRequest.exchangeId();
457+
assertTrue("tried to finish [" + exchangeId + "] twice", finishingRequests.add(exchangeId));
458+
}
459+
handler.messageReceived(request, channel, task);
460+
});
452461

453462
try (exchange0; exchange1; node0; node1) {
454463
String exchangeId = "exchange";

0 commit comments

Comments
 (0)