Skip to content

Commit 1b2f565

Browse files
authored
Add tests for non-fatal errors in data node request sender (#124203)
1 parent 9544204 commit 1b2f565

File tree

2 files changed

+49
-32
lines changed

2 files changed

+49
-32
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,13 @@ final void startComputeOnDataNodes(
8181
final long startTimeInNanos = System.nanoTime();
8282
searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
8383
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
84-
TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
85-
final int failedShards = shardFailures.size();
8684
return new ComputeResponse(
8785
profiles,
88-
took,
86+
TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
8987
targetShards.totalShards(),
90-
targetShards.totalShards() - failedShards,
88+
targetShards.totalShards() - shardFailures.size(),
9189
targetShards.skippedShards(),
92-
failedShards
90+
shardFailures.size()
9391
);
9492
}))) {
9593
for (TargetShard shard : targetShards.shards.values()) {
@@ -128,8 +126,7 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
128126
reportedFailure = true;
129127
reportFailures(computeListener);
130128
} else {
131-
var nodeRequests = selectNodeRequests(targetShards);
132-
for (NodeRequest request : nodeRequests) {
129+
for (NodeRequest request : selectNodeRequests(targetShards)) {
133130
sendOneNodeRequest(targetShards, computeListener, request);
134131
}
135132
}
@@ -211,18 +208,17 @@ private static Exception unwrapFailure(Exception e) {
211208

212209
private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception originalEx) {
213210
final Exception e = unwrapFailure(originalEx);
214-
// Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage, especially when handling
215-
// many shards.
211+
final boolean isTaskCanceledException = ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null;
216212
shardFailures.compute(shardId, (k, current) -> {
217-
boolean mergedFatal = fatal || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null;
218-
if (current == null) {
219-
return new ShardFailure(mergedFatal, e);
220-
}
221-
mergedFatal |= current.fatal;
222-
if (e instanceof NoShardAvailableActionException || ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
223-
return new ShardFailure(mergedFatal, current.failure);
224-
}
225-
return new ShardFailure(mergedFatal, e);
213+
boolean mergedFatal = fatal || isTaskCanceledException;
214+
return current == null
215+
? new ShardFailure(mergedFatal, e)
216+
: new ShardFailure(
217+
mergedFatal || current.fatal,
218+
// Retain only one meaningful exception and avoid suppressing previous failures to minimize memory usage,
219+
// especially when handling many shards.
220+
isTaskCanceledException || e instanceof NoShardAvailableActionException ? current.failure : e
221+
);
226222
});
227223
}
228224

@@ -243,17 +239,11 @@ TargetShard getShard(ShardId shardId) {
243239
/**
244240
* (Remaining) allocated nodes of a given shard id and its alias filter
245241
*/
246-
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {
247-
248-
}
242+
record TargetShard(ShardId shardId, List<DiscoveryNode> remainingNodes, AliasFilter aliasFilter) {}
249243

250-
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {
244+
record NodeRequest(DiscoveryNode node, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {}
251245

252-
}
253-
254-
private record ShardFailure(boolean fatal, Exception failure) {
255-
256-
}
246+
private record ShardFailure(boolean fatal, Exception failure) {}
257247

258248
/**
259249
* Selects the next nodes to send requests to. Limits to at most one outstanding request per node.

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.HashMap;
4141
import java.util.List;
4242
import java.util.Map;
43+
import java.util.Objects;
4344
import java.util.Queue;
4445
import java.util.Set;
4546
import java.util.concurrent.Executor;
@@ -85,7 +86,7 @@ public void setThreadPool() {
8586
}
8687

8788
@After
88-
public void shutdownThreadPool() throws Exception {
89+
public void shutdownThreadPool() {
8990
terminate(threadPool);
9091
}
9192

@@ -109,8 +110,7 @@ public void testOnePass() {
109110
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
110111
var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
111112
sent.add(new NodeRequest(node, shardIds, aliasFilters));
112-
var resp = new DataNodeComputeResponse(List.of(), Map.of());
113-
runWithDelay(() -> listener.onResponse(resp));
113+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
114114
});
115115
safeGet(future);
116116
assertThat(sent.size(), equalTo(2));
@@ -123,8 +123,7 @@ public void testMissingShards() {
123123
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
124124
fail("expect no data-node request is sent when target shards are missing");
125125
});
126-
var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
127-
assertThat(error.getMessage(), containsString("no shard copies found"));
126+
expectThrows(NoShardAvailableActionException.class, containsString("no shard copies found"), future::actionGet);
128127
}
129128
{
130129
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
@@ -244,6 +243,34 @@ public void testAllowPartialResults() {
244243
assertThat(resp.successfulShards, equalTo(1));
245244
}
246245

246+
public void testNonFatalErrorIsRetriedOnAnotherShard() {
247+
var targetShards = List.of(targetShard(shard1, node1, node2));
248+
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
249+
var response = safeGet(sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
250+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
251+
if (Objects.equals(node1, node)) {
252+
runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false));
253+
} else {
254+
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
255+
}
256+
}));
257+
assertThat(response.totalShards, equalTo(1));
258+
assertThat(response.successfulShards, equalTo(1));
259+
assertThat(response.failedShards, equalTo(0));
260+
assertThat(sent.size(), equalTo(2));
261+
}
262+
263+
public void testNonFatalFailedOnAllNodes() {
264+
var targetShards = List.of(targetShard(shard1, node1, node2));
265+
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
266+
var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
267+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
268+
runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false));
269+
});
270+
expectThrows(RuntimeException.class, equalTo("test request level non fatal failure"), future::actionGet);
271+
assertThat(sent.size(), equalTo(2));
272+
}
273+
247274
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
248275
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
249276
}

0 commit comments

Comments
 (0)