diff --git a/muted-tests.yml b/muted-tests.yml index 2bcc6f32371f8..897cb81e7bbd5 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -399,9 +399,6 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test026InstallBundledRepositoryPluginsViaConfigFile issue: https://github.com/elastic/elasticsearch/issues/127158 -- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests - method: testRetryOnlyMovedShards - issue: https://github.com/elastic/elasticsearch/issues/127168 - class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT method: testSearchWhileRelocating issue: https://github.com/elastic/elasticsearch/issues/127188 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index d333eb7238148..f76d9643e4a6d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -189,6 +189,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu if (changed.compareAndSet(true, false) == false) { break; } + var pendingRetries = new HashSet(); + for (ShardId shardId : pendingShardIds) { + if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { + var failure = shardFailures.get(shardId); + if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) { + pendingRetries.add(shardId); + } + } + } + if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + for (var entry : resolveShards(pendingRetries).entrySet()) { + targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue()); + } + } for (ShardId shardId : pendingShardIds) { if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { shardFailures.compute( @@ -257,26 +271,11 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu final ActionListener listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { - private final Set pendingRetries = new HashSet<>(); - void onAfter(DriverCompletionInfo info) { nodePermits.get(request.node).release(); if (concurrentRequests != null) { concurrentRequests.release(); } - - if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { - try { - sendingLock.lock(); - var resolutions = resolveShards(pendingRetries); - for (var entry : resolutions.entrySet()) { - targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue()); - } - } finally { - sendingLock.unlock(); - } - } - trySendingRequestsForPendingShards(targetShards, computeListener); listener.onResponse(info); } @@ -293,7 +292,6 @@ public void onResponse(DataNodeComputeResponse response) { final ShardId shardId = entry.getKey(); trackShardLevelFailure(shardId, false, entry.getValue()); pendingShardIds.add(shardId); - maybeScheduleRetry(shardId, false, entry.getValue()); } onAfter(response.completionInfo()); } @@ -303,7 +301,6 @@ public void onFailure(Exception e, boolean receivedData) { for (ShardId shardId : request.shardIds) { trackShardLevelFailure(shardId, receivedData, e); pendingShardIds.add(shardId); - maybeScheduleRetry(shardId, receivedData, e); } onAfter(DriverCompletionInfo.EMPTY); } @@ -317,14 +314,6 @@ public void onSkip() { onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } } - - private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception e) { - if (receivedData == false - && targetShards.getShard(shardId).remainingNodes.isEmpty() - && unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) { - pendingRetries.add(shardId); - } - } }); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index ce0ef53eadb3f..d10185d95a913 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -501,6 +501,46 @@ public void testRetryOnlyMovedShards() { assertThat("Must retry only affected shards", resolvedShards, contains(shard2)); } + public void testRetryUnassignedShardWithoutPartialResults() { + var attempt = new AtomicInteger(0); + var future = sendRequests(false, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> { + attempt.incrementAndGet(); + return Map.of(shard1, List.of()); + }, + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse( + Objects.equals(shardIds, List.of(shard2)) + ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + ) + ) + + ); + expectThrows(NoShardAvailableActionException.class, containsString("no such shard"), future::actionGet); + } + + public void testRetryUnassignedShardWithPartialResults() { + var response = safeGet( + sendRequests( + true, + -1, + List.of(targetShard(shard1, node1), targetShard(shard2, node2)), + shardIds -> Map.of(shard1, List.of()), + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse( + Objects.equals(shardIds, List.of(shard2)) + ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + ) + ) + ) + ); + assertThat(response.totalShards, equalTo(2)); + assertThat(response.successfulShards, equalTo(1)); + assertThat(response.skippedShards, equalTo(0)); + assertThat(response.failedShards, equalTo(1)); + } + static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); }