@@ -162,14 +162,8 @@ public DelayedShareFetch(
162162
163163 @ Override
164164 public void onExpiration () {
165- // cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
166- // already running as it may force closing opened/cached resources as transaction index.
167165 if (remoteFetchOpt .isPresent ()) {
168- boolean cancelled = remoteFetchOpt .get ().remoteFetchTask ().cancel (false );
169- if (!cancelled ) {
170- log .debug ("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}" ,
171- remoteFetchOpt .get ().remoteFetchInfo (), remoteFetchOpt .get ().remoteFetchTask ().isDone ());
172- }
166+ cancelRemoteFetchTask ();
173167 }
174168 expiredRequestMeter .mark ();
175169 }
@@ -581,6 +575,11 @@ Lock lock() {
581575 return lock ;
582576 }
583577
578+ // Visible for testing.
579+ RemoteFetch remoteFetch () {
580+ return remoteFetchOpt .orElse (null );
581+ }
582+
584583 // Visible for testing.
585584 Meter expiredRequestMeter () {
586585 return expiredRequestMeter ;
@@ -674,7 +673,7 @@ private Optional<Exception> processRemoteFetchOrException(
674673 private boolean maybeCompletePendingRemoteFetch () {
675674 boolean canComplete = false ;
676675
677- for (Map .Entry <TopicIdPartition , LogOffsetMetadata > entry : remoteFetchOpt .get ().fetchOffsetMetadataMap .entrySet ()) {
676+ for (Map .Entry <TopicIdPartition , LogOffsetMetadata > entry : remoteFetchOpt .get ().fetchOffsetMetadataMap () .entrySet ()) {
678677 TopicIdPartition topicIdPartition = entry .getKey ();
679678 LogOffsetMetadata fetchOffsetMetadata = entry .getValue ();
680679 try {
@@ -695,7 +694,7 @@ private boolean maybeCompletePendingRemoteFetch() {
695694 break ;
696695 }
697696
698- if (canComplete || remoteFetchOpt .get ().remoteFetchResult .isDone ()) { // Case d
697+ if (canComplete || remoteFetchOpt .get ().remoteFetchResult () .isDone ()) { // Case d
699698 boolean completedByMe = forceComplete ();
700699 // If invocation of forceComplete is not successful, then that means the request is already completed
701700 // hence release the acquired locks.
@@ -734,7 +733,7 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
734733
735734 /**
736735 * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
737- * It should only be called when we know that there is remote fetch in-flight/completed.
736+ * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
738737 */
739738 private void completeRemoteStorageShareFetchRequest () {
740739 LinkedHashMap <TopicIdPartition , Long > nonRemoteFetchTopicPartitionData = new LinkedHashMap <>();
@@ -776,6 +775,8 @@ private void completeRemoteStorageShareFetchRequest() {
776775 );
777776 readableBytes += info .records .sizeInBytes ();
778777 }
778+ } else {
779+ cancelRemoteFetchTask ();
779780 }
780781
781782 // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read.
@@ -832,6 +833,19 @@ private void completeRemoteStorageShareFetchRequest() {
832833 }
833834 }
834835
836+ /**
837+ * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
838+ * already running as it may force closing opened/cached resources as transaction index.
839+ * Note - This function should only be called when we know that there is a remote fetch in-flight/completed.
840+ */
841+ private void cancelRemoteFetchTask () {
842+ boolean cancelled = remoteFetchOpt .get ().remoteFetchTask ().cancel (false );
843+ if (!cancelled ) {
844+ log .debug ("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}" ,
845+ remoteFetchOpt .get ().remoteFetchInfo (), remoteFetchOpt .get ().remoteFetchTask ().isDone ());
846+ }
847+ }
848+
835849 public record RemoteFetch (
836850 TopicIdPartition topicIdPartition ,
837851 Future <Void > remoteFetchTask ,
0 commit comments