Skip to content

Commit 2283ca1

Browse files
committed
Do not retry CBE (CircuitBreakingException)
1 parent b218f69 commit 2283ca1

File tree

2 files changed

+26
-13
lines changed

2 files changed

+26
-13
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 12 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.search.SearchShardsResponse;
1818
import org.elasticsearch.action.support.TransportActions;
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.common.breaker.CircuitBreakingException;
2021
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2122
import org.elasticsearch.compute.operator.DriverProfile;
2223
import org.elasticsearch.compute.operator.FailureCollector;
@@ -211,18 +212,18 @@ private static Exception unwrapFailure(Exception e) {
211212

212213
private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception originalEx) {
213214
final Exception e = unwrapFailure(originalEx);
214-
// Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage, especially when handling
215-
// many shards.
215+
final boolean isTaskCanceledException = ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null;
216+
final boolean isCircuitBreakerException = ExceptionsHelper.unwrap(e, CircuitBreakingException.class) != null;
216217
shardFailures.compute(shardId, (k, current) -> {
217-
boolean mergedFatal = fatal || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null;
218-
if (current == null) {
219-
return new ShardFailure(mergedFatal, e);
220-
}
221-
mergedFatal |= current.fatal;
222-
if (e instanceof NoShardAvailableActionException || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
223-
return new ShardFailure(mergedFatal, current.failure);
224-
}
225-
return new ShardFailure(mergedFatal, e);
218+
boolean mergedFatal = fatal || isTaskCanceledException || isCircuitBreakerException;
219+
return current == null
220+
? new ShardFailure(mergedFatal, e)
221+
: new ShardFailure(
222+
mergedFatal || current.fatal,
223+
// Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage,
224+
// especially when handling many shards.
225+
isTaskCanceledException || e instanceof NoShardAvailableActionException ? current.failure : e
226+
);
226227
});
227228
}
228229

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@
1515
import org.elasticsearch.action.support.PlainActionFuture;
1616
import org.elasticsearch.cluster.node.DiscoveryNode;
1717
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
18+
import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
19+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1820
import org.elasticsearch.common.settings.Settings;
1921
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2022
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -123,8 +125,7 @@ public void testMissingShards() {
123125
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
124126
fail("expect no data-node request is sent when target shards are missing");
125127
});
126-
var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
127-
assertThat(error.getMessage(), containsString("no shard copies found"));
128+
expectThrows(NoShardAvailableActionException.class, containsString("no shard copies found"), future::actionGet);
128129
}
129130
{
130131
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
@@ -244,6 +245,17 @@ public void testAllowPartialResults() {
244245
assertThat(resp.successfulShards, equalTo(1));
245246
}
246247

248+
public void testDoNotRetryCircuitBreakerException() {
249+
var targetShards = List.of(targetShard(shard1, node1, node2));
250+
var sent = ConcurrentCollections.newQueue();
251+
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
252+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
253+
runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", randomFrom(Durability.values())), false));
254+
});
255+
expectThrows(CircuitBreakingException.class, equalTo("cbe"), future::actionGet);
256+
assertThat(sent.size(), equalTo(1));
257+
}
258+
247259
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
248260
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
249261
}

0 commit comments

Comments
 (0)