Skip to content

Commit 02ddb1e

Browse files
committed
remove skipping
1 parent 1458611 commit 02ddb1e

File tree

3 files changed

+10
-15
lines changed

3 files changed

+10
-15
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ protected void sendRequest(
113113
NodeListener nodeListener
114114
) {
115115
if (exchangeSource.isFinished()) {
116-
nodeListener.onSkip(true);
116+
nodeListener.onSkip();
117117
return;
118118
}
119119

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ abstract class DataNodeRequestSender {
6666
private final AtomicInteger skippedShards = new AtomicInteger();
6767
private final AtomicBoolean changed = new AtomicBoolean();
6868
private boolean reportedFailure = false; // guarded by sendingLock
69-
private volatile boolean skipRemaining = false;
7069

7170
DataNodeRequestSender(
7271
TransportService transportService,
@@ -93,13 +92,12 @@ final void startComputeOnDataNodes(
9392
final long startTimeInNanos = System.nanoTime();
9493
searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
9594
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
96-
var skipped = skippedShards.get() + pendingShardIds.size();
9795
return new ComputeResponse(
9896
profiles,
9997
TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
10098
targetShards.totalShards(),
101-
targetShards.totalShards() - shardFailures.size() - skipped,
102-
targetShards.skippedShards() + skipped,
99+
targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
100+
targetShards.skippedShards() + skippedShards.get(),
103101
shardFailures.size()
104102
);
105103
}))) {
@@ -138,7 +136,7 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
138136
|| (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) {
139137
reportedFailure = true;
140138
reportFailures(computeListener);
141-
} else if (skipRemaining == false) {
139+
} else {
142140
for (NodeRequest request : selectNodeRequests(targetShards)) {
143141
sendOneNodeRequest(targetShards, computeListener, request);
144142
}
@@ -204,11 +202,8 @@ public void onFailure(Exception e, boolean receivedData) {
204202
}
205203

206204
@Override
207-
public void onSkip(boolean skipRemaining) {
208-
DataNodeRequestSender.this.skippedShards.incrementAndGet();
209-
if (skipRemaining) {
210-
DataNodeRequestSender.this.skipRemaining = true;
211-
}
205+
public void onSkip() {
206+
skippedShards.incrementAndGet();
212207
onAfter(List.of());
213208
}
214209
});
@@ -221,7 +216,7 @@ interface NodeListener {
221216

222217
void onFailure(Exception e, boolean receivedData);
223218

224-
void onSkip(boolean skipRemaining);
219+
void onSkip();
225220
}
226221

227222
private static Exception unwrapFailure(Exception e) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ public void testLimitConcurrentNodes() {
295295
targetShard(shard5, node5)
296296
);
297297

298-
var concurrency = randomIntBetween(1, targetShards.size() - 1);
298+
var concurrency = randomIntBetween(1, 2);
299299
AtomicInteger maxConcurrentRequests = new AtomicInteger(0);
300300
AtomicInteger concurrentRequests = new AtomicInteger(0);
301301
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
@@ -340,11 +340,11 @@ public void testDoesNotSendMoreRequestsAfterNodeIsSkipped() {
340340
if (processed.incrementAndGet() == 1) {
341341
listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
342342
} else {
343-
listener.onSkip(true);
343+
listener.onSkip();
344344
}
345345
});
346346
}));
347-
assertThat(sent.size(), equalTo(2));// onResponse() + onSkip()
347+
assertThat(sent.size(), equalTo(5));
348348
assertThat(response.totalShards, equalTo(5));
349349
assertThat(response.successfulShards, equalTo(1));
350350
assertThat(response.skippedShards, equalTo(4));

0 commit comments

Comments
 (0)