diff --git a/muted-tests.yml b/muted-tests.yml index ff2ff8c112d3e..663fd7f5e2e97 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -361,9 +361,6 @@ tests: - class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT method: test {yaml=analysis-common/40_token_filters/stemmer_override file access} issue: https://github.com/elastic/elasticsearch/issues/121625 -- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests - method: testDoNotRetryOnRequestLevelFailure - issue: https://github.com/elastic/elasticsearch/issues/121966 - class: org.elasticsearch.xpack.searchablesnapshots.hdfs.SecureHdfsSearchableSnapshotsIT issue: https://github.com/elastic/elasticsearch/issues/121967 - class: org.elasticsearch.action.search.SearchQueryThenFetchAsyncActionTests @@ -386,13 +383,8 @@ tests: - class: org.elasticsearch.xpack.ml.integration.ClassificationIT method: testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableIsBoolean issue: https://github.com/elastic/elasticsearch/issues/121680 -- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT - issue: https://github.com/elastic/elasticsearch/issues/122153 - class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT issue: https://github.com/elastic/elasticsearch/issues/121411 -- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT - method: testFailureLoadingFields - issue: https://github.com/elastic/elasticsearch/issues/122132 - class: org.elasticsearch.xpack.downsample.DownsampleActionSingleNodeTests method: testDuplicateDownsampleRequest issue: https://github.com/elastic/elasticsearch/issues/122158 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 6af2c12ace086..8c2a6bb06da9b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -34,7 +34,9 @@ import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -58,6 +60,7 @@ abstract class DataNodeRequestSender { private final Map nodePermits = new HashMap<>(); private final Map shardFailures = ConcurrentCollections.newConcurrentMap(); private final AtomicBoolean changed = new AtomicBoolean(); + private boolean reportedFailure = false; // guarded by sendingLock DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask) { this.transportService = transportService; @@ -117,11 +120,14 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu ); } } - if (shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) { - for (var e : shardFailures.values()) { - computeListener.acquireAvoid().onFailure(e.failure); - } + if (reportedFailure || shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) { + reportedFailure = true; + reportFailures(computeListener); } else { + pendingShardIds.removeIf(shr -> { + var failure = shardFailures.get(shr); + return failure != null && failure.fatal; + }); var nodeRequests = selectNodeRequests(targetShards); for (NodeRequest request : nodeRequests) { sendOneNodeRequest(targetShards, computeListener, request); @@ -136,6 +142,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu } } + private void reportFailures(ComputeListener computeListener) { + assert sendingLock.isHeldByCurrentThread(); + assert reportedFailure; + Iterator it = shardFailures.values().iterator(); + Set seen = Collections.newSetFromMap(new IdentityHashMap<>()); + while (it.hasNext()) { + ShardFailure failure = it.next(); + if (seen.add(failure.failure)) { + computeListener.acquireAvoid().onFailure(failure.failure); + } + it.remove(); + } + } + private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) { final ActionListener> listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { @@ -148,7 +168,7 @@ void onAfter(List profiles) { @Override public void onResponse(DataNodeComputeResponse response) { // remove failures of successful shards - for (ShardId shardId : targetShards.shardIds()) { + for (ShardId shardId : request.shardIds()) { if (response.shardLevelFailures().containsKey(shardId) == false) { shardFailures.remove(shardId); } @@ -250,6 +270,7 @@ private List selectNodeRequests(TargetShards targetShards) { final Iterator shardsIt = pendingShardIds.iterator(); while (shardsIt.hasNext()) { ShardId shardId = shardsIt.next(); + assert shardFailures.get(shardId) == null || shardFailures.get(shardId).fatal == false; TargetShard shard = targetShards.getShard(shardId); Iterator nodesIt = shard.remainingNodes.iterator(); DiscoveryNode selectedNode = null;