Skip to content

Commit de2f695

Browse files
committed
Fix request processing scheduling
1 parent 9772b5e commit de2f695

File tree

2 files changed

+6
-7
lines changed

2 files changed

+6
-7
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,9 +417,6 @@ tests:
417417
- class: org.elasticsearch.packaging.test.DockerTests
418418
method: test026InstallBundledRepositoryPluginsViaConfigFile
419419
issue: https://github.com/elastic/elasticsearch/issues/127158
420-
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
421-
method: testRetryOnlyMovedShards
422-
issue: https://github.com/elastic/elasticsearch/issues/127168
423420
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT
424421
method: testSearchWhileRelocating
425422
issue: https://github.com/elastic/elasticsearch/issues/127188

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,8 @@ void onAfter(DriverCompletionInfo info) {
265265
concurrentRequests.release();
266266
}
267267

268-
if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
268+
if (pendingRetries.isEmpty() == false) {
269269
try {
270-
sendingLock.lock();
271270
var resolutions = resolveShards(pendingRetries);
272271
for (var entry : resolutions.entrySet()) {
273272
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
@@ -292,8 +291,8 @@ public void onResponse(DataNodeComputeResponse response) {
292291
for (var entry : response.shardLevelFailures().entrySet()) {
293292
final ShardId shardId = entry.getKey();
294293
trackShardLevelFailure(shardId, false, entry.getValue());
295-
pendingShardIds.add(shardId);
296294
maybeScheduleRetry(shardId, false, entry.getValue());
295+
pendingShardIds.add(shardId);
297296
}
298297
onAfter(response.completionInfo());
299298
}
@@ -302,8 +301,8 @@ public void onResponse(DataNodeComputeResponse response) {
302301
public void onFailure(Exception e, boolean receivedData) {
303302
for (ShardId shardId : request.shardIds) {
304303
trackShardLevelFailure(shardId, receivedData, e);
305-
pendingShardIds.add(shardId);
306304
maybeScheduleRetry(shardId, receivedData, e);
305+
pendingShardIds.add(shardId);
307306
}
308307
onAfter(DriverCompletionInfo.EMPTY);
309308
}
@@ -322,6 +321,9 @@ private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception
322321
if (receivedData == false
323322
&& targetShards.getShard(shardId).remainingNodes.isEmpty()
324323
&& unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) {
324+
if (pendingRetries.isEmpty() && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
325+
sendingLock.lock();
326+
}
325327
pendingRetries.add(shardId);
326328
}
327329
}

0 commit comments

Comments
 (0)