Skip to content

Commit 50da087

Browse files
committed
count skips
1 parent a5084ab commit 50da087

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.Executor;
4747
import java.util.concurrent.Semaphore;
4848
import java.util.concurrent.atomic.AtomicBoolean;
49+
import java.util.concurrent.atomic.AtomicInteger;
4950
import java.util.concurrent.locks.ReentrantLock;
5051

5152
/**
@@ -62,6 +63,7 @@ abstract class DataNodeRequestSender {
6263
private final Queue<ShardId> pendingShardIds = ConcurrentCollections.newQueue();
6364
private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
6465
private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
66+
private final AtomicInteger skippedShards = new AtomicInteger();
6567
private final AtomicBoolean changed = new AtomicBoolean();
6668
private boolean reportedFailure = false; // guarded by sendingLock
6769
private volatile boolean skipRemaining = false;
@@ -91,12 +93,13 @@ final void startComputeOnDataNodes(
9193
final long startTimeInNanos = System.nanoTime();
9294
searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
9395
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
96+
var skipped = skippedShards.get() + pendingShardIds.size();
9497
return new ComputeResponse(
9598
profiles,
9699
TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
97100
targetShards.totalShards(),
98-
targetShards.totalShards() - shardFailures.size(),
99-
targetShards.skippedShards(),
101+
targetShards.totalShards() - shardFailures.size() - skipped,
102+
targetShards.skippedShards() + skipped,
100103
shardFailures.size()
101104
);
102105
}))) {
@@ -202,6 +205,7 @@ public void onFailure(Exception e, boolean receivedData) {
202205

203206
@Override
204207
public void onSkip(boolean skipRemaining) {
208+
DataNodeRequestSender.this.skippedShards.incrementAndGet();
205209
if (skipRemaining) {
206210
DataNodeRequestSender.this.skipRemaining = true;
207211
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@ public void testDoesNotSendMoreRequestsAfterNodeIsSkipped() {
346346
}));
347347
assertThat(sent.size(), equalTo(2));// onResponse() + onSkip()
348348
assertThat(response.totalShards, equalTo(5));
349-
assertThat(response.successfulShards, equalTo(5));
349+
assertThat(response.successfulShards, equalTo(1));
350+
assertThat(response.skippedShards, equalTo(4));
350351
assertThat(response.failedShards, equalTo(0));
351352
}
352353

0 commit comments

Comments
 (0)