Skip to content

Commit 8a0dcc6

Browse files
committed
Move retry scheduling to trySendingRequestsForPendingShards
1 parent 733e5c8 commit 8a0dcc6

File tree

1 file changed

+18
-37
lines changed

1 file changed

+18
-37
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 18 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -189,22 +189,33 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
189189
if (changed.compareAndSet(true, false) == false) {
190190
break;
191191
}
192+
var pendingRetries = new HashSet<ShardId>();
192193
for (ShardId shardId : pendingShardIds) {
193194
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
194-
shardFailures.compute(
195-
shardId,
196-
(k, v) -> new ShardFailure(
197-
true,
198-
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
199-
)
200-
);
195+
var failure = shardFailures.get(shardId);
196+
if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) {
197+
pendingRetries.add(shardId);
198+
} else {
199+
shardFailures.compute(
200+
shardId,
201+
(k, v) -> new ShardFailure(
202+
true,
203+
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
204+
)
205+
);
206+
}
201207
}
202208
}
203209
if (reportedFailure
204210
|| (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) {
205211
reportedFailure = true;
206212
reportFailures(computeListener);
207213
} else {
214+
if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
215+
for (var entry : resolveShards(pendingRetries).entrySet()) {
216+
targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue());
217+
}
218+
}
208219
for (NodeRequest request : selectNodeRequests(targetShards)) {
209220
sendOneNodeRequest(targetShards, computeListener, request);
210221
}
@@ -257,25 +268,11 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
257268
final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
258269
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
259270

260-
private Set<ShardId> pendingRetries;
261-
262271
void onAfter(DriverCompletionInfo info) {
263272
nodePermits.get(request.node).release();
264273
if (concurrentRequests != null) {
265274
concurrentRequests.release();
266275
}
267-
268-
if (pendingRetries != null) {
269-
try {
270-
var resolutions = resolveShards(pendingRetries);
271-
for (var entry : resolutions.entrySet()) {
272-
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
273-
}
274-
} finally {
275-
sendingLock.unlock();
276-
}
277-
}
278-
279276
trySendingRequestsForPendingShards(targetShards, computeListener);
280277
listener.onResponse(info);
281278
}
@@ -290,7 +287,6 @@ public void onResponse(DataNodeComputeResponse response) {
290287
}
291288
for (var entry : response.shardLevelFailures().entrySet()) {
292289
final ShardId shardId = entry.getKey();
293-
maybeScheduleRetry(shardId, false, entry.getValue());
294290
trackShardLevelFailure(shardId, false, entry.getValue());
295291
pendingShardIds.add(shardId);
296292
}
@@ -300,7 +296,6 @@ public void onResponse(DataNodeComputeResponse response) {
300296
@Override
301297
public void onFailure(Exception e, boolean receivedData) {
302298
for (ShardId shardId : request.shardIds) {
303-
maybeScheduleRetry(shardId, receivedData, e);
304299
trackShardLevelFailure(shardId, receivedData, e);
305300
pendingShardIds.add(shardId);
306301
}
@@ -316,20 +311,6 @@ public void onSkip() {
316311
onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
317312
}
318313
}
319-
320-
private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception e) {
321-
if (receivedData == false
322-
&& targetShards.getShard(shardId).remainingNodes.isEmpty()
323-
&& unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) {
324-
if (pendingRetries == null && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
325-
pendingRetries = new HashSet<>();
326-
sendingLock.lock();
327-
}
328-
if (pendingRetries != null) {
329-
pendingRetries.add(shardId);
330-
}
331-
}
332-
}
333314
});
334315
}
335316

0 commit comments

Comments
 (0)