@@ -257,15 +257,15 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
257257 final ActionListener <DriverCompletionInfo > listener = computeListener .acquireCompute ();
258258 sendRequest (request .node , request .shardIds , request .aliasFilters , new NodeListener () {
259259
260- private final Set <ShardId > pendingRetries = new HashSet <>() ;
260+ private Set <ShardId > pendingRetries ;
261261
262262 void onAfter (DriverCompletionInfo info ) {
263263 nodePermits .get (request .node ).release ();
264264 if (concurrentRequests != null ) {
265265 concurrentRequests .release ();
266266 }
267267
268- if (sendingLock . isHeldByCurrentThread () ) {
268+ if (pendingRetries != null ) {
269269 try {
270270 var resolutions = resolveShards (pendingRetries );
271271 for (var entry : resolutions .entrySet ()) {
@@ -290,8 +290,8 @@ public void onResponse(DataNodeComputeResponse response) {
290290 }
291291 for (var entry : response .shardLevelFailures ().entrySet ()) {
292292 final ShardId shardId = entry .getKey ();
293- trackShardLevelFailure (shardId , false , entry .getValue ());
294293 maybeScheduleRetry (shardId , false , entry .getValue ());
294+ trackShardLevelFailure (shardId , false , entry .getValue ());
295295 pendingShardIds .add (shardId );
296296 }
297297 onAfter (response .completionInfo ());
@@ -300,8 +300,8 @@ public void onResponse(DataNodeComputeResponse response) {
300300 @ Override
301301 public void onFailure (Exception e , boolean receivedData ) {
302302 for (ShardId shardId : request .shardIds ) {
303- trackShardLevelFailure (shardId , receivedData , e );
304303 maybeScheduleRetry (shardId , receivedData , e );
304+ trackShardLevelFailure (shardId , receivedData , e );
305305 pendingShardIds .add (shardId );
306306 }
307307 onAfter (DriverCompletionInfo .EMPTY );
@@ -321,10 +321,13 @@ private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception
321321 if (receivedData == false
322322 && targetShards .getShard (shardId ).remainingNodes .isEmpty ()
323323 && unwrapFailure (shardId , e ) instanceof NoShardAvailableActionException ) {
324- if (pendingRetries .isEmpty () && remainingUnavailableShardResolutionAttempts .decrementAndGet () >= 0 ) {
324+ if (pendingRetries == null && remainingUnavailableShardResolutionAttempts .decrementAndGet () >= 0 ) {
325+ pendingRetries = new HashSet <>();
325326 sendingLock .lock ();
326327 }
327- pendingRetries .add (shardId );
328+ if (pendingRetries != null ) {
329+ pendingRetries .add (shardId );
330+ }
328331 }
329332 }
330333 });
0 commit comments