Skip to content

Commit 6c67d6a

Browse files
committed
Return shard level failures in node response
1 parent 26e5373 commit 6c67d6a

File tree

4 files changed

+270
-77
lines changed

4 files changed

+270
-77
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ static TransportVersion def(int id) {
166166
public static final TransportVersion ESQL_RESPONSE_PARTIAL = def(8_832_00_0);
167167
public static final TransportVersion RANK_DOC_OPTIONAL_METADATA_FOR_EXPLAIN = def(8_833_00_0);
168168
public static final TransportVersion ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR = def(8_834_00_0);
169+
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE = def(8_835_00_0);
169170

170171
/*
171172
* STOP! READ THIS FIRST! No, really,
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.index.IndexRequestBuilder;
11+
import org.elasticsearch.index.IndexService;
12+
import org.elasticsearch.index.shard.IndexShard;
13+
import org.elasticsearch.indices.IndicesService;
14+
import org.elasticsearch.plugins.Plugin;
15+
import org.elasticsearch.test.transport.MockTransportService;
16+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
17+
import org.elasticsearch.xpack.esql.plugin.ComputeService;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
25+
import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
26+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
27+
import static org.hamcrest.Matchers.equalTo;
28+
29+
public class EsqlRetryIT extends AbstractEsqlIntegTestCase {
30+
31+
@Override
32+
protected Collection<Class<? extends Plugin>> nodePlugins() {
33+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
34+
plugins.add(MockTransportService.TestPlugin.class);
35+
return plugins;
36+
}
37+
38+
public void testRetryOnShardFailures() throws Exception {
39+
populateIndices();
40+
try {
41+
final AtomicBoolean relocated = new AtomicBoolean();
42+
for (String node : internalCluster().getNodeNames()) {
43+
// fail some target shards while handling the data node request
44+
MockTransportService.getInstance(node)
45+
.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
46+
if (relocated.compareAndSet(false, true)) {
47+
closeOrFailShards(node);
48+
}
49+
handler.messageReceived(request, channel, task);
50+
});
51+
}
52+
try (var resp = run("FROM log-* | STATS COUNT(timestamp) | LIMIT 1")) {
53+
assertThat(EsqlTestUtils.getValuesList(resp).get(0).get(0), equalTo(7L));
54+
}
55+
} finally {
56+
for (String node : internalCluster().getNodeNames()) {
57+
MockTransportService.getInstance(node).clearAllRules();
58+
}
59+
}
60+
}
61+
62+
private void populateIndices() {
63+
internalCluster().ensureAtLeastNumDataNodes(2);
64+
assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
65+
assertAcked(prepareCreate("log-index-2").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
66+
List<IndexRequestBuilder> reqs = new ArrayList<>();
67+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2015-07-08"));
68+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2018-07-08"));
69+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-03-03"));
70+
reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-09-09"));
71+
reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2019-10-12"));
72+
reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-02-02"));
73+
reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-10-10"));
74+
indexRandom(true, reqs);
75+
ensureGreen("log-index-1", "log-index-2");
76+
indicesAdmin().prepareRefresh("log-index-1", "log-index-2").get();
77+
}
78+
79+
private void closeOrFailShards(String nodeName) throws Exception {
80+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
81+
for (IndexService indexService : indicesService) {
82+
for (IndexShard indexShard : indexService) {
83+
if (randomBoolean()) {
84+
indexShard.failShard("simulated", new IOException("simulated failure"));
85+
} else if (randomBoolean()) {
86+
closeShardNoCheck(indexShard);
87+
}
88+
}
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)