Skip to content

Commit 08c2df1

Browse files
committed
Retry on shard failure
1 parent 4ecb91f commit 08c2df1

File tree

20 files changed

+922
-207
lines changed

20 files changed

+922
-207
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ static TransportVersion def(int id) {
167167
public static final TransportVersion RANK_DOC_OPTIONAL_METADATA_FOR_EXPLAIN = def(8_833_00_0);
168168
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE = def(8_834_00_0);
169169

170-
171170
/*
172171
* STOP! READ THIS FIRST! No, really,
173172
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,23 @@ public FailureCollector(int maxExceptions) {
4545
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
4646
}
4747

48-
private static Exception unwrapTransportException(TransportException te) {
49-
final Throwable cause = te.getCause();
50-
if (cause == null) {
51-
return te;
52-
} else if (cause instanceof Exception ex) {
53-
return ex;
48+
public static Exception unwrapTransportException(Exception e) {
49+
if (e instanceof TransportException te) {
50+
final Throwable cause = te.getCause();
51+
if (cause == null) {
52+
return te;
53+
} else if (cause instanceof Exception ex) {
54+
return ex;
55+
} else {
56+
return new ElasticsearchException(cause);
57+
}
5458
} else {
55-
return new ElasticsearchException(cause);
59+
return e;
5660
}
5761
}
5862

5963
public void unwrapAndCollect(Exception e) {
60-
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
64+
e = unwrapTransportException(e);
6165
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
6266
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
6367
cancelledExceptions.add(e);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ public Page takePage() {
6565
return page;
6666
}
6767

68+
public boolean hasData() {
69+
return page != null;
70+
}
71+
6872
public long ramBytesUsedByPage() {
6973
if (page != null) {
7074
return page.ramBytesUsedByBlocks();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@
5151
/**
5252
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
5353
* It holds a map of {@link ExchangeSinkHandler} instances for each node in the cluster to serve {@link ExchangeRequest}s
54-
* To connect exchange sources to exchange sinks, use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)}.
54+
* To connect exchange sources to exchange sinks,
55+
* use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)}.
5556
*/
5657
public final class ExchangeService extends AbstractLifecycleComponent {
5758
// TODO: Make this a child action of the data node transport to ensure that exchanges

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222

2323
/**
2424
* An {@link ExchangeSinkHandler} receives pages and status from its {@link ExchangeSink}s, which are created using
25-
* {@link #createExchangeSink()}} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
25+
* {@link #createExchangeSink(Runnable)}} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
2626
* using the {@link #fetchPageAsync(boolean, ActionListener)} method.
2727
*
28-
* @see #createExchangeSink()
28+
* @see #createExchangeSink(Runnable)
2929
* @see #fetchPageAsync(boolean, ActionListener)
3030
* @see ExchangeSourceHandler
3131
*/
@@ -53,15 +53,18 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup
5353
private class ExchangeSinkImpl implements ExchangeSink {
5454
boolean finished;
5555
private final SubscribableListener<Void> onFinished = new SubscribableListener<>();
56+
private final Runnable onPageAdded;
5657

57-
ExchangeSinkImpl() {
58+
ExchangeSinkImpl(Runnable onPageAdded) {
5859
onChanged();
60+
this.onPageAdded = onPageAdded;
5961
buffer.addCompletionListener(onFinished);
6062
outstandingSinks.incrementAndGet();
6163
}
6264

6365
@Override
6466
public void addPage(Page page) {
67+
onPageAdded.run();
6568
buffer.addPage(page);
6669
notifyListeners();
6770
}
@@ -101,7 +104,7 @@ public IsBlockedResult waitForWriting() {
101104
* @param sourceFinished if true, then this handler can finish as sources have enough pages.
102105
* @param listener the listener that will be notified when pages are ready or this handler is finished
103106
* @see RemoteSink
104-
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, int, ActionListener)
107+
* @see ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
105108
*/
106109
public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeResponse> listener) {
107110
if (sourceFinished) {
@@ -161,10 +164,11 @@ private void notifyListeners() {
161164
/**
162165
* Create a new exchange sink for exchanging data
163166
*
167+
* @param onPageAdded a {@link Runnable} that called when a new page is added
164168
* @see ExchangeSinkOperator
165169
*/
166-
public ExchangeSink createExchangeSink() {
167-
return new ExchangeSinkImpl();
170+
public ExchangeSink createExchangeSink(Runnable onPageAdded) {
171+
return new ExchangeSinkImpl(onPageAdded);
168172
}
169173

170174
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
/**
2828
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
2929
* and feeds them to its {@link ExchangeSource}, which are created using the {@link #createExchangeSource()}) method.
30-
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, int, ActionListener)}) method.
30+
* {@link RemoteSink}s are added using the {@link #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)}) method.
3131
*
3232
* @see #createExchangeSource()
33-
* @see #addRemoteSink(RemoteSink, boolean, int, ActionListener)
33+
* @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
3434
*/
3535
public final class ExchangeSourceHandler {
3636
private final ExchangeBuffer buffer;
@@ -186,11 +186,13 @@ private final class RemoteSinkFetcher {
186186
private final RemoteSink remoteSink;
187187
private final boolean failFast;
188188
private final ActionListener<Void> completionListener;
189+
private final Runnable onPageFetched;
189190

190-
RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, ActionListener<Void> completionListener) {
191+
RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, Runnable onPageFetched, ActionListener<Void> completionListener) {
191192
outstandingSinks.trackNewInstance();
192193
this.remoteSink = remoteSink;
193194
this.failFast = failFast;
195+
this.onPageFetched = onPageFetched;
194196
this.completionListener = completionListener;
195197
}
196198

@@ -203,6 +205,7 @@ void fetchPage() {
203205
remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> {
204206
Page page = resp.takePage();
205207
if (page != null) {
208+
onPageFetched.run();
206209
buffer.addPage(page);
207210
}
208211
if (resp.finished()) {
@@ -259,12 +262,19 @@ void onSinkComplete() {
259262
* - If {@code true}, failures from this remote sink will cause the exchange source to abort.
260263
* Callers can safely ignore failures notified via this listener, as they are collected and
261264
* reported by the exchange source.
265+
* @param onPageFetched a callback that is called when a new page is fetched from the remote sink
262266
* @param instances the number of concurrent ``clients`` that this handler should use to fetch pages.
263267
* More clients reduce latency, but add overhead.
264268
* @param listener a listener that will be notified when the sink fails or completes
265269
* @see ExchangeSinkHandler#fetchPageAsync(boolean, ActionListener)
266270
*/
267-
public void addRemoteSink(RemoteSink remoteSink, boolean failFast, int instances, ActionListener<Void> listener) {
271+
public void addRemoteSink(
272+
RemoteSink remoteSink,
273+
boolean failFast,
274+
Runnable onPageFetched,
275+
int instances,
276+
ActionListener<Void> listener
277+
) {
268278
final int sinkId = nextSinkId.incrementAndGet();
269279
remoteSinks.put(sinkId, remoteSink);
270280
final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
@@ -284,7 +294,7 @@ public void onFailure(Exception e) {
284294
protected void doRun() {
285295
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) {
286296
for (int i = 0; i < instances; i++) {
287-
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
297+
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, onPageFetched, refs.acquire());
288298
fetcher.fetchPage();
289299
}
290300
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ public void testEarlyTermination() {
297297
final int maxAllowedRows = between(1, 100);
298298
final AtomicInteger processedRows = new AtomicInteger(0);
299299
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
300-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
300+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
301301
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
302302
@Override
303303
public Block eval(Page page) {
@@ -335,7 +335,7 @@ public void testResumeOnEarlyFinish() throws Exception {
335335
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
336336
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
337337
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
338-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
338+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
339339
Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {});
340340
PlainActionFuture<Void> future = new PlainActionFuture<>();
341341
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
220220
sourceExchanger.addRemoteSink(
221221
sinkExchanger::fetchPageAsync,
222222
randomBoolean(),
223+
() -> {},
223224
1,
224225
ActionListener.<Void>noop().delegateResponse((l, e) -> {
225226
throw new AssertionError("unexpected failure", e);
@@ -248,7 +249,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
248249
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
249250
intermediateOperatorItr.next()
250251
),
251-
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(), Function.identity()),
252+
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity()),
252253
() -> {}
253254
)
254255
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ public void testBasic() throws Exception {
9797
pages[i] = new Page(blockFactory.newConstantIntBlockWith(i, 2));
9898
}
9999
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
100-
ExchangeSink sink1 = sinkExchanger.createExchangeSink();
101-
ExchangeSink sink2 = sinkExchanger.createExchangeSink();
100+
ExchangeSink sink1 = sinkExchanger.createExchangeSink(() -> {});
101+
ExchangeSink sink2 = sinkExchanger.createExchangeSink(() -> {});
102102
PlainActionFuture<Void> sourceCompletion = new PlainActionFuture<>();
103103
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR), sourceCompletion);
104104
ExchangeSource source = sourceExchanger.createExchangeSource();
105-
sourceExchanger.addRemoteSink(sinkExchanger::fetchPageAsync, randomBoolean(), 1, ActionListener.noop());
105+
sourceExchanger.addRemoteSink(sinkExchanger::fetchPageAsync, randomBoolean(), () -> {}, 1, ActionListener.noop());
106106
SubscribableListener<Void> waitForReading = source.waitForReading().listener();
107107
assertFalse(waitForReading.isDone());
108108
assertNull(source.pollPage());
@@ -340,10 +340,16 @@ public void testConcurrentWithHandlers() {
340340
sinkHandler = randomFrom(sinkHandlers);
341341
} else {
342342
sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier());
343-
sourceExchanger.addRemoteSink(sinkHandler::fetchPageAsync, randomBoolean(), randomIntBetween(1, 3), ActionListener.noop());
343+
sourceExchanger.addRemoteSink(
344+
sinkHandler::fetchPageAsync,
345+
randomBoolean(),
346+
() -> {},
347+
randomIntBetween(1, 3),
348+
ActionListener.noop()
349+
);
344350
sinkHandlers.add(sinkHandler);
345351
}
346-
return sinkHandler.createExchangeSink();
352+
return sinkHandler.createExchangeSink(() -> {});
347353
};
348354
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
349355
final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
@@ -398,14 +404,14 @@ public void testExchangeSourceContinueOnFailure() {
398404
l.onResponse(new ExchangeResponse(blockFactory, page, r.finished()));
399405
}));
400406
}
401-
}, false, instance, ActionListener.wrap(r -> {
407+
}, false, () -> {}, instance, ActionListener.wrap(r -> {
402408
assertFalse(sinkFailed.get());
403409
completedSinks.incrementAndGet();
404410
}, e -> {
405411
assertTrue(sinkFailed.get());
406412
failedSinks.incrementAndGet();
407413
}));
408-
return sinkHandler.createExchangeSink();
414+
return sinkHandler.createExchangeSink(() -> {});
409415
};
410416
Set<Integer> actualSeqNos = runConcurrentTest(
411417
maxInputSeqNo,
@@ -430,7 +436,7 @@ public void testClosingSinks() {
430436
Page p1 = new Page(block1);
431437
Page p2 = new Page(block2);
432438
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
433-
ExchangeSink sink = sinkExchanger.createExchangeSink();
439+
ExchangeSink sink = sinkExchanger.createExchangeSink(() -> {});
434440
sink.addPage(p1);
435441
sink.addPage(p2);
436442
assertFalse(sink.waitForWriting().listener().isDone());
@@ -475,7 +481,7 @@ public void testFinishEarly() throws Exception {
475481
throw new AssertionError(e);
476482
}
477483
}
478-
}, false, between(1, 3), sinkCompleted);
484+
}, false, () -> {}, between(1, 3), sinkCompleted);
479485
threadPool.schedule(
480486
() -> sourceHandler.finishEarly(randomBoolean(), ActionListener.noop()),
481487
TimeValue.timeValueMillis(between(0, 10)),
@@ -526,6 +532,7 @@ public void testConcurrentWithTransportActions() {
526532
sourceHandler.addRemoteSink(
527533
exchange0.newRemoteSink(task, exchangeId, node0, connection),
528534
randomBoolean(),
535+
() -> {},
529536
randomIntBetween(1, 5),
530537
ActionListener.noop()
531538
);
@@ -535,7 +542,7 @@ public void testConcurrentWithTransportActions() {
535542
maxInputSeqNo,
536543
maxOutputSeqNo,
537544
sourceHandler::createExchangeSource,
538-
sinkHandler::createExchangeSink
545+
() -> sinkHandler.createExchangeSink(() -> {})
539546
);
540547
var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet());
541548
assertThat(actualSeqNos, hasSize(expectedSeqNos.size()));
@@ -601,12 +608,18 @@ public void sendResponse(TransportResponse transportResponse) {
601608
sourceHandler.addRemoteSink(
602609
exchange0.newRemoteSink(task, exchangeId, node0, connection),
603610
true,
611+
() -> {},
604612
randomIntBetween(1, 5),
605613
ActionListener.noop()
606614
);
607615
Exception err = expectThrows(
608616
Exception.class,
609-
() -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink)
617+
() -> runConcurrentTest(
618+
maxSeqNo,
619+
maxSeqNo,
620+
sourceHandler::createExchangeSource,
621+
() -> sinkHandler.createExchangeSink(() -> {})
622+
)
610623
);
611624
Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
612625
assertNotNull(cause);

0 commit comments

Comments
 (0)