3434import org .elasticsearch .xpack .esql .action .EsqlSearchShardsAction ;
3535
3636import java .util .ArrayList ;
37+ import java .util .Collections ;
3738import java .util .HashMap ;
39+ import java .util .IdentityHashMap ;
3840import java .util .Iterator ;
3941import java .util .List ;
4042import java .util .Map ;
@@ -58,6 +60,7 @@ abstract class DataNodeRequestSender {
5860 private final Map <DiscoveryNode , Semaphore > nodePermits = new HashMap <>();
5961 private final Map <ShardId , ShardFailure > shardFailures = ConcurrentCollections .newConcurrentMap ();
6062 private final AtomicBoolean changed = new AtomicBoolean ();
63+ private boolean reportedFailure = false ; // guarded by sendingLock
6164
6265 DataNodeRequestSender (TransportService transportService , Executor esqlExecutor , CancellableTask rootTask ) {
6366 this .transportService = transportService ;
@@ -106,7 +109,9 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
106109 if (changed .compareAndSet (true , false ) == false ) {
107110 break ;
108111 }
109- for (ShardId shardId : pendingShardIds ) {
112+ final Iterator <ShardId > shardIts = pendingShardIds .iterator ();
113+ while (shardIts .hasNext ()) {
114+ final ShardId shardId = shardIts .next ();
110115 if (targetShards .getShard (shardId ).remainingNodes .isEmpty ()) {
111116 shardFailures .compute (
112117 shardId ,
@@ -115,12 +120,12 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
115120 v == null ? new NoShardAvailableActionException (shardId , "no shard copies found" ) : v .failure
116121 )
117122 );
123+ shardIts .remove ();
118124 }
119125 }
120- if (shardFailures .values ().stream ().anyMatch (shardFailure -> shardFailure .fatal )) {
121- for (var e : shardFailures .values ()) {
122- computeListener .acquireAvoid ().onFailure (e .failure );
123- }
126+ if (reportedFailure || shardFailures .values ().stream ().anyMatch (shardFailure -> shardFailure .fatal )) {
127+ reportedFailure = true ;
128+ reportFailures (computeListener );
124129 } else {
125130 var nodeRequests = selectNodeRequests (targetShards );
126131 for (NodeRequest request : nodeRequests ) {
@@ -136,6 +141,19 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
136141 }
137142 }
138143
144+ private void reportFailures (ComputeListener computeListener ) {
145+ assert sendingLock .isHeldByCurrentThread ();
146+ Iterator <ShardFailure > it = shardFailures .values ().iterator ();
147+ Set <Exception > seen = Collections .newSetFromMap (new IdentityHashMap <>());
148+ while (it .hasNext ()) {
149+ ShardFailure failure = it .next ();
150+ if (seen .add (failure .failure )) {
151+ computeListener .acquireAvoid ().onFailure (failure .failure );
152+ }
153+ it .remove ();
154+ }
155+ }
156+
139157 private void sendOneNodeRequest (TargetShards targetShards , ComputeListener computeListener , NodeRequest request ) {
140158 final ActionListener <List <DriverProfile >> listener = computeListener .acquireCompute ();
141159 sendRequest (request .node , request .shardIds , request .aliasFilters , new NodeListener () {
@@ -148,7 +166,7 @@ void onAfter(List<DriverProfile> profiles) {
148166 @ Override
149167 public void onResponse (DataNodeComputeResponse response ) {
150168 // remove failures of successful shards
151- for (ShardId shardId : targetShards .shardIds ()) {
169+ for (ShardId shardId : request .shardIds ()) {
152170 if (response .shardLevelFailures ().containsKey (shardId ) == false ) {
153171 shardFailures .remove (shardId );
154172 }
0 commit comments