Skip to content

Commit a5084ab

Browse files
committed
skip remaining nodes
1 parent 5ce4095 commit a5084ab

File tree

3 files changed

+40
-7
lines changed

3 files changed

+40
-7
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ protected void sendRequest(
113113
NodeListener nodeListener
114114
) {
115115
if (exchangeSource.isFinished()) {
116-
nodeListener.onSkip();
116+
nodeListener.onSkip(true);
117117
return;
118118
}
119119

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ abstract class DataNodeRequestSender {
6464
private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
6565
private final AtomicBoolean changed = new AtomicBoolean();
6666
private boolean reportedFailure = false; // guarded by sendingLock
67+
private volatile boolean skipRemaining = false;
6768

6869
DataNodeRequestSender(
6970
TransportService transportService,
@@ -134,7 +135,7 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
134135
|| (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) {
135136
reportedFailure = true;
136137
reportFailures(computeListener);
137-
} else {
138+
} else if (skipRemaining == false) {
138139
for (NodeRequest request : selectNodeRequests(targetShards)) {
139140
sendOneNodeRequest(targetShards, computeListener, request);
140141
}
@@ -200,7 +201,10 @@ public void onFailure(Exception e, boolean receivedData) {
200201
}
201202

202203
@Override
203-
public void onSkip() {
204+
public void onSkip(boolean skipRemaining) {
205+
if (skipRemaining) {
206+
DataNodeRequestSender.this.skipRemaining = true;
207+
}
204208
onAfter(List.of());
205209
}
206210
});
@@ -213,7 +217,7 @@ interface NodeListener {
213217

214218
void onFailure(Exception e, boolean receivedData);
215219

216-
void onSkip();
220+
void onSkip(boolean skipRemaining);
217221
}
218222

219223
private static Exception unwrapFailure(Exception e) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public void testLimitConcurrentNodes() {
299299
AtomicInteger maxConcurrentRequests = new AtomicInteger(0);
300300
AtomicInteger concurrentRequests = new AtomicInteger(0);
301301
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
302-
var future = sendRequests(targetShards, randomBoolean(), concurrency, (node, shardIds, aliasFilters, listener) -> {
302+
var response = safeGet(sendRequests(targetShards, randomBoolean(), concurrency, (node, shardIds, aliasFilters, listener) -> {
303303
concurrentRequests.incrementAndGet();
304304

305305
while (true) {
@@ -315,10 +315,39 @@ public void testLimitConcurrentNodes() {
315315
concurrentRequests.decrementAndGet();
316316
listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
317317
});
318-
});
319-
safeGet(future);
318+
}));
320319
assertThat(sent.size(), equalTo(5));
321320
assertThat(maxConcurrentRequests.get(), equalTo(concurrency));
321+
assertThat(response.totalShards, equalTo(5));
322+
assertThat(response.successfulShards, equalTo(5));
323+
assertThat(response.failedShards, equalTo(0));
324+
}
325+
326+
public void testDoesNotSendMoreRequestsAfterNodeIsSkipped() {
327+
var targetShards = List.of(
328+
targetShard(shard1, node1),
329+
targetShard(shard2, node2),
330+
targetShard(shard3, node3),
331+
targetShard(shard4, node4),
332+
targetShard(shard5, node5)
333+
);
334+
335+
AtomicInteger processed = new AtomicInteger(0);
336+
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
337+
var response = safeGet(sendRequests(targetShards, randomBoolean(), 1, (node, shardIds, aliasFilters, listener) -> {
338+
sent.add(new NodeRequest(node, shardIds, aliasFilters));
339+
runWithDelay(() -> {
340+
if (processed.incrementAndGet() == 1) {
341+
listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
342+
} else {
343+
listener.onSkip(true);
344+
}
345+
});
346+
}));
347+
assertThat(sent.size(), equalTo(2));// onResponse() + onSkip()
348+
assertThat(response.totalShards, equalTo(5));
349+
assertThat(response.successfulShards, equalTo(5));
350+
assertThat(response.failedShards, equalTo(0));
322351
}
323352

324353
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {

0 commit comments

Comments
 (0)