Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,6 @@ tests:
- class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT
method: test {yaml=analysis-common/40_token_filters/stemmer_override file access}
issue: https://github.com/elastic/elasticsearch/issues/121625
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
method: testDoNotRetryOnRequestLevelFailure
issue: https://github.com/elastic/elasticsearch/issues/121966
- class: org.elasticsearch.xpack.searchablesnapshots.hdfs.SecureHdfsSearchableSnapshotsIT
issue: https://github.com/elastic/elasticsearch/issues/121967
- class: org.elasticsearch.action.search.SearchQueryThenFetchAsyncActionTests
Expand All @@ -386,13 +383,8 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
method: testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableIsBoolean
issue: https://github.com/elastic/elasticsearch/issues/121680
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
issue: https://github.com/elastic/elasticsearch/issues/122153
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
issue: https://github.com/elastic/elasticsearch/issues/121411
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
method: testFailureLoadingFields
issue: https://github.com/elastic/elasticsearch/issues/122132
- class: org.elasticsearch.xpack.downsample.DownsampleActionSingleNodeTests
method: testDuplicateDownsampleRequest
issue: https://github.com/elastic/elasticsearch/issues/122158
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -58,6 +60,7 @@ abstract class DataNodeRequestSender {
private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
private final AtomicBoolean changed = new AtomicBoolean();
private boolean reportedFailure = false; // guarded by sendingLock

DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask) {
this.transportService = transportService;
Expand Down Expand Up @@ -117,11 +120,14 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
);
}
}
if (shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) {
for (var e : shardFailures.values()) {
computeListener.acquireAvoid().onFailure(e.failure);
}
if (reportedFailure || shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) {
reportedFailure = true;
reportFailures(computeListener);
} else {
pendingShardIds.removeIf(shr -> {
var failure = shardFailures.get(shr);
return failure != null && failure.fatal;
});
var nodeRequests = selectNodeRequests(targetShards);
for (NodeRequest request : nodeRequests) {
sendOneNodeRequest(targetShards, computeListener, request);
Expand All @@ -136,6 +142,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
}
}

/**
 * Drains every entry from {@code shardFailures}, notifying the given
 * {@link ComputeListener} once per distinct failure, then leaving the map empty.
 *
 * <p>The same {@code Exception} instance can be recorded against several shards
 * (e.g. a node-level failure fanned out to all of its shards), so failures are
 * de-duplicated by object identity before being reported — each unique exception
 * triggers exactly one {@code acquireAvoid().onFailure(...)} call.
 *
 * <p>Must be called while holding {@code sendingLock} and only after
 * {@code reportedFailure} has been set (both enforced by the assertions below).
 * Removal goes through {@link Iterator#remove()} on the concurrent map's view so
 * only the entries actually visited are cleared.
 */
private void reportFailures(ComputeListener computeListener) {
    assert sendingLock.isHeldByCurrentThread();
    assert reportedFailure;
    Iterator<ShardFailure> it = shardFailures.values().iterator();
    // Identity-based set: dedupe on the exception object itself, not equals().
    Set<Exception> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    while (it.hasNext()) {
        ShardFailure failure = it.next();
        if (seen.add(failure.failure)) {
            computeListener.acquireAvoid().onFailure(failure.failure);
        }
        // Remove the entry regardless of whether it was a duplicate.
        it.remove();
    }
}

private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
final ActionListener<List<DriverProfile>> listener = computeListener.acquireCompute();
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
Expand All @@ -148,7 +168,7 @@ void onAfter(List<DriverProfile> profiles) {
@Override
public void onResponse(DataNodeComputeResponse response) {
// remove failures of successful shards
for (ShardId shardId : targetShards.shardIds()) {
for (ShardId shardId : request.shardIds()) {
if (response.shardLevelFailures().containsKey(shardId) == false) {
shardFailures.remove(shardId);
}
Expand Down Expand Up @@ -250,6 +270,7 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
while (shardsIt.hasNext()) {
ShardId shardId = shardsIt.next();
assert shardFailures.get(shardId) == null || shardFailures.get(shardId).fatal == false;
TargetShard shard = targetShards.getShard(shardId);
Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
DiscoveryNode selectedNode = null;
Expand Down