Skip to content

Commit 1c3b804

Browse files
committed
Add tests for non-fatal errors in data node request sender
1 parent d266f58 commit 1c3b804

File tree

2 files changed

+51
-32
lines changed

2 files changed

+51
-32
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,13 @@ final void startComputeOnDataNodes(
8181
final long startTimeInNanos = System.nanoTime();
8282
searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
8383
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
84-
TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
85-
final int failedShards = shardFailures.size();
8684
return new ComputeResponse(
8785
profiles,
88-
took,
86+
TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
8987
targetShards.totalShards(),
90-
targetShards.totalShards() - failedShards,
88+
targetShards.totalShards() - shardFailures.size(),
9189
targetShards.skippedShards(),
92-
failedShards
90+
shardFailures.size()
9391
);
9492
}))) {
9593
for (TargetShard shard : targetShards.shards.values()) {
@@ -128,8 +126,7 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
128126
reportedFailure = true;
129127
reportFailures(computeListener);
130128
} else {
131-
var nodeRequests = selectNodeRequests(targetShards);
132-
for (NodeRequest request : nodeRequests) {
129+
for (NodeRequest request : selectNodeRequests(targetShards)) {
133130
sendOneNodeRequest(targetShards, computeListener, request);
134131
}
135132
}
@@ -211,18 +208,17 @@ private static Exception unwrapFailure(Exception e) {
211208

212209
private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception originalEx) {
213210
final Exception e = unwrapFailure(originalEx);
214-
// Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage, especially when handling
215-
// many shards.
211+
final boolean isTaskCanceledException = ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null;
216212
shardFailures.compute(shardId, (k, current) -> {
217-
boolean mergedFatal = fatal || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null;
218-
if (current == null) {
219-
return new ShardFailure(mergedFatal, e);
220-
}
221-
mergedFatal |= current.fatal;
222-
if (e instanceof NoShardAvailableActionException || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
223-
return new ShardFailure(mergedFatal, current.failure);
224-
}
225-
return new ShardFailure(mergedFatal, e);
213+
boolean mergedFatal = fatal || isTaskCanceledException;
214+
return current == null
215+
? new ShardFailure(mergedFatal, e)
216+
: new ShardFailure(
217+
mergedFatal || current.fatal,
218+
// Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage,
219+
// especially when handling many shards.
220+
isTaskCanceledException || e instanceof NoShardAvailableActionException ? current.failure : e
221+
);
226222
});
227223
}
228224

@@ -243,17 +239,11 @@ TargetShard getShard(ShardId shardId) {
243239
/**
244240
* (Remaining) allocated nodes of a given shard id and its alias filter
245241
*/
246-
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {
247-
248-
}
242+
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {}
249243

250-
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {
244+
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {}
251245

252-
}
253-
254-
private record ShardFailure(boolean fatal, Exception failure) {
255-
256-
}
246+
private record ShardFailure(boolean fatal, Exception failure) {}
257247

258248
/**
259249
* Selects the next nodes to send requests to. Limits to at most one outstanding request per node.

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.elasticsearch.action.support.PlainActionFuture;
1616
import org.elasticsearch.cluster.node.DiscoveryNode;
1717
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
18+
import org.elasticsearch.common.breaker.CircuitBreaker;
19+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1820
import org.elasticsearch.common.settings.Settings;
1921
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2022
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -40,6 +42,7 @@
4042
import java.util.HashMap;
4143
import java.util.List;
4244
import java.util.Map;
45+
import java.util.Objects;
4346
import java.util.Queue;
4447
import java.util.Set;
4548
import java.util.concurrent.Executor;
@@ -85,7 +88,7 @@ public void setThreadPool() {
8588
}
8689

8790
@After
88-
public void shutdownThreadPool() throws Exception {
91+
public void shutdownThreadPool() {
8992
terminate(threadPool);
9093
}
9194

@@ -109,8 +112,7 @@ public void testOnePass() {
109112
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
110113
var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
111114
sent.add(new NodeRequest(node, shardIds, aliasFilters));
112-
var resp = new DataNodeComputeResponse(List.of(), Map.of());
113-
runWithDelay(() -> listener.onResponse(resp));
115+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
114116
});
115117
safeGet(future);
116118
assertThat(sent.size(), equalTo(2));
@@ -123,8 +125,7 @@ public void testMissingShards() {
123125
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
124126
fail("expect no data-node request is sent when target shards are missing");
125127
});
126-
var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
127-
assertThat(error.getMessage(), containsString("no shard copies found"));
128+
expectThrows(NoShardAvailableActionException.class, containsString("no shard copies found"), future::actionGet);
128129
}
129130
{
130131
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
@@ -244,6 +245,34 @@ public void testAllowPartialResults() {
244245
assertThat(resp.successfulShards, equalTo(1));
245246
}
246247

248+
public void testNonFatalErrorIsRetriedOnAnotherShard() {
249+
var targetShards = List.of(targetShard(shard1, node1, node2));
250+
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
251+
var response = safeGet(sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
252+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
253+
if (Objects.equals(node1, node)) {
254+
runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", CircuitBreaker.Durability.TRANSIENT), false));
255+
} else {
256+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
257+
}
258+
}));
259+
assertThat(response.totalShards, equalTo(1));
260+
assertThat(response.successfulShards, equalTo(1));
261+
assertThat(response.failedShards, equalTo(0));
262+
assertThat(sent.size(), equalTo(2));
263+
}
264+
265+
public void testNonFatalFailedOnAllNodes() {
266+
var targetShards = List.of(targetShard(shard1, node1, node2));
267+
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
268+
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
269+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
270+
runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", CircuitBreaker.Durability.TRANSIENT), false));
271+
});
272+
expectThrows(CircuitBreakingException.class, equalTo("cbe"), future::actionGet);
273+
assertThat(sent.size(), equalTo(2));
274+
}
275+
247276
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
248277
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
249278
}

0 commit comments

Comments
 (0)