
Commit c43748d

Handle index deletion while querying in ES|QL (elastic#137702)
If the index is deleted while querying, ES|QL might try to get the index routing from the latest cluster state. This could throw an index-not-found exception, which is not handled properly, causing the user to never receive a response even though execution has completed. This also leaves a dangling task in the task manager.

Relates elastic#126653
Closes elastic#135863

(cherry picked from commit 0051df8)
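The underlying bug class is worth spelling out: a response handler that can throw after the remote work has finished, but before the pending ActionListener is completed, lets the exception escape, so no failure is ever propagated and the query hangs with a dangling task. A minimal, self-contained sketch of the guard the fix applies, with illustrative names rather than the actual ES|QL types:

    import java.util.function.Consumer;

    // Hypothetical stand-in for org.elasticsearch.action.ActionListener.
    interface Listener<T> {
        void onResponse(T result);

        void onFailure(Exception e);
    }

    class ResponseHandling {

        // Broken shape: if handle() throws (say, an index-not-found raised while
        // resolving routing), the listener is never completed and the caller
        // waits forever for a response that cannot arrive.
        static <T> void handleUnsafe(T response, Consumer<T> handle, Listener<T> listener) {
            handle.accept(response); // may throw -> listener leaks
            listener.onResponse(response);
        }

        // Fixed shape: every path out of the handler completes the listener
        // exactly once, so the caller always gets either a response or an error.
        static <T> void handleSafe(T response, Consumer<T> handle, Listener<T> listener) {
            try {
                handle.accept(response);
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(response);
        }
    }

The DataNodeRequestSender diff below applies exactly this shape: onResponse and onFailure are wrapped in try/catch, and the catch branch fails the listener instead of letting the exception vanish.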

3 files changed: +125 −21 lines
docs/changelog/137702.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 137702
+summary: Handle index deletion while querying in ES|QL
+area: ES|QL
+type: bug
+issues:
+ - 135863

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java

Lines changed: 69 additions & 0 deletions
@@ -8,6 +8,9 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndicesService;
@@ -20,11 +23,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 
 public class EsqlRetryIT extends AbstractEsqlIntegTestCase {
 
@@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception {
         }
     }
 
+    public void testQueryWhileDeletingIndices() {
+        populateIndices();
+        CountDownLatch waitForDeletion = new CountDownLatch(1);
+        try {
+            final AtomicBoolean deleted = new AtomicBoolean();
+            for (String node : internalCluster().getNodeNames()) {
+                MockTransportService.getInstance(node)
+                    .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
+                        if (deleted.compareAndSet(false, true)) {
+                            deleteIndexCompletely("log-index-2");
+                            waitForDeletion.countDown();
+                        } else {
+                            assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS));
+                        }
+                        handler.messageReceived(request, channel, task);
+                    });
+            }
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1");
+            request.allowPartialResults(true);
+            try (var resp = run(request)) {
+                assertTrue(resp.isPartial());
+                assertThat(EsqlTestUtils.getValuesList(resp).get(0).get(0), equalTo(4L));
+            }
+        } finally {
+            for (String node : internalCluster().getNodeNames()) {
+                MockTransportService.getInstance(node).clearAllRules();
+            }
+        }
+    }
+
     private void populateIndices() {
         internalCluster().ensureAtLeastNumDataNodes(2);
         assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
@@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception {
             }
         }
     }
+
+    /**
+     * Deletes the given index and ensures it is completely removed from the cluster state and from all nodes
+     */
+    private void deleteIndexCompletely(String indexName) throws Exception {
+        assertAcked(indicesAdmin().prepareDelete(indexName));
+        String[] nodeNames = internalCluster().getNodeNames();
+        assertBusy(() -> {
+            for (String nodeName : nodeNames) {
+                ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state();
+                for (IndexMetadata imd : clusterState.metadata().indices().values()) {
+                    assertThat(
+                        "Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]",
+                        imd.getIndex().getName(),
+                        not(equalTo(indexName))
+                    );
+                }
+            }
+            for (String nodeName : nodeNames) {
+                final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+                for (IndexService indexService : indicesService) {
+                    for (IndexShard indexShard : indexService) {
+                        assertThat(
                            "Index [" + indexName + "] still exists on node [" + nodeName + "]",
+                            indexShard.shardId().getIndexName(),
+                            not(equalTo(indexName))
+                        );
+                    }
+                }
+            }
+        });
+    }
 }
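A note on the test's coordination: the first data-node request to arrive performs the index deletion, and every other request blocks until the deletion has fully propagated, which guarantees that the remaining shard-level requests observe the post-deletion cluster state. That pattern, stripped of the ES test framework (the class name here is illustrative), looks like this:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    class FirstCallerTriggers {
        private final AtomicBoolean triggered = new AtomicBoolean();
        private final CountDownLatch done = new CountDownLatch(1);

        // Every intercepted request passes through here before being handled:
        // exactly one caller runs the side effect (deleting the index); the
        // rest wait until it has finished, then proceed.
        void onRequest(Runnable sideEffect) throws InterruptedException {
            if (triggered.compareAndSet(false, true)) {
                sideEffect.run();
                done.countDown();
            } else if (done.await(10, TimeUnit.SECONDS) == false) {
                throw new IllegalStateException("side effect did not complete in time");
            }
        }
    }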

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 50 additions & 21 deletions
@@ -28,6 +28,8 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.tasks.CancellableTask;
@@ -64,6 +66,8 @@
  */
 abstract class DataNodeRequestSender {
 
+    private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class);
+
     /**
      * Query order according to the
      * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a>.
@@ -275,38 +279,52 @@ private List<ShardSearchFailure> selectFailures() {
     private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
         final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
         sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
-            void onAfter(DriverCompletionInfo info) {
+
+            void onAfterRequest() {
                 nodePermits.get(request.node).release();
                 if (concurrentRequests != null) {
                     concurrentRequests.release();
                 }
                 trySendingRequestsForPendingShards(targetShards, computeListener);
-                listener.onResponse(info);
             }
 
             @Override
             public void onResponse(DataNodeComputeResponse response) {
-                // remove failures of successful shards
-                for (ShardId shardId : request.shardIds()) {
-                    if (response.shardLevelFailures().containsKey(shardId) == false) {
-                        shardFailures.remove(shardId);
+                try {
+                    // remove failures of successful shards
+                    for (ShardId shardId : request.shardIds()) {
+                        if (response.shardLevelFailures().containsKey(shardId) == false) {
+                            shardFailures.remove(shardId);
+                        }
                     }
+                    for (var entry : response.shardLevelFailures().entrySet()) {
+                        final ShardId shardId = entry.getKey();
+                        trackShardLevelFailure(shardId, false, entry.getValue());
+                        pendingShardIds.add(shardId);
+                    }
+                    onAfterRequest();
+                } catch (Exception ex) {
+                    expectNoFailure("expect no failure while handling data node response", ex);
+                    listener.onFailure(ex);
+                    return;
                 }
-                for (Map.Entry<ShardId, Exception> e : response.shardLevelFailures().entrySet()) {
-                    final ShardId shardId = e.getKey();
-                    trackShardLevelFailure(shardId, false, e.getValue());
-                    pendingShardIds.add(shardId);
-                }
-                onAfter(response.completionInfo());
+                listener.onResponse(response.completionInfo());
             }
 
             @Override
             public void onFailure(Exception e, boolean receivedData) {
-                for (ShardId shardId : request.shardIds) {
-                    trackShardLevelFailure(shardId, receivedData, e);
-                    pendingShardIds.add(shardId);
+                try {
+                    for (ShardId shardId : request.shardIds) {
+                        trackShardLevelFailure(shardId, receivedData, e);
+                        pendingShardIds.add(shardId);
+                    }
+                    onAfterRequest();
+                } catch (Exception ex) {
+                    expectNoFailure("expect no failure while handling failure of data node request", ex);
+                    listener.onFailure(ex);
+                    return;
                 }
-                onAfter(DriverCompletionInfo.EMPTY);
+                listener.onResponse(DriverCompletionInfo.EMPTY);
             }
 
             @Override
@@ -318,6 +336,11 @@ public void onSkip() {
                     onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 }
             }
+
+            private void expectNoFailure(String message, Exception e) {
+                LOGGER.error(message, e);
+                assert false : new AssertionError(message, e);
+            }
         });
     }
 
@@ -506,15 +529,21 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
         var state = clusterService.state();
         var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
         for (var shardId : shardIds) {
-            nodes.put(
-                shardId,
-                state.routingTable()
+            List<DiscoveryNode> allocatedNodes;
+            try {
+                allocatedNodes = state.routingTable()
                     .shardRoutingTable(shardId)
                     .allShards()
                    .filter(shard -> shard.active() && shard.isSearchable())
                     .map(shard -> state.nodes().get(shard.currentNodeId()))
-                    .toList()
-            );
+                    .toList();
+            } catch (Exception ignored) {
+                // If the target index is deleted or the target shard is not found after the query has started,
+                // we skip resolving its new shard routing, and that shard will not be retried.
+                continue;
+            }
+            nodes.put(shardId, allocatedNodes);
         }
         return nodes;
     }