Skip to content

Commit a85f65f

Browse files
authored
Do not retry CBE (elastic#124300) (elastic#124538)
(cherry picked from commit 9e7a90f)
1 parent db4fd94 commit a85f65f

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.search.SearchShardsResponse;
1818
import org.elasticsearch.action.support.TransportActions;
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.common.breaker.CircuitBreakingException;
2021
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2122
import org.elasticsearch.compute.operator.DriverProfile;
2223
import org.elasticsearch.compute.operator.FailureCollector;
@@ -209,8 +210,9 @@ private static Exception unwrapFailure(Exception e) {
209210
private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception originalEx) {
210211
final Exception e = unwrapFailure(originalEx);
211212
final boolean isTaskCanceledException = ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null;
213+
final boolean isCircuitBreakerException = ExceptionsHelper.unwrap(e, CircuitBreakingException.class) != null;
212214
shardFailures.compute(shardId, (k, current) -> {
213-
boolean mergedFatal = fatal || isTaskCanceledException;
215+
boolean mergedFatal = fatal || isTaskCanceledException || isCircuitBreakerException;
214216
return current == null
215217
? new ShardFailure(mergedFatal, e)
216218
: new ShardFailure(

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.elasticsearch.action.support.PlainActionFuture;
1616
import org.elasticsearch.cluster.node.DiscoveryNode;
1717
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
18+
import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
19+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1820
import org.elasticsearch.common.settings.Settings;
1921
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2022
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -271,6 +273,17 @@ public void testNonFatalFailedOnAllNodes() {
271273
assertThat(sent.size(), equalTo(2));
272274
}
273275

276+
public void testDoNotRetryCircuitBreakerException() {
277+
var targetShards = List.of(targetShard(shard1, node1, node2));
278+
var sent = ConcurrentCollections.newQueue();
279+
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
280+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
281+
runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", randomFrom(Durability.values())), false));
282+
});
283+
expectThrows(CircuitBreakingException.class, equalTo("cbe"), future::actionGet);
284+
assertThat(sent.size(), equalTo(1));
285+
}
286+
274287
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
275288
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
276289
}

0 commit comments

Comments
 (0)