File tree Expand file tree Collapse file tree 1 file changed +10
-1
lines changed
core/src/main/java/kafka/server/share Expand file tree Collapse file tree 1 file changed +10
-1
lines changed Original file line number Diff line number Diff line change @@ -331,8 +331,17 @@ public boolean tryComplete() {
331331 releasePartitionLocks (topicPartitionData .keySet ());
332332 partitionsAcquired .clear ();
333333 localPartitionsAlreadyFetched .clear ();
334+ return forceComplete ();
335+ } else {
336+ boolean completedByMe = forceComplete ();
337+ // If invocation of forceComplete is not successful, then that means the request is already completed
338+ // hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that
339+ // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
340+ if (!completedByMe ) {
341+ releasePartitionLocks (partitionsAcquired .keySet ());
342+ }
343+ return completedByMe ;
334344 }
335- return forceComplete ();
336345 }
337346 }
338347
You can’t perform that action at this time.
0 commit comments