Skip to content

Commit b79439d

Browse files
committed
Fix DataNodeRequestSender
1 parent 9c235c3 commit b79439d

File tree

2 files changed

+25
-9
lines changed

2 files changed

+25
-9
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -372,9 +372,6 @@ tests:
372372
- class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT
373373
method: test {yaml=analysis-common/40_token_filters/stemmer_override file access}
374374
issue: https://github.com/elastic/elasticsearch/issues/121625
375-
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
376-
method: testDoNotRetryOnRequestLevelFailure
377-
issue: https://github.com/elastic/elasticsearch/issues/121966
378375
- class: org.elasticsearch.xpack.searchablesnapshots.hdfs.SecureHdfsSearchableSnapshotsIT
379376
issue: https://github.com/elastic/elasticsearch/issues/121967
380377
- class: org.elasticsearch.action.search.SearchQueryThenFetchAsyncActionTests

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 25 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,9 @@
3434
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
3535

3636
import java.util.ArrayList;
37+
import java.util.Collections;
3738
import java.util.HashMap;
39+
import java.util.IdentityHashMap;
3840
import java.util.Iterator;
3941
import java.util.List;
4042
import java.util.Map;
@@ -58,6 +60,7 @@ abstract class DataNodeRequestSender {
5860
private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
5961
private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
6062
private final AtomicBoolean changed = new AtomicBoolean();
63+
private boolean reportedFailure = false; // guarded by sendingLock
6164

6265
DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask) {
6366
this.transportService = transportService;
@@ -106,7 +109,9 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
106109
if (changed.compareAndSet(true, false) == false) {
107110
break;
108111
}
109-
for (ShardId shardId : pendingShardIds) {
112+
final Iterator<ShardId> shardIts = pendingShardIds.iterator();
113+
while (shardIts.hasNext()) {
114+
final ShardId shardId = shardIts.next();
110115
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
111116
shardFailures.compute(
112117
shardId,
@@ -115,12 +120,12 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
115120
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
116121
)
117122
);
123+
shardIts.remove();
118124
}
119125
}
120-
if (shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) {
121-
for (var e : shardFailures.values()) {
122-
computeListener.acquireAvoid().onFailure(e.failure);
123-
}
126+
if (reportedFailure || shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) {
127+
reportedFailure = true;
128+
reportFailures(computeListener);
124129
} else {
125130
var nodeRequests = selectNodeRequests(targetShards);
126131
for (NodeRequest request : nodeRequests) {
@@ -136,6 +141,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
136141
}
137142
}
138143

144+
private void reportFailures(ComputeListener computeListener) {
145+
assert sendingLock.isHeldByCurrentThread();
146+
assert reportedFailure;
147+
Iterator<ShardFailure> it = shardFailures.values().iterator();
148+
Set<Exception> seen = Collections.newSetFromMap(new IdentityHashMap<>());
149+
while (it.hasNext()) {
150+
ShardFailure failure = it.next();
151+
if (seen.add(failure.failure)) {
152+
computeListener.acquireAvoid().onFailure(failure.failure);
153+
}
154+
it.remove();
155+
}
156+
}
157+
139158
private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
140159
final ActionListener<List<DriverProfile>> listener = computeListener.acquireCompute();
141160
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
@@ -148,7 +167,7 @@ void onAfter(List<DriverProfile> profiles) {
148167
@Override
149168
public void onResponse(DataNodeComputeResponse response) {
150169
// remove failures of successful shards
151-
for (ShardId shardId : targetShards.shardIds()) {
170+
for (ShardId shardId : request.shardIds()) {
152171
if (response.shardLevelFailures().containsKey(shardId) == false) {
153172
shardFailures.remove(shardId);
154173
}

0 commit comments

Comments
 (0)