Skip to content

Commit 4058ef1

Browse files
committed
clean non-fatal errors on shard skips
1 parent 02ddb1e commit 4058ef1

File tree

2 files changed

+29
-6
lines changed

2 files changed

+29
-6
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -204,7 +204,7 @@ public void onFailure(Exception e, boolean receivedData) {
204204
@Override
205205
public void onSkip() {
206206
skippedShards.incrementAndGet();
207-
onAfter(List.of());
207+
onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
208208
}
209209
});
210210
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 28 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -249,7 +249,7 @@ public void testAllowPartialResults() {
249249

250250
public void testNonFatalErrorIsRetriedOnAnotherShard() {
251251
var targetShards = List.of(targetShard(shard1, node1, node2));
252-
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
252+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
253253
var response = safeGet(sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
254254
sent.add(new NodeRequest(node, shardIds, aliasFilters));
255255
if (Objects.equals(node1, node)) {
@@ -266,7 +266,7 @@ public void testNonFatalErrorIsRetriedOnAnotherShard() {
266266

267267
public void testNonFatalFailedOnAllNodes() {
268268
var targetShards = List.of(targetShard(shard1, node1, node2));
269-
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
269+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
270270
var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
271271
sent.add(new NodeRequest(node, shardIds, aliasFilters));
272272
runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false));
@@ -277,7 +277,7 @@ public void testNonFatalFailedOnAllNodes() {
277277

278278
public void testDoNotRetryCircuitBreakerException() {
279279
var targetShards = List.of(targetShard(shard1, node1, node2));
280-
var sent = ConcurrentCollections.newQueue();
280+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
281281
var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
282282
sent.add(new NodeRequest(node, shardIds, aliasFilters));
283283
runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", randomFrom(Durability.values())), false));
@@ -298,7 +298,7 @@ public void testLimitConcurrentNodes() {
298298
var concurrency = randomIntBetween(1, 2);
299299
AtomicInteger maxConcurrentRequests = new AtomicInteger(0);
300300
AtomicInteger concurrentRequests = new AtomicInteger(0);
301-
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
301+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
302302
var response = safeGet(sendRequests(targetShards, randomBoolean(), concurrency, (node, shardIds, aliasFilters, listener) -> {
303303
concurrentRequests.incrementAndGet();
304304

@@ -333,7 +333,7 @@ public void testDoesNotSendMoreRequestsAfterNodeIsSkipped() {
333333
);
334334

335335
AtomicInteger processed = new AtomicInteger(0);
336-
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
336+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
337337
var response = safeGet(sendRequests(targetShards, randomBoolean(), 1, (node, shardIds, aliasFilters, listener) -> {
338338
sent.add(new NodeRequest(node, shardIds, aliasFilters));
339339
runWithDelay(() -> {
@@ -351,6 +351,29 @@ public void testDoesNotSendMoreRequestsAfterNodeIsSkipped() {
351351
assertThat(response.failedShards, equalTo(0));
352352
}
353353

354+
public void testSkipRemovesPriorNonFatalErrors() {
355+
var targetShards = List.of(targetShard(shard1, node1, node2), targetShard(shard2, node3));
356+
357+
var sent = ConcurrentCollections.<NodeRequest>newQueue();
358+
var response = safeGet(sendRequests(targetShards, randomBoolean(), 1, (node, shardIds, aliasFilters, listener) -> {
359+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
360+
runWithDelay(() -> {
361+
if (Objects.equals(node.getId(), node1.getId()) && shardIds.equals(List.of(shard1))) {
362+
listener.onFailure(new RuntimeException("test request level non fatal failure"), false);
363+
} else if (Objects.equals(node.getId(), node3.getId()) && shardIds.equals(List.of(shard2))) {
364+
listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
365+
} else if (Objects.equals(node.getId(), node2.getId()) && shardIds.equals(List.of(shard1))) {
366+
listener.onSkip();
367+
}
368+
});
369+
}));
370+
assertThat(sent.size(), equalTo(3));
371+
assertThat(response.totalShards, equalTo(2));
372+
assertThat(response.successfulShards, equalTo(1));
373+
assertThat(response.skippedShards, equalTo(1));
374+
assertThat(response.failedShards, equalTo(0));
375+
}
376+
354377
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
355378
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
356379
}

0 commit comments

Comments
 (0)