3434import org .elasticsearch .xpack .esql .action .EsqlSearchShardsAction ;
3535
3636import java .util .ArrayList ;
37+ import java .util .Collections ;
3738import java .util .HashMap ;
39+ import java .util .IdentityHashMap ;
3840import java .util .Iterator ;
3941import java .util .List ;
4042import java .util .Map ;
@@ -59,6 +61,7 @@ abstract class DataNodeRequestSender {
5961 private final Map <DiscoveryNode , Semaphore > nodePermits = new HashMap <>();
6062 private final Map <ShardId , ShardFailure > shardFailures = ConcurrentCollections .newConcurrentMap ();
6163 private final AtomicBoolean changed = new AtomicBoolean ();
64+ private boolean reportedFailure = false ; // guarded by sendingLock
6265
6366 DataNodeRequestSender (TransportService transportService , Executor esqlExecutor , CancellableTask rootTask , boolean allowPartialResults ) {
6467 this .transportService = transportService ;
@@ -108,7 +111,9 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
108111 if (changed .compareAndSet (true , false ) == false ) {
109112 break ;
110113 }
111- for (ShardId shardId : pendingShardIds ) {
114+ final Iterator <ShardId > shardIts = pendingShardIds .iterator ();
115+ while (shardIts .hasNext ()) {
116+ final ShardId shardId = shardIts .next ();
112117 if (targetShards .getShard (shardId ).remainingNodes .isEmpty ()) {
113118 shardFailures .compute (
114119 shardId ,
@@ -117,12 +122,12 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
117122 v == null ? new NoShardAvailableActionException (shardId , "no shard copies found" ) : v .failure
118123 )
119124 );
125+ shardIts .remove ();
120126 }
121127 }
122- if (shardFailures .values ().stream ().anyMatch (shardFailure -> shardFailure .fatal )) {
123- for (var e : shardFailures .values ()) {
124- computeListener .acquireAvoid ().onFailure (e .failure );
125- }
128+ if (reportedFailure || shardFailures .values ().stream ().anyMatch (shardFailure -> shardFailure .fatal )) {
129+ reportedFailure = true ;
130+ reportFailures (computeListener );
126131 } else {
127132 var nodeRequests = selectNodeRequests (targetShards );
128133 for (NodeRequest request : nodeRequests ) {
@@ -138,6 +143,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
138143 }
139144 }
140145
146+ private void reportFailures (ComputeListener computeListener ) {
147+ assert sendingLock .isHeldByCurrentThread ();
148+ assert reportedFailure ;
149+ Iterator <ShardFailure > it = shardFailures .values ().iterator ();
150+ Set <Exception > seen = Collections .newSetFromMap (new IdentityHashMap <>());
151+ while (it .hasNext ()) {
152+ ShardFailure failure = it .next ();
153+ if (seen .add (failure .failure )) {
154+ computeListener .acquireAvoid ().onFailure (failure .failure );
155+ }
156+ it .remove ();
157+ }
158+ }
159+
141160 private void sendOneNodeRequest (TargetShards targetShards , ComputeListener computeListener , NodeRequest request ) {
142161 final ActionListener <List <DriverProfile >> listener = computeListener .acquireCompute ();
143162 sendRequest (request .node , request .shardIds , request .aliasFilters , new NodeListener () {
@@ -150,7 +169,7 @@ void onAfter(List<DriverProfile> profiles) {
150169 @ Override
151170 public void onResponse (DataNodeComputeResponse response ) {
152171 // remove failures of successful shards
153- for (ShardId shardId : targetShards .shardIds ()) {
172+ for (ShardId shardId : request .shardIds ()) {
154173 if (response .shardLevelFailures ().containsKey (shardId ) == false ) {
155174 shardFailures .remove (shardId );
156175 }
0 commit comments