6262import java .util .concurrent .CompletableFuture ;
6363import java .util .concurrent .ExecutionException ;
6464import java .util .concurrent .Future ;
65- import java .util .concurrent .RejectedExecutionException ;
6665import java .util .concurrent .TimeUnit ;
6766import java .util .concurrent .locks .Lock ;
6867import java .util .function .BiConsumer ;
@@ -304,7 +303,7 @@ public boolean tryComplete() {
304303 localPartitionsAlreadyFetched = replicaManagerReadResponse ;
305304 boolean completedByMe = forceComplete ();
306305 // If invocation of forceComplete is not successful, then that means the request is already completed
307- // hence release the acquired locks.
306+ // hence the acquired locks are already released .
308307 if (!completedByMe ) {
309308 releasePartitionLocks (partitionsAcquired .keySet ());
310309 }
@@ -335,7 +334,7 @@ public boolean tryComplete() {
335334 } else {
336335 boolean completedByMe = forceComplete ();
337336 // If invocation of forceComplete is not successful, then that means the request is already completed
338- // hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that
337+ // hence the acquired locks are already released . This can occur in case of remote storage fetch if there is a thread that
339338 // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
340339 if (!completedByMe ) {
341340 releasePartitionLocks (partitionsAcquired .keySet ());
@@ -626,7 +625,7 @@ private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchIn
626625 private boolean maybeProcessRemoteFetch (
627626 LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
628627 TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
629- ) throws Exception {
628+ ) {
630629 Set <TopicIdPartition > nonRemoteFetchTopicPartitions = new LinkedHashSet <>();
631630 topicPartitionData .keySet ().forEach (topicIdPartition -> {
632631 // topic partitions for which fetch would not be happening in this share fetch request.
@@ -637,19 +636,16 @@ private boolean maybeProcessRemoteFetch(
637636 // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
638637 // them to the delayed actions queue.
639638 releasePartitionLocksAndAddToActionQueue (nonRemoteFetchTopicPartitions );
640- Optional <Exception > exceptionOpt = processRemoteFetchOrException (topicPartitionRemoteFetchInfo );
641- if (exceptionOpt .isPresent ()) {
642- throw exceptionOpt .get ();
643- }
639+ processRemoteFetchOrException (topicPartitionRemoteFetchInfo );
644640 // Check if remote fetch can be completed.
645641 return maybeCompletePendingRemoteFetch ();
646642 }
647643
648644 /**
649- * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional .
650- * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
645+ * Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates remoteFetchOpt .
646+ * @param topicPartitionRemoteFetchInfo - The remote storage fetch information.
651647 */
652- private Optional < Exception > processRemoteFetchOrException (
648+ private void processRemoteFetchOrException (
653649 TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
654650 ) {
655651 TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo .topicIdPartition ();
@@ -665,17 +661,12 @@ private Optional<Exception> processRemoteFetchOrException(
665661 replicaManager .completeDelayedShareFetchRequest (new DelayedShareFetchGroupKey (shareFetch .groupId (), remoteFetchTopicIdPartition .topicId (), remoteFetchTopicIdPartition .partition ()));
666662 }
667663 );
668- } catch (RejectedExecutionException e ) {
669- // Return the error if any in scheduling the remote fetch task.
670- log .warn ("Unable to fetch data from remote storage" , e );
671- remoteStorageFetchException = Optional .of (e );
672- return Optional .of (e );
673664 } catch (Exception e ) {
665+ // Throw the error if any in scheduling the remote fetch task.
674666 remoteStorageFetchException = Optional .of (e );
675- return Optional . of ( e ) ;
667+ throw e ;
676668 }
677669 remoteFetchOpt = Optional .of (new RemoteFetch (remoteFetchTopicIdPartition , topicPartitionRemoteFetchInfo .logReadResult (), remoteFetchTask , remoteFetchResult , remoteStorageFetchInfo ));
678- return Optional .empty ();
679670 }
680671
681672 /**
0 commit comments