Skip to content

Commit 6c09cb4

Browse files
committed
Add handling remote sink failures
1 parent dc1e4fa commit 6c09cb4

File tree

4 files changed

+35
-9
lines changed

4 files changed

+35
-9
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Map;
4343
import java.util.concurrent.Executor;
4444
import java.util.concurrent.atomic.AtomicLong;
45+
import java.util.function.Consumer;
4546

4647
/**
4748
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
@@ -277,9 +278,16 @@ protected void doRun() {
277278
* @param exchangeId the exchange ID
278279
* @param transportService the transport service
279280
* @param conn the connection to the remote node where the remote exchange sink is located
281+
* @param failureCollector if not null, the failures will be sent to this consumer and the sink will be marked as finished
280282
*/
281-
public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, Transport.Connection conn) {
282-
return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor);
283+
public RemoteSink newRemoteSink(
284+
Task parentTask,
285+
String exchangeId,
286+
TransportService transportService,
287+
Transport.Connection conn,
288+
Consumer<Exception> failureCollector
289+
) {
290+
return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor, failureCollector);
283291
}
284292

285293
static final class TransportRemoteSink implements RemoteSink {
@@ -289,6 +297,7 @@ static final class TransportRemoteSink implements RemoteSink {
289297
final Task parentTask;
290298
final String exchangeId;
291299
final Executor responseExecutor;
300+
final Consumer<Exception> failureCollector;
292301

293302
final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
294303

@@ -298,14 +307,16 @@ static final class TransportRemoteSink implements RemoteSink {
298307
Transport.Connection connection,
299308
Task parentTask,
300309
String exchangeId,
301-
Executor responseExecutor
310+
Executor responseExecutor,
311+
Consumer<Exception> failureCollector
302312
) {
303313
this.transportService = transportService;
304314
this.blockFactory = blockFactory;
305315
this.connection = connection;
306316
this.parentTask = parentTask;
307317
this.exchangeId = exchangeId;
308318
this.responseExecutor = responseExecutor;
319+
this.failureCollector = failureCollector;
309320
}
310321

311322
@Override
@@ -316,6 +327,12 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
316327
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
317328
listener = ActionListener.runAfter(listener, () -> blockFactory.breaker().addWithoutBreaking(-reservedBytes));
318329
}
330+
if (failureCollector != null) {
331+
listener = listener.delegateResponse((l, ex) -> {
332+
failureCollector.accept(ex);
333+
l.onResponse(new ExchangeResponse(blockFactory, null, true));
334+
});
335+
}
319336
transportService.sendChildRequest(
320337
connection,
321338
EXCHANGE_ACTION_NAME,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ public void testConcurrentWithTransportActions() {
383383
sourceHandler.addCompletionListener(sourceCompletionFuture);
384384
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomExchangeBuffer());
385385
Transport.Connection connection = node0.getConnection(node1.getLocalNode());
386-
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection), randomIntBetween(1, 5));
386+
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection, null), randomIntBetween(1, 5));
387387
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
388388
final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
389389
runConcurrentTest(maxInputSeqNo, maxOutputSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink);
@@ -442,7 +442,7 @@ public void sendResponse(TransportResponse transportResponse) {
442442
sourceHandler.addCompletionListener(sourceCompletionFuture);
443443
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128));
444444
Transport.Connection connection = node0.getConnection(node1.getLocalNode());
445-
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection), randomIntBetween(1, 5));
445+
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection, null), randomIntBetween(1, 5));
446446
Exception err = expectThrows(
447447
Exception.class,
448448
() -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink)

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ void markAsPartial(String computeClusterAlias, Exception e) {
202202
esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> {
203203
assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED
204204
: "We shouldn't be running compute on a cluster that's already marked as skipped";
205+
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL) {
206+
return v;
207+
}
205208
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL)
206209
.setFailures(List.of(new ShardSearchFailure(e)))
207210
.build();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ private void startComputeOnDataNodes(
335335
queryPragmas.exchangeBufferSize(),
336336
esqlExecutor,
337337
refs.acquire().delegateFailureAndWrap((l, unused) -> {
338-
var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection);
338+
var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection, null);
339339
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
340340
ActionListener<ComputeResponse> computeResponseListener = computeListener.acquireCompute(clusterAlias);
341341
var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null));
@@ -377,7 +377,7 @@ private void startComputeOnRemoteClusters(
377377
try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) {
378378
for (RemoteCluster cluster : clusters) {
379379
final String clusterAlias = cluster.clusterAlias();
380-
final boolean shouldSkipOnFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias);
380+
final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias);
381381
final var exchangeListener = refs.acquire();
382382
ExchangeService.openExchange(
383383
transportService,
@@ -386,7 +386,13 @@ private void startComputeOnRemoteClusters(
386386
queryPragmas.exchangeBufferSize(),
387387
esqlExecutor,
388388
ActionListener.wrap(unused -> {
389-
var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection);
389+
var remoteSink = exchangeService.newRemoteSink(
390+
rootTask,
391+
sessionId,
392+
transportService,
393+
cluster.connection,
394+
suppressRemoteFailure ? ex -> computeListener.markAsPartial(clusterAlias, ex) : null
395+
);
390396
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
391397
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
392398
var clusterRequest = new ClusterComputeRequest(clusterAlias, sessionId, configuration, remotePlan);
@@ -403,7 +409,7 @@ private void startComputeOnRemoteClusters(
403409
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
404410
);
405411
}, e -> {
406-
if (shouldSkipOnFailure) {
412+
if (suppressRemoteFailure) {
407413
// TODO: drop this in final patch
408414
LOGGER.error("Marking failed cluster {} as partial: {}", clusterAlias, e);
409415
computeListener.markAsPartial(clusterAlias, e);

0 commit comments

Comments
 (0)