Skip to content

Commit a3d93b1

Browse files
committed
Retry on shard failure
1 parent 4ecb91f commit a3d93b1

File tree

19 files changed

+909
-195
lines changed

19 files changed

+909
-195
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ static TransportVersion def(int id) {
167167
public static final TransportVersion RANK_DOC_OPTIONAL_METADATA_FOR_EXPLAIN = def(8_833_00_0);
168168
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE = def(8_834_00_0);
169169

170-
171170
/*
172171
* STOP! READ THIS FIRST! No, really,
173172
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,23 @@ public FailureCollector(int maxExceptions) {
4545
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
4646
}
4747

48-
private static Exception unwrapTransportException(TransportException te) {
49-
final Throwable cause = te.getCause();
50-
if (cause == null) {
51-
return te;
52-
} else if (cause instanceof Exception ex) {
53-
return ex;
48+
public static Exception unwrapTransportException(Exception e) {
49+
if (e instanceof TransportException te) {
50+
final Throwable cause = te.getCause();
51+
if (cause == null) {
52+
return te;
53+
} else if (cause instanceof Exception ex) {
54+
return ex;
55+
} else {
56+
return new ElasticsearchException(cause);
57+
}
5458
} else {
55-
return new ElasticsearchException(cause);
59+
return e;
5660
}
5761
}
5862

5963
public void unwrapAndCollect(Exception e) {
60-
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
64+
e = unwrapTransportException(e);
6165
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
6266
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
6367
cancelledExceptions.add(e);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ public Page takePage() {
6565
return page;
6666
}
6767

68+
public boolean hasData() {
69+
return page != null;
70+
}
71+
6872
public long ramBytesUsedByPage() {
6973
if (page != null) {
7074
return page.ramBytesUsedByBlocks();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222

2323
/**
2424
* An {@link ExchangeSinkHandler} receives pages and status from its {@link ExchangeSink}s, which are created using
25-
* {@link #createExchangeSink()}} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
25+
* {@link #createExchangeSink(Runnable)}} method. Pages and status can then be retrieved asynchronously by {@link ExchangeSourceHandler}s
2626
* using the {@link #fetchPageAsync(boolean, ActionListener)} method.
2727
*
28-
* @see #createExchangeSink()
28+
* @see #createExchangeSink(Runnable)
2929
* @see #fetchPageAsync(boolean, ActionListener)
3030
* @see ExchangeSourceHandler
3131
*/
@@ -53,15 +53,18 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup
5353
private class ExchangeSinkImpl implements ExchangeSink {
5454
boolean finished;
5555
private final SubscribableListener<Void> onFinished = new SubscribableListener<>();
56+
private final Runnable onPageAdded;
5657

57-
ExchangeSinkImpl() {
58+
ExchangeSinkImpl(Runnable onPageAdded) {
5859
onChanged();
60+
this.onPageAdded = onPageAdded;
5961
buffer.addCompletionListener(onFinished);
6062
outstandingSinks.incrementAndGet();
6163
}
6264

6365
@Override
6466
public void addPage(Page page) {
67+
onPageAdded.run();
6568
buffer.addPage(page);
6669
notifyListeners();
6770
}
@@ -161,10 +164,11 @@ private void notifyListeners() {
161164
/**
162165
* Create a new exchange sink for exchanging data
163166
*
167+
* @param onPageAdded a {@link Runnable} that called when a new page is added
164168
* @see ExchangeSinkOperator
165169
*/
166-
public ExchangeSink createExchangeSink() {
167-
return new ExchangeSinkImpl();
170+
public ExchangeSink createExchangeSink(Runnable onPageAdded) {
171+
return new ExchangeSinkImpl(onPageAdded);
168172
}
169173

170174
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,25 @@ default void close(ActionListener<Void> listener) {
2323
l.onResponse(null);
2424
}));
2525
}
26+
27+
/**
28+
* Wraps the given {@link RemoteSink} with a {@link Runnable} that is called when a new page is fetched.
29+
*/
30+
record TrackingRemoteSink(RemoteSink delegate, Runnable onPageFetched) implements RemoteSink {
31+
@Override
32+
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
33+
listener = listener.delegateFailureAndWrap((l, r) -> {
34+
if (r.hasData()) {
35+
onPageFetched.run();
36+
}
37+
l.onResponse(r);
38+
});
39+
delegate.fetchPageAsync(allSourcesFinished, listener);
40+
}
41+
42+
@Override
43+
public void close(ActionListener<Void> listener) {
44+
delegate.close(listener);
45+
}
46+
}
2647
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ public void testEarlyTermination() {
297297
final int maxAllowedRows = between(1, 100);
298298
final AtomicInteger processedRows = new AtomicInteger(0);
299299
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
300-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
300+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
301301
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
302302
@Override
303303
public Block eval(Page page) {
@@ -335,7 +335,7 @@ public void testResumeOnEarlyFinish() throws Exception {
335335
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture);
336336
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
337337
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
338-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity());
338+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
339339
Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {});
340340
PlainActionFuture<Void> future = new PlainActionFuture<>();
341341
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
248248
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
249249
intermediateOperatorItr.next()
250250
),
251-
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(), Function.identity()),
251+
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity()),
252252
() -> {}
253253
)
254254
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ public void testBasic() throws Exception {
9797
pages[i] = new Page(blockFactory.newConstantIntBlockWith(i, 2));
9898
}
9999
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
100-
ExchangeSink sink1 = sinkExchanger.createExchangeSink();
101-
ExchangeSink sink2 = sinkExchanger.createExchangeSink();
100+
ExchangeSink sink1 = sinkExchanger.createExchangeSink(() -> {});
101+
ExchangeSink sink2 = sinkExchanger.createExchangeSink(() -> {});
102102
PlainActionFuture<Void> sourceCompletion = new PlainActionFuture<>();
103103
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR), sourceCompletion);
104104
ExchangeSource source = sourceExchanger.createExchangeSource();
@@ -343,7 +343,7 @@ public void testConcurrentWithHandlers() {
343343
sourceExchanger.addRemoteSink(sinkHandler::fetchPageAsync, randomBoolean(), randomIntBetween(1, 3), ActionListener.noop());
344344
sinkHandlers.add(sinkHandler);
345345
}
346-
return sinkHandler.createExchangeSink();
346+
return sinkHandler.createExchangeSink(() -> {});
347347
};
348348
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
349349
final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
@@ -405,7 +405,7 @@ public void testExchangeSourceContinueOnFailure() {
405405
assertTrue(sinkFailed.get());
406406
failedSinks.incrementAndGet();
407407
}));
408-
return sinkHandler.createExchangeSink();
408+
return sinkHandler.createExchangeSink(() -> {});
409409
};
410410
Set<Integer> actualSeqNos = runConcurrentTest(
411411
maxInputSeqNo,
@@ -430,7 +430,7 @@ public void testClosingSinks() {
430430
Page p1 = new Page(block1);
431431
Page p2 = new Page(block2);
432432
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(blockFactory, 2, threadPool.relativeTimeInMillisSupplier());
433-
ExchangeSink sink = sinkExchanger.createExchangeSink();
433+
ExchangeSink sink = sinkExchanger.createExchangeSink(() -> {});
434434
sink.addPage(p1);
435435
sink.addPage(p2);
436436
assertFalse(sink.waitForWriting().listener().isDone());
@@ -535,7 +535,7 @@ public void testConcurrentWithTransportActions() {
535535
maxInputSeqNo,
536536
maxOutputSeqNo,
537537
sourceHandler::createExchangeSource,
538-
sinkHandler::createExchangeSink
538+
() -> sinkHandler.createExchangeSink(() -> {})
539539
);
540540
var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet());
541541
assertThat(actualSeqNos, hasSize(expectedSeqNos.size()));
@@ -606,7 +606,12 @@ public void sendResponse(TransportResponse transportResponse) {
606606
);
607607
Exception err = expectThrows(
608608
Exception.class,
609-
() -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink)
609+
() -> runConcurrentTest(
610+
maxSeqNo,
611+
maxSeqNo,
612+
sourceHandler::createExchangeSource,
613+
() -> sinkHandler.createExchangeSink(() -> {})
614+
)
610615
);
611616
Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
612617
assertNotNull(cause);
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.index.IndexRequestBuilder;
11+
import org.elasticsearch.index.IndexService;
12+
import org.elasticsearch.index.shard.IndexShard;
13+
import org.elasticsearch.indices.IndicesService;
14+
import org.elasticsearch.plugins.Plugin;
15+
import org.elasticsearch.test.transport.MockTransportService;
16+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
17+
import org.elasticsearch.xpack.esql.plugin.ComputeService;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
25+
import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
26+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
27+
import static org.hamcrest.Matchers.equalTo;
28+
29+
public class EsqlRetryIT extends AbstractEsqlIntegTestCase {
30+
31+
@Override
32+
protected Collection<Class<? extends Plugin>> nodePlugins() {
33+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
34+
plugins.add(MockTransportService.TestPlugin.class);
35+
return plugins;
36+
}
37+
38+
public void testRetryOnShardFailures() throws Exception {
39+
populateIndices();
40+
try {
41+
final AtomicBoolean relocated = new AtomicBoolean();
42+
for (String node : internalCluster().getNodeNames()) {
43+
// fail some target shards while handling the data node request
44+
MockTransportService.getInstance(node)
45+
.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
46+
if (relocated.compareAndSet(false, true)) {
47+
closeOrFailShards(node);
48+
}
49+
handler.messageReceived(request, channel, task);
50+
});
51+
}
52+
try (var resp = run("FROM log-* | STATS COUNT(timestamp) | LIMIT 1")) {
53+
assertThat(EsqlTestUtils.getValuesList(resp).get(0).get(0), equalTo(7L));
54+
}
55+
} finally {
56+
for (String node : internalCluster().getNodeNames()) {
57+
MockTransportService.getInstance(node).clearAllRules();
58+
}
59+
}
60+
}
61+
62+
private void populateIndices() {
63+
internalCluster().ensureAtLeastNumDataNodes(2);
64+
assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
65+
assertAcked(prepareCreate("log-index-2").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
66+
List<IndexRequestBuilder> reqs = new ArrayList<>();
67+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2015-07-08"));
68+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2018-07-08"));
69+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-03-03"));
70+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-09-09"));
71+
reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2019-10-12"));
72+
reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-02-02"));
73+
reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-10-10"));
74+
indexRandom(true, reqs);
75+
ensureGreen("log-index-1", "log-index-2");
76+
indicesAdmin().prepareRefresh("log-index-1", "log-index-2").get();
77+
}
78+
79+
private void closeOrFailShards(String nodeName) throws Exception {
80+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
81+
for (IndexService indexService : indicesService) {
82+
for (IndexShard indexShard : indexService) {
83+
if (randomBoolean()) {
84+
indexShard.failShard("simulated", new IOException("simulated failure"));
85+
} else if (randomBoolean()) {
86+
closeShardNoCheck(indexShard);
87+
}
88+
}
89+
}
90+
}
91+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.elasticsearch.compute.operator.SourceOperator;
3737
import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
3838
import org.elasticsearch.compute.operator.StringExtractOperator;
39-
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
39+
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
4040
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator.ExchangeSinkOperatorFactory;
4141
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
4242
import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator.ExchangeSourceOperatorFactory;
@@ -103,6 +103,7 @@
103103
import java.util.Objects;
104104
import java.util.Optional;
105105
import java.util.function.Function;
106+
import java.util.function.Supplier;
106107
import java.util.stream.IntStream;
107108
import java.util.stream.Stream;
108109

@@ -127,7 +128,7 @@ public class LocalExecutionPlanner {
127128
private final Settings settings;
128129
private final Configuration configuration;
129130
private final ExchangeSourceHandler exchangeSourceHandler;
130-
private final ExchangeSinkHandler exchangeSinkHandler;
131+
private final Supplier<ExchangeSink> exchangeSinkSupplier;
131132
private final EnrichLookupService enrichLookupService;
132133
private final LookupFromIndexService lookupFromIndexService;
133134
private final PhysicalOperationProviders physicalOperationProviders;
@@ -141,7 +142,7 @@ public LocalExecutionPlanner(
141142
Settings settings,
142143
Configuration configuration,
143144
ExchangeSourceHandler exchangeSourceHandler,
144-
ExchangeSinkHandler exchangeSinkHandler,
145+
Supplier<ExchangeSink> exchangeSinkSupplier,
145146
EnrichLookupService enrichLookupService,
146147
LookupFromIndexService lookupFromIndexService,
147148
PhysicalOperationProviders physicalOperationProviders
@@ -153,7 +154,7 @@ public LocalExecutionPlanner(
153154
this.blockFactory = blockFactory;
154155
this.settings = settings;
155156
this.exchangeSourceHandler = exchangeSourceHandler;
156-
this.exchangeSinkHandler = exchangeSinkHandler;
157+
this.exchangeSinkSupplier = exchangeSinkSupplier;
157158
this.enrichLookupService = enrichLookupService;
158159
this.lookupFromIndexService = lookupFromIndexService;
159160
this.physicalOperationProviders = physicalOperationProviders;
@@ -323,7 +324,7 @@ private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecution
323324
}
324325

325326
private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) {
326-
Objects.requireNonNull(exchangeSinkHandler, "ExchangeSinkHandler wasn't provided");
327+
Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkSupplier wasn't provided");
327328
var child = exchangeSink.child();
328329

329330
PhysicalOperation source = plan(child, context);
@@ -332,7 +333,7 @@ private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalE
332333
? Function.identity()
333334
: alignPageToAttributes(exchangeSink.output(), source.layout);
334335

335-
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkHandler::createExchangeSink, transformer), source.layout);
336+
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout);
336337
}
337338

338339
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {

0 commit comments

Comments
 (0)