6060import java .util .OptionalLong ;
6161import java .util .Set ;
6262import java .util .concurrent .CompletableFuture ;
63+ import java .util .concurrent .ExecutionException ;
6364import java .util .concurrent .Future ;
6465import java .util .concurrent .RejectedExecutionException ;
6566import java .util .concurrent .TimeUnit ;
@@ -295,7 +296,7 @@ public boolean tryComplete() {
295296 Optional <TopicPartitionRemoteFetchInfo > topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo (topicPartitionData , replicaManagerReadResponse );
296297
297298 if (topicPartitionRemoteFetchInfoOpt .isPresent ()) {
298- return maybeProcessRemoteFetch (topicPartitionData , topicPartitionRemoteFetchInfoOpt .get (), replicaManagerReadResponse );
299+ return maybeProcessRemoteFetch (topicPartitionData , topicPartitionRemoteFetchInfoOpt .get ());
299300 }
300301 maybeUpdateFetchOffsetMetadata (topicPartitionData , replicaManagerReadResponse );
301302 if (anyPartitionHasLogReadError (replicaManagerReadResponse ) || isMinBytesSatisfied (topicPartitionData , partitionMaxBytesStrategy .maxBytes (shareFetch .fetchParams ().maxBytes , topicPartitionData .keySet (), topicPartitionData .size ()))) {
@@ -596,7 +597,7 @@ private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchIn
596597 LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
597598 LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse
598599 ) {
599- Optional <TopicPartitionRemoteFetchInfo > remoteStorageFetchMetadataMap = Optional .empty ();
600+ Optional <TopicPartitionRemoteFetchInfo > topicPartitionRemoteFetchInfoOpt = Optional .empty ();
600601 for (Map .Entry <TopicIdPartition , LogReadResult > entry : replicaManagerReadResponse .entrySet ()) {
601602 TopicIdPartition topicIdPartition = entry .getKey ();
602603 LogReadResult logReadResult = entry .getValue ();
@@ -605,27 +606,26 @@ private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchIn
605606 // a single topic partition in a fetch request. Since the logic of fetch is largely based on how consumer groups work,
606607 // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133, which should help us perform
607608 // fetches for multiple remote fetch topic partitions in a single share fetch request.
608- remoteStorageFetchMetadataMap = Optional .of (new TopicPartitionRemoteFetchInfo (topicIdPartition , logReadResult . info (). delayedRemoteStorageFetch . get () ));
609+ topicPartitionRemoteFetchInfoOpt = Optional .of (new TopicPartitionRemoteFetchInfo (topicIdPartition , logReadResult ));
609610 partitionsAcquired .put (topicIdPartition , topicPartitionData .get (topicIdPartition ));
610611 break ;
611612 }
612613 }
613- return remoteStorageFetchMetadataMap ;
614+ return topicPartitionRemoteFetchInfoOpt ;
614615 }
615616
616617 private boolean maybeProcessRemoteFetch (
617618 LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
618- TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo ,
619- LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse
619+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
620620 ) throws Exception {
621621 topicPartitionData .keySet ().forEach (topicIdPartition -> {
622622 // topic partitions for which fetch would not be happening in this share fetch request.
623623 if (!topicPartitionRemoteFetchInfo .topicIdPartition ().equals (topicIdPartition )) {
624624 // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
625- releasePartitionLocks (Set .of (topicIdPartition ));
625+ releasePartitionLocksAndAddToActionQueue (Set .of (topicIdPartition ));
626626 }
627627 });
628- Optional <Exception > exceptionOpt = processRemoteFetchOrException (topicPartitionRemoteFetchInfo , replicaManagerReadResponse );
628+ Optional <Exception > exceptionOpt = processRemoteFetchOrException (topicPartitionRemoteFetchInfo );
629629 if (exceptionOpt .isPresent ()) {
630630 remoteStorageFetchException = exceptionOpt ;
631631 throw exceptionOpt .get ();
@@ -637,15 +637,12 @@ private boolean maybeProcessRemoteFetch(
637637 /**
638638 * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
639639 * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
640- * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
641640 */
642641 private Optional <Exception > processRemoteFetchOrException (
643- TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo ,
644- LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse
642+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
645643 ) {
646644 TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo .topicIdPartition ();
647- RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo .remoteStorageFetchInfo ();
648- LogReadResult logReadResult = replicaManagerReadResponse .get (remoteFetchTopicIdPartition );
645+ RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo .logReadResult ().info ().delayedRemoteStorageFetch .get ();
649646
650647 Future <Void > remoteFetchTask ;
651648 CompletableFuture <RemoteLogReadResult > remoteFetchResult = new CompletableFuture <>();
@@ -664,7 +661,7 @@ private Optional<Exception> processRemoteFetchOrException(
664661 } catch (Exception e ) {
665662 return Optional .of (e );
666663 }
667- remoteFetchOpt = Optional .of (new RemoteFetch (remoteFetchTopicIdPartition , logReadResult , remoteFetchTask , remoteFetchResult , remoteStorageFetchInfo ));
664+ remoteFetchOpt = Optional .of (new RemoteFetch (remoteFetchTopicIdPartition , topicPartitionRemoteFetchInfo . logReadResult () , remoteFetchTask , remoteFetchResult , remoteStorageFetchInfo ));
668665 return Optional .empty ();
669666 }
670667
@@ -733,10 +730,10 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
733730
734731 /**
735732 * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
736- * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
733+ * Note - This function should only be called when we know that there is remote fetch in-flight/completed/expired .
737734 */
738735 private void completeRemoteStorageShareFetchRequest () {
739- LinkedHashMap <TopicIdPartition , Long > nonRemoteFetchTopicPartitionData = new LinkedHashMap <>();
736+ LinkedHashMap <TopicIdPartition , Long > acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap <>();
740737 try {
741738 List <ShareFetchPartitionData > shareFetchPartitionData = new ArrayList <>();
742739 int readableBytes = 0 ;
@@ -789,20 +786,20 @@ private void completeRemoteStorageShareFetchRequest() {
789786 nonRemoteFetchSharePartitions .put (topicIdPartition , sharePartition );
790787 }
791788 });
792- nonRemoteFetchTopicPartitionData = acquirablePartitions (nonRemoteFetchSharePartitions );
793- if (!nonRemoteFetchTopicPartitionData .isEmpty ()) {
789+ acquiredNonRemoteFetchTopicPartitionData = acquirablePartitions (nonRemoteFetchSharePartitions );
790+ if (!acquiredNonRemoteFetchTopicPartitionData .isEmpty ()) {
794791 log .trace ("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}" ,
795- nonRemoteFetchTopicPartitionData , shareFetch .groupId (), shareFetch .fetchParams ());
792+ acquiredNonRemoteFetchTopicPartitionData , shareFetch .groupId (), shareFetch .fetchParams ());
796793
797794 LinkedHashMap <TopicIdPartition , LogReadResult > responseData = readFromLog (
798- nonRemoteFetchTopicPartitionData ,
799- partitionMaxBytesStrategy .maxBytes (shareFetch .fetchParams ().maxBytes - readableBytes , nonRemoteFetchTopicPartitionData .keySet (), nonRemoteFetchTopicPartitionData .size ()));
795+ acquiredNonRemoteFetchTopicPartitionData ,
796+ partitionMaxBytesStrategy .maxBytes (shareFetch .fetchParams ().maxBytes - readableBytes , acquiredNonRemoteFetchTopicPartitionData .keySet (), acquiredNonRemoteFetchTopicPartitionData .size ()));
800797 for (Map .Entry <TopicIdPartition , LogReadResult > entry : responseData .entrySet ()) {
801798 if (entry .getValue ().info ().delayedRemoteStorageFetch .isEmpty ()) {
802799 shareFetchPartitionData .add (
803800 new ShareFetchPartitionData (
804801 entry .getKey (),
805- nonRemoteFetchTopicPartitionData .get (entry .getKey ()),
802+ acquiredNonRemoteFetchTopicPartitionData .get (entry .getKey ()),
806803 entry .getValue ().toFetchPartitionData (false )
807804 )
808805 );
@@ -812,32 +809,40 @@ private void completeRemoteStorageShareFetchRequest() {
812809 }
813810
814811 // Update metric to record acquired to requested partitions.
815- double acquiredRatio = (double ) (partitionsAcquired .size () + nonRemoteFetchTopicPartitionData .size ()) / shareFetch .topicIdPartitions ().size ();
812+ double acquiredRatio = (double ) (partitionsAcquired .size () + acquiredNonRemoteFetchTopicPartitionData .size ()) / shareFetch .topicIdPartitions ().size ();
816813 if (acquiredRatio > 0 )
817814 shareGroupMetrics .recordTopicPartitionsFetchRatio (shareFetch .groupId (), (int ) (acquiredRatio * 100 ));
818815
819816 Map <TopicIdPartition , ShareFetchResponseData .PartitionData > remoteFetchResponse = ShareFetchUtils .processFetchResponse (
820817 shareFetch , shareFetchPartitionData , sharePartitions , replicaManager , exceptionHandler );
821818 shareFetch .maybeComplete (remoteFetchResponse );
822819 log .trace ("Remote share fetch request completed successfully, response: {}" , remoteFetchResponse );
823- } catch (RuntimeException e ) {
824- throw e ;
820+ } catch (InterruptedException | ExecutionException e ) {
821+ log .error ("Exception occurred in completing remote fetch {} for delayed share fetch request {}" , remoteFetchOpt .get (), e );
822+ handleExceptionInCompletingRemoteStorageShareFetchRequest (acquiredNonRemoteFetchTopicPartitionData .keySet (), e );
825823 } catch (Exception e ) {
826- log .error ("Error processing delayed share fetch request" , e );
827- Set <TopicIdPartition > topicIdPartitions = new LinkedHashSet <>(partitionsAcquired .keySet ());
828- topicIdPartitions .addAll (nonRemoteFetchTopicPartitionData .keySet ());
829- handleFetchException (shareFetch , topicIdPartitions , e );
824+ log .error ("Unexpected error in processing delayed share fetch request" , e );
825+ handleExceptionInCompletingRemoteStorageShareFetchRequest (acquiredNonRemoteFetchTopicPartitionData .keySet (), e );
830826 } finally {
831827 Set <TopicIdPartition > topicIdPartitions = new LinkedHashSet <>(partitionsAcquired .keySet ());
832- topicIdPartitions .addAll (nonRemoteFetchTopicPartitionData .keySet ());
828+ topicIdPartitions .addAll (acquiredNonRemoteFetchTopicPartitionData .keySet ());
833829 releasePartitionLocksAndAddToActionQueue (topicIdPartitions );
834830 }
835831 }
836832
833+ private void handleExceptionInCompletingRemoteStorageShareFetchRequest (
834+ Set <TopicIdPartition > acquiredNonRemoteFetchTopicPartitions ,
835+ Exception e
836+ ) {
837+ Set <TopicIdPartition > topicIdPartitions = new LinkedHashSet <>(partitionsAcquired .keySet ());
838+ topicIdPartitions .addAll (acquiredNonRemoteFetchTopicPartitions );
839+ handleFetchException (shareFetch , topicIdPartitions , e );
840+ }
841+
837842 /**
838843 * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
839844 * already running as it may force closing opened/cached resources as transaction index.
840- * Note - This function should only be called when we know that there is a remote fetch in-flight/completed .
845+ * Note - This function should only be called when we know that there is a remote fetch in-flight/expired .
841846 */
842847 private void cancelRemoteFetchTask () {
843848 boolean cancelled = remoteFetchOpt .get ().remoteFetchTask ().cancel (false );
@@ -868,13 +873,13 @@ public String toString() {
868873
869874 public record TopicPartitionRemoteFetchInfo (
870875 TopicIdPartition topicIdPartition ,
871- RemoteStorageFetchInfo remoteStorageFetchInfo
876+ LogReadResult logReadResult
872877 ) {
873878 @ Override
874879 public String toString () {
875880 return "TopicPartitionRemoteFetchInfo(" +
876881 "topicIdPartition=" + topicIdPartition +
877- ", remoteStorageFetchInfo =" + remoteStorageFetchInfo +
882+ ", logReadResult =" + logReadResult +
878883 ")" ;
879884 }
880885 }
0 commit comments