From de2f6954c501301e7d93bd748c7f81ff6f3713b0 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 28 Apr 2025 14:32:50 +0200 Subject: [PATCH 1/5] Fix request processing scheduling --- muted-tests.yml | 3 --- .../xpack/esql/plugin/DataNodeRequestSender.java | 10 ++++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 95d68918ed074..4b1a8466f9359 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -417,9 +417,6 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test026InstallBundledRepositoryPluginsViaConfigFile issue: https://github.com/elastic/elasticsearch/issues/127158 -- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests - method: testRetryOnlyMovedShards - issue: https://github.com/elastic/elasticsearch/issues/127168 - class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT method: testSearchWhileRelocating issue: https://github.com/elastic/elasticsearch/issues/127188 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index d333eb7238148..2c1baa90e8246 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -265,9 +265,8 @@ void onAfter(DriverCompletionInfo info) { concurrentRequests.release(); } - if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + if (pendingRetries.isEmpty() == false) { try { - sendingLock.lock(); var resolutions = resolveShards(pendingRetries); for (var entry : resolutions.entrySet()) { targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue()); @@ -292,8 +291,8 @@ public void onResponse(DataNodeComputeResponse response) { for (var entry : response.shardLevelFailures().entrySet()) { final ShardId shardId = entry.getKey(); trackShardLevelFailure(shardId, false, entry.getValue()); - pendingShardIds.add(shardId); maybeScheduleRetry(shardId, false, entry.getValue()); + pendingShardIds.add(shardId); } onAfter(response.completionInfo()); } @@ -302,8 +301,8 @@ public void onResponse(DataNodeComputeResponse response) { public void onFailure(Exception e, boolean receivedData) { for (ShardId shardId : request.shardIds) { trackShardLevelFailure(shardId, receivedData, e); - pendingShardIds.add(shardId); maybeScheduleRetry(shardId, receivedData, e); + pendingShardIds.add(shardId); } onAfter(DriverCompletionInfo.EMPTY); } @@ -322,6 +321,9 @@ private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception if (receivedData == false && targetShards.getShard(shardId).remainingNodes.isEmpty() && unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) { + if (pendingRetries.isEmpty() && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + sendingLock.lock(); + } pendingRetries.add(shardId); } } From 7780ed7f8ab251df8ce7864c86c66e8b69a7b6f4 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 28 Apr 2025 16:01:30 +0200 Subject: [PATCH 2/5] fix retry limit --- .../elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 2c1baa90e8246..e8d56e3fd75ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -265,7 +265,7 @@ void onAfter(DriverCompletionInfo info) { concurrentRequests.release(); } - if (pendingRetries.isEmpty() == false) { + if (sendingLock.isHeldByCurrentThread()) { try { var resolutions = resolveShards(pendingRetries); for (var entry : resolutions.entrySet()) { From b2dd3daeaeb329de0ec415229a3d1852e09ff83d Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 29 Apr 2025 08:54:50 +0200 Subject: [PATCH 3/5] replace `isHeldByCurrentThread`check --- .../xpack/esql/plugin/DataNodeRequestSender.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index e8d56e3fd75ca..0bb44dc83dc77 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -257,7 +257,7 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu final ActionListener listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { - private final Set pendingRetries = new HashSet<>(); + private Set pendingRetries; void onAfter(DriverCompletionInfo info) { nodePermits.get(request.node).release(); @@ -265,7 +265,7 @@ void onAfter(DriverCompletionInfo info) { concurrentRequests.release(); } - if (sendingLock.isHeldByCurrentThread()) { + if (pendingRetries != null) { try { var resolutions = resolveShards(pendingRetries); for (var entry : resolutions.entrySet()) { @@ -290,8 +290,8 @@ public void onResponse(DataNodeComputeResponse response) { } for (var entry : response.shardLevelFailures().entrySet()) { final ShardId shardId = entry.getKey(); - trackShardLevelFailure(shardId, false, entry.getValue()); maybeScheduleRetry(shardId, false, entry.getValue()); + trackShardLevelFailure(shardId, false, entry.getValue()); pendingShardIds.add(shardId); } onAfter(response.completionInfo()); @@ -300,8 +300,8 @@ public void onResponse(DataNodeComputeResponse response) { @Override public void onFailure(Exception e, boolean receivedData) { for (ShardId shardId : request.shardIds) { - trackShardLevelFailure(shardId, receivedData, e); maybeScheduleRetry(shardId, receivedData, e); + trackShardLevelFailure(shardId, receivedData, e); pendingShardIds.add(shardId); } onAfter(DriverCompletionInfo.EMPTY); @@ -321,10 +321,13 @@ private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception if (receivedData == false && targetShards.getShard(shardId).remainingNodes.isEmpty() && unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) { - if (pendingRetries.isEmpty() && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + if (pendingRetries == null && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + pendingRetries = new HashSet<>(); sendingLock.lock(); } - pendingRetries.add(shardId); + if (pendingRetries != null) { + pendingRetries.add(shardId); + } } } }); From 8a0dcc6a08aa84c2d91923786a4b73c2f20bd9fd Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 6 May 2025 10:37:29 +0200 Subject: [PATCH 4/5] Move retry scheduling to trySendingRequestsForPendingShards --- .../esql/plugin/DataNodeRequestSender.java | 55 ++++++------------- 1 file changed, 18 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 0bb44dc83dc77..09a595164b8fc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -189,15 +189,21 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu if (changed.compareAndSet(true, false) == false) { break; } + var pendingRetries = new HashSet(); for (ShardId shardId : pendingShardIds) { if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { - shardFailures.compute( - shardId, - (k, v) -> new ShardFailure( - true, - v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure - ) - ); + var failure = shardFailures.get(shardId); + if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) { + pendingRetries.add(shardId); + } else { + shardFailures.compute( + shardId, + (k, v) -> new ShardFailure( + true, + v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure + ) + ); + } } } if (reportedFailure @@ -205,6 +211,11 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu reportedFailure = true; reportFailures(computeListener); } else { + if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + for (var entry : resolveShards(pendingRetries).entrySet()) { + targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue()); + } + } for (NodeRequest request : selectNodeRequests(targetShards)) { sendOneNodeRequest(targetShards, computeListener, request); } @@ -257,25 +268,11 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu final ActionListener listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { - private Set pendingRetries; - void onAfter(DriverCompletionInfo info) { nodePermits.get(request.node).release(); if (concurrentRequests != null) { concurrentRequests.release(); } - - if (pendingRetries != null) { - try { - var resolutions = resolveShards(pendingRetries); - for (var entry : resolutions.entrySet()) { - targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue()); - } - } finally { - sendingLock.unlock(); - } - } - trySendingRequestsForPendingShards(targetShards, computeListener); listener.onResponse(info); } @@ -290,7 +287,6 @@ public void onResponse(DataNodeComputeResponse response) { } for (var entry : response.shardLevelFailures().entrySet()) { final ShardId shardId = entry.getKey(); - maybeScheduleRetry(shardId, false, entry.getValue()); trackShardLevelFailure(shardId, false, entry.getValue()); pendingShardIds.add(shardId); } @@ -300,7 +296,6 @@ public void onResponse(DataNodeComputeResponse response) { @Override public void onFailure(Exception e, boolean receivedData) { for (ShardId shardId : request.shardIds) { - maybeScheduleRetry(shardId, receivedData, e); trackShardLevelFailure(shardId, receivedData, e); pendingShardIds.add(shardId); } @@ -316,20 +311,6 @@ public void onSkip() { onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } } - - private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception e) { - if (receivedData == false - && targetShards.getShard(shardId).remainingNodes.isEmpty() - && unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) { - if (pendingRetries == null && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { - pendingRetries = new HashSet<>(); - sendingLock.lock(); - } - if (pendingRetries != null) { - pendingRetries.add(shardId); - } - } - } }); } From b193ec888ab2a709bffa34fbc5b2edba019ceb39 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 6 May 2025 18:26:54 +0200 Subject: [PATCH 5/5] handle all shards unassigned --- .../esql/plugin/DataNodeRequestSender.java | 29 ++++++++------ .../plugin/DataNodeRequestSenderTests.java | 40 +++++++++++++++++++ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 09a595164b8fc..f76d9643e4a6d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -195,27 +195,30 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu var failure = shardFailures.get(shardId); if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) { pendingRetries.add(shardId); - } else { - shardFailures.compute( - shardId, - (k, v) -> new ShardFailure( - true, - v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure - ) - ); } } } + if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + for (var entry : resolveShards(pendingRetries).entrySet()) { + targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue()); + } + } + for (ShardId shardId : pendingShardIds) { + if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { + shardFailures.compute( + shardId, + (k, v) -> new ShardFailure( + true, + v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure + ) + ); + } + } if (reportedFailure || (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) { reportedFailure = true; reportFailures(computeListener); } else { - if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { - for (var entry : resolveShards(pendingRetries).entrySet()) { - targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue()); - } - } for (NodeRequest request : selectNodeRequests(targetShards)) { sendOneNodeRequest(targetShards, computeListener, request); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index ce0ef53eadb3f..d10185d95a913 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -501,6 +501,46 @@ public void testRetryOnlyMovedShards() { assertThat("Must retry only affected shards", resolvedShards, contains(shard2)); } + public void testRetryUnassignedShardWithoutPartialResults() { + var attempt = new AtomicInteger(0); + var future = sendRequests(false, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> { + attempt.incrementAndGet(); + return Map.of(shard1, List.of()); + }, + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse( + Objects.equals(shardIds, List.of(shard2)) + ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + ) + ) + + ); + expectThrows(NoShardAvailableActionException.class, containsString("no such shard"), future::actionGet); + } + + public void testRetryUnassignedShardWithPartialResults() { + var response = safeGet( + sendRequests( + true, + -1, + List.of(targetShard(shard1, node1), targetShard(shard2, node2)), + shardIds -> Map.of(shard1, List.of()), + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse( + Objects.equals(shardIds, List.of(shard2)) + ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + ) + ) + ) + ); + assertThat(response.totalShards, equalTo(2)); + assertThat(response.successfulShards, equalTo(1)); + assertThat(response.skippedShards, equalTo(0)); + assertThat(response.failedShards, equalTo(1)); + } + static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); }