
Commit d3ff1cf

Retry ES|QL node requests on shard level failures (elastic#120774)
Today, ES|QL fails fast on any failure. This PR introduces support for retrying within a cluster when data-node requests fail.

Data-node requests can fail in two ways: the entire request fails, or individual shards fail. For individual shard failures, we can retry against other copies of the failing shards. For entire request failures, we can retry every shard in the node request, provided no pages have been received. On the handling side, ES|QL executes against a batch of shards concurrently, so we need to track whether any pages have been produced. If pages have been produced, the entire request must fail. Otherwise, we can track the failed shards and send them back to the sender for retries.

There are two decisions around how quickly we should retry:

1. Should we notify the sender of failing shards immediately (via a different channel) to enable quick retries, or should we accumulate failures and return them in the final response?
2. What is the maximum number of inflight requests we should allow on the sending side?

This PR assumes that failures often occur when the cluster is under load or during a rolling upgrade. To prevent retries from adding more load and to allow the cluster to stabilize, this PR chooses to send shard failures in the final response and limits the number of inflight requests to one per data node.
1 parent af9f562 commit d3ff1cf
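
A minimal sketch of the handler-side bookkeeping described above (the class and method names are illustrative, not the actual ComputeService code): once any page has been sent back, a shard failure must fail the whole node request; before that, failed shards are collected and reported in the final response so the sender can retry other copies.

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch, not the PR's implementation: track whether any pages
 * were produced for a node request; decide per shard failure whether it can
 * be deferred for retry or must fail the entire request.
 */
final class ShardFailureTracker {
    private boolean pagesProduced;
    private final List<Integer> retryableShards = new ArrayList<>();

    /** Called when the first page of this node request is sent to the sender. */
    synchronized void onPageProduced() {
        pagesProduced = true;
    }

    /** Returns true if the failure was deferred for retry; false if the whole request must fail. */
    synchronized boolean onShardFailure(int shardId) {
        if (pagesProduced) {
            return false; // pages already sent: retrying could produce duplicate rows, so fail fast
        }
        retryableShards.add(shardId); // reported in the final response for retry on another copy
        return true;
    }

    /** Shards to hand back to the sender in the final response. */
    synchronized List<Integer> failedShards() {
        return List.copyOf(retryableShards);
    }
}

On the sending side, capping inflight requests at one per data node means retries are naturally serialized, so a retry never piles extra load onto a node that is still working on an earlier request.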

8 files changed: +961 additions, -202 deletions

docs/changelog/120774.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 120774
+summary: Retry ES|QL node requests on shard level failures
+area: ES|QL
+type: enhancement
+issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
@@ -179,6 +179,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
     public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
     public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_840_0_01);
+    public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19 = def(8_840_0_02);
 
     /*
      * STOP! READ THIS FIRST! No, really,
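
The new transport version gates the wire-format change so that mixed-version clusters keep working during a rolling upgrade. The PR's actual serialization change is not shown on this page; the following is a hedged sketch of the standard gating pattern, with a hypothetical retryableShards field standing in for whatever the real request/response adds.

import java.io.IOException;
import java.util.List;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

// Hypothetical fragment: only read/write the retry-related payload when the
// other side is new enough to understand it.
class NodeResponseSketch {
    private final List<String> retryableShards;

    NodeResponseSketch(StreamInput in) throws IOException {
        if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19)) {
            retryableShards = in.readStringCollectionAsList();
        } else {
            retryableShards = List.of(); // older nodes never report retryable shards
        }
    }

    void writeTo(StreamOutput out) throws IOException {
        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19)) {
            out.writeStringCollection(retryableShards);
        }
        // older receivers get the pre-retry wire format unchanged
    }
}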

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ public FailureCollector(int maxExceptions) {
         this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
     }
 
-    private static Exception unwrapTransportException(TransportException te) {
+    public static Exception unwrapTransportException(TransportException te) {
         final Throwable cause = te.getCause();
         if (cause == null) {
             return te;
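
The only change here is visibility: unwrapTransportException goes from private to public, so code outside FailureCollector can inspect the real cause of a remote failure. A hedged sketch of a possible caller follows; the predicate is illustrative, not the PR's actual retry policy.

import java.io.IOException;

import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.transport.TransportException;

final class RetryDecisionSketch {
    // Unwrap the TransportException envelope and classify by the real cause,
    // e.g. a shard-level IOException. Placeholder logic only.
    static boolean looksRetryable(Exception e) {
        Exception cause = e instanceof TransportException te
            ? FailureCollector.unwrapTransportException(te)
            : e;
        return cause instanceof IOException;
    }
}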
Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
+import org.elasticsearch.xpack.esql.plugin.ComputeService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+public class EsqlRetryIT extends AbstractEsqlIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(MockTransportService.TestPlugin.class);
+        return plugins;
+    }
+
+    public void testRetryOnShardFailures() throws Exception {
+        populateIndices();
+        try {
+            final AtomicBoolean relocated = new AtomicBoolean();
+            for (String node : internalCluster().getNodeNames()) {
+                // fail some target shards while handling the data node request
+                MockTransportService.getInstance(node)
+                    .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
+                        if (relocated.compareAndSet(false, true)) {
+                            closeOrFailShards(node);
+                        }
+                        handler.messageReceived(request, channel, task);
+                    });
+            }
+            try (var resp = run("FROM log-* | STATS COUNT(timestamp) | LIMIT 1")) {
+                assertThat(EsqlTestUtils.getValuesList(resp).get(0).get(0), equalTo(7L));
+            }
+        } finally {
+            for (String node : internalCluster().getNodeNames()) {
+                MockTransportService.getInstance(node).clearAllRules();
+            }
+        }
+    }
+
+    private void populateIndices() {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
+        assertAcked(prepareCreate("log-index-2").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
+        List<IndexRequestBuilder> reqs = new ArrayList<>();
+        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2015-07-08"));
+        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2018-07-08"));
+        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-03-03"));
+        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-09-09"));
+        reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2019-10-12"));
+        reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-02-02"));
+        reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-10-10"));
+        indexRandom(true, reqs);
+        ensureGreen("log-index-1", "log-index-2");
+        indicesAdmin().prepareRefresh("log-index-1", "log-index-2").get();
+    }
+
+    private void closeOrFailShards(String nodeName) throws Exception {
+        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+        for (IndexService indexService : indicesService) {
+            for (IndexShard indexShard : indexService) {
+                if (randomBoolean()) {
+                    indexShard.failShard("simulated", new IOException("simulated failure"));
+                } else if (randomBoolean()) {
+                    closeShardNoCheck(indexShard);
+                }
+            }
+        }
+    }
+}
