Commit 0051df8

Handle index deletion while querying in ES|QL (#137702)
If an index is deleted while a query is running, ES|QL may try to fetch the index routing from the latest cluster state. That lookup can throw an index-not-found exception, which was not handled properly: the user never received a response even though execution had completed, and a dangling task was left in the task manager. Relates #126653 Closes #135863
1 parent f08651e commit 0051df8
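
The pattern behind the fix is worth spelling out: bookkeeping performed while handling a data-node response (or failure) can itself throw, and if that exception escapes, the response listener is never completed and the query hangs. Below is a minimal, self-contained sketch of that guard, using hypothetical Listener and handleResponse names rather than the actual ES|QL types:

    import java.util.concurrent.CountDownLatch;
    import java.util.function.Consumer;

    public class ListenerGuardSketch {
        interface Listener<T> {
            void onResponse(T value);
            void onFailure(Exception e);
        }

        // Any exception thrown by the bookkeeping handler must still complete
        // the listener, otherwise the caller waits forever.
        static <T> void handleResponse(T response, Consumer<T> handler, Listener<T> listener) {
            try {
                handler.accept(response); // bookkeeping that may unexpectedly throw
            } catch (Exception e) {
                listener.onFailure(e);    // guarantee the caller gets an answer
                return;
            }
            listener.onResponse(response);
        }

        public static void main(String[] args) throws InterruptedException {
            CountDownLatch done = new CountDownLatch(1);
            // Simulate bookkeeping that throws (e.g. because an index vanished mid-query).
            handleResponse("ok", r -> { throw new IllegalStateException("index deleted mid-query"); }, new Listener<String>() {
                @Override public void onResponse(String v) { System.out.println("response: " + v); done.countDown(); }
                @Override public void onFailure(Exception e) { System.out.println("failure: " + e.getMessage()); done.countDown(); }
            });
            done.await(); // completes either way; no dangling task
        }
    }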

File tree: 3 files changed (+125, −21 lines)

docs/changelog/137702.yaml

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+pr: 137702
+summary: Handle index deletion while querying in ES|QL
+area: ES|QL
+type: bug
+issues:
+ - 135863

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java

Lines changed: 69 additions & 0 deletions
@@ -8,6 +8,9 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndicesService;
@@ -20,11 +23,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 
 public class EsqlRetryIT extends AbstractEsqlIntegTestCase {
 
@@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception {
         }
     }
 
+    public void testQueryWhileDeletingIndices() {
+        populateIndices();
+        CountDownLatch waitForDeletion = new CountDownLatch(1);
+        try {
+            final AtomicBoolean deleted = new AtomicBoolean();
+            for (String node : internalCluster().getNodeNames()) {
+                MockTransportService.getInstance(node)
+                    .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
+                        if (deleted.compareAndSet(false, true)) {
+                            deleteIndexCompletely("log-index-2");
+                            waitForDeletion.countDown();
+                        } else {
+                            assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS));
+                        }
+                        handler.messageReceived(request, channel, task);
+                    });
+            }
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1");
+            request.allowPartialResults(true);
+            try (var resp = run(request)) {
+                assertTrue(resp.isPartial());
+                assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L));
+            }
+        } finally {
+            for (String node : internalCluster().getNodeNames()) {
+                MockTransportService.getInstance(node).clearAllRules();
+            }
+        }
+    }
+
     private void populateIndices() {
         internalCluster().ensureAtLeastNumDataNodes(2);
         assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
@@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception {
             }
         }
     }
+
+    /**
+     * Deletes the given index and ensures it is completely removed from the cluster state and from all nodes
+     */
+    private void deleteIndexCompletely(String indexName) throws Exception {
+        assertAcked(indicesAdmin().prepareDelete(indexName));
+        String[] nodeNames = internalCluster().getNodeNames();
+        assertBusy(() -> {
+            for (String nodeName : nodeNames) {
+                ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state();
+                for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) {
+                    assertThat(
+                        "Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]",
+                        imd.getIndex().getName(),
+                        not(equalTo(indexName))
+                    );
+                }
+            }
+            for (String nodeName : nodeNames) {
+                final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+                for (IndexService indexService : indicesService) {
+                    for (IndexShard indexShard : indexService) {
+                        assertThat(
+                            "Index [" + indexName + "] still exists on node [" + nodeName + "]",
+                            indexShard.shardId().getIndexName(),
+                            not(equalTo(indexName))
+                        );
+                    }
+                }
+            }
+        });
+    }
 }
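
A note on the test technique: the interception combines an AtomicBoolean with a CountDownLatch so that exactly one intercepted data-node request triggers the index deletion, while every other request blocks until the deletion has fully propagated. A stripped-down, runnable sketch of that coordination idiom, with plain threads standing in for transport handlers (all names here are illustrative):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class FirstWriterWins {
        public static void main(String[] args) throws Exception {
            AtomicBoolean deleted = new AtomicBoolean();
            CountDownLatch waitForDeletion = new CountDownLatch(1);

            Runnable interceptor = () -> {
                if (deleted.compareAndSet(false, true)) {
                    // Exactly one thread wins the CAS and performs the destructive action.
                    System.out.println("deleting index (simulated)");
                    waitForDeletion.countDown();
                } else {
                    // Everyone else waits until the deletion is visible.
                    try {
                        if (!waitForDeletion.await(10, TimeUnit.SECONDS)) {
                            throw new IllegalStateException("timed out waiting for deletion");
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                System.out.println("forwarding request");
            };

            Thread t1 = new Thread(interceptor);
            Thread t2 = new Thread(interceptor);
            t1.start(); t2.start();
            t1.join(); t2.join();
        }
    }

This guarantees every forwarded data-node request observes the post-deletion cluster state, which is what makes the partial-result assertion in the test deterministic.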

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 50 additions & 21 deletions
@@ -30,6 +30,8 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.tasks.CancellableTask;
@@ -67,6 +69,8 @@
  */
 abstract class DataNodeRequestSender {
 
+    private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class);
+
     /**
      * Query order according to the
      * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a>.
@@ -283,38 +287,53 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener) {
         final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
         sendRequest(request.node, request.shards, request.aliasFilters, new NodeListener() {
 
-            void onAfter(DriverCompletionInfo info) {
+            void onAfterRequest() {
                 nodePermits.get(request.node).release();
                 if (concurrentRequests != null) {
                     concurrentRequests.release();
                 }
                 trySendingRequestsForPendingShards(targetShards, computeListener);
-                listener.onResponse(info);
             }
 
             @Override
             public void onResponse(DataNodeComputeResponse response) {
-                // remove failures of successful shards
-                for (DataNodeRequest.Shard shard : request.shards()) {
-                    if (response.shardLevelFailures().containsKey(shard.shardId()) == false) {
-                        shardFailures.remove(shard.shardId());
+                try {
+                    // remove failures of successful shards
+                    for (var shard : request.shards) {
+                        ShardId shardId = shard.shardId();
+                        if (response.shardLevelFailures().containsKey(shardId) == false) {
+                            shardFailures.remove(shardId);
+                        }
                     }
+                    for (var entry : response.shardLevelFailures().entrySet()) {
+                        final ShardId shardId = entry.getKey();
+                        trackShardLevelFailure(shardId, false, entry.getValue());
+                        pendingShardIds.add(shardId);
+                    }
+                    onAfterRequest();
+                } catch (Exception ex) {
+                    expectNoFailure("expect no failure while handling data node response", ex);
+                    listener.onFailure(ex);
+                    return;
                 }
-                for (var entry : response.shardLevelFailures().entrySet()) {
-                    final ShardId shardId = entry.getKey();
-                    trackShardLevelFailure(shardId, false, entry.getValue());
-                    pendingShardIds.add(shardId);
-                }
-                onAfter(response.completionInfo());
+                listener.onResponse(response.completionInfo());
             }
 
             @Override
             public void onFailure(Exception e, boolean receivedData) {
-                for (DataNodeRequest.Shard shard : request.shards) {
-                    trackShardLevelFailure(shard.shardId(), receivedData, e);
-                    pendingShardIds.add(shard.shardId());
+                try {
+                    for (var shard : request.shards) {
+                        ShardId shardId = shard.shardId();
+                        trackShardLevelFailure(shardId, receivedData, e);
+                        pendingShardIds.add(shardId);
+                    }
+                    onAfterRequest();
+                } catch (Exception ex) {
+                    expectNoFailure("expect no failure while handling failure of data node request", ex);
+                    listener.onFailure(ex);
+                    return;
                 }
-                onAfter(DriverCompletionInfo.EMPTY);
+                listener.onResponse(DriverCompletionInfo.EMPTY);
             }
 
             @Override
@@ -326,6 +345,11 @@ public void onSkip() {
                     onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 }
             }
+
+            private void expectNoFailure(String message, Exception e) {
+                LOGGER.error(message, e);
+                assert false : new AssertionError(message, e);
+            }
         });
     }
 
@@ -527,15 +551,20 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
         var project = projectResolver.getProjectState(clusterService.state());
         var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
         for (var shardId : shardIds) {
-            nodes.put(
-                shardId,
-                project.routingTable()
+            List<DiscoveryNode> allocatedNodes;
+            try {
+                allocatedNodes = project.routingTable()
                     .shardRoutingTable(shardId)
                     .allShards()
                     .filter(shard -> shard.active() && shard.isSearchable())
                     .map(shard -> project.cluster().nodes().get(shard.currentNodeId()))
-                    .toList()
-            );
+                    .toList();
+            } catch (Exception ignored) {
+                // If the target index is deleted or the target shard is not found after the query has started,
+                // we skip resolving its new shard routing, and that shard will not be retried.
+                continue;
+            }
+            nodes.put(shardId, allocatedNodes);
         }
         return nodes;
     }
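
The resolveShards change follows a common defensive pattern: isolate a per-item lookup that can throw in its own try/catch and skip that item rather than failing the whole resolution. A self-contained sketch of the same idea using plain maps (all names hypothetical; the real code resolves each ShardId to a list of DiscoveryNode via the project routing table):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SkipMissingEntries {
        // Resolve each id to its nodes; ids whose routing vanished are skipped,
        // not propagated as a failure for the whole batch.
        static Map<String, List<String>> resolve(List<String> ids, Map<String, List<String>> routing) {
            Map<String, List<String>> nodes = new HashMap<>();
            for (String id : ids) {
                List<String> allocated;
                try {
                    allocated = lookup(routing, id); // may throw if the entry vanished mid-query
                } catch (Exception ignored) {
                    continue; // deleted mid-query: skip, this shard is simply not retried
                }
                nodes.put(id, allocated);
            }
            return nodes;
        }

        static List<String> lookup(Map<String, List<String>> routing, String id) {
            List<String> found = routing.get(id);
            if (found == null) {
                throw new IllegalStateException("no routing for " + id);
            }
            return found;
        }

        public static void main(String[] args) {
            Map<String, List<String>> routing = Map.of("shard-1", List.of("node-a"));
            // shard-2 has no routing (index deleted), so only shard-1 resolves:
            System.out.println(resolve(List.of("shard-1", "shard-2"), routing)); // {shard-1=[node-a]}
        }
    }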
