Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/120774.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120774
summary: Retry ES|QL node requests on shard level failures
area: ES|QL
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ static TransportVersion def(int id) {
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00);
public static final TransportVersion REMOVE_DESIRED_NODE_VERSION = def(9_004_0_00);
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION = def(9_005_0_00);
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE = def(9_006_0_00);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public FailureCollector(int maxExceptions) {
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
}

private static Exception unwrapTransportException(TransportException te) {
public static Exception unwrapTransportException(TransportException te) {
final Throwable cause = te.getCause();
if (cause == null) {
return te;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.ComputeService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

/**
 * Integration test that disrupts shards (failing or closing them) while a data-node
 * compute request is being handled and then checks that the ES|QL query still returns
 * the complete result.
 */
public class EsqlRetryIT extends AbstractEsqlIntegTestCase {

    /**
     * Returns the plugins of the base class plus {@link MockTransportService.TestPlugin},
     * which this test uses to hook into request handling on each node.
     */
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        final List<Class<? extends Plugin>> withMockTransport = new ArrayList<>(super.nodePlugins());
        withMockTransport.add(MockTransportService.TestPlugin.class);
        return withMockTransport;
    }

    /**
     * Indexes seven documents, installs a request-handling behavior on every node that
     * disrupts shards the first time a data-node request arrives anywhere in the cluster,
     * and asserts that counting the {@code timestamp} field still yields seven.
     */
    public void testRetryOnShardFailures() throws Exception {
        populateIndices();
        try {
            // Shared across all nodes so that the disruption happens exactly once.
            final AtomicBoolean shardsDisrupted = new AtomicBoolean();
            for (String nodeName : internalCluster().getNodeNames()) {
                disruptShardsOnFirstDataRequest(nodeName, shardsDisrupted);
            }
            try (var response = run("FROM log-* | STATS COUNT(timestamp) | LIMIT 1")) {
                final var rows = EsqlTestUtils.getValuesList(response);
                assertThat(rows.get(0).get(0), equalTo(7L));
            }
        } finally {
            // Always remove the installed behaviors, whether or not the assertion passed.
            for (String nodeName : internalCluster().getNodeNames()) {
                MockTransportService.getInstance(nodeName).clearAllRules();
            }
        }
    }

    /**
     * Registers a handling behavior for {@link ComputeService#DATA_ACTION_NAME} on the given node.
     * The first invocation that flips {@code shardsDisrupted} fails or closes some shards on that
     * node; every invocation then delegates to the real handler.
     */
    private void disruptShardsOnFirstDataRequest(String nodeName, AtomicBoolean shardsDisrupted) {
        final MockTransportService transport = MockTransportService.getInstance(nodeName);
        transport.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
            // fail some target shards while handling the data node request
            final boolean firstRequest = shardsDisrupted.compareAndSet(false, true);
            if (firstRequest) {
                closeOrFailShards(nodeName);
            }
            handler.messageReceived(request, channel, task);
        });
    }

    /**
     * Creates {@code log-index-1} (four documents) and {@code log-index-2} (three documents),
     * each with a {@code timestamp} date field, then waits for green and refreshes both.
     */
    private void populateIndices() {
        internalCluster().ensureAtLeastNumDataNodes(2);
        // The two between(1, 3) draws must stay in this order to keep seeded runs reproducible.
        assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
        assertAcked(prepareCreate("log-index-2").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));

        final List<IndexRequestBuilder> docs = new ArrayList<>();
        for (String timestamp : List.of("2015-07-08", "2018-07-08", "2020-03-03", "2020-09-09")) {
            docs.add(prepareIndex("log-index-1").setSource("timestamp", timestamp));
        }
        for (String timestamp : List.of("2019-10-12", "2020-02-02", "2020-10-10")) {
            docs.add(prepareIndex("log-index-2").setSource("timestamp", timestamp));
        }
        indexRandom(true, docs);

        ensureGreen("log-index-1", "log-index-2");
        indicesAdmin().prepareRefresh("log-index-1", "log-index-2").get();
    }

    /**
     * Walks every shard held by the named node and randomly fails it, closes it, or leaves it alone.
     *
     * @param nodeName name of the node whose shards are disrupted
     */
    private void closeOrFailShards(String nodeName) throws Exception {
        final IndicesService nodeIndices = internalCluster().getInstance(IndicesService.class, nodeName);
        for (IndexService index : nodeIndices) {
            for (IndexShard shard : index) {
                // First coin flip: fail the shard outright.
                if (randomBoolean()) {
                    shard.failShard("simulated", new IOException("simulated failure"));
                    continue;
                }
                // Second coin flip (only when not failed): close the shard; otherwise leave it untouched.
                if (randomBoolean()) {
                    closeShardNoCheck(shard);
                }
            }
        }
    }
}
Loading