@@ -291,11 +291,11 @@ public boolean tryComplete() {
291291 // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
292292 // those topic partitions.
293293 LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse = maybeReadFromLog (topicPartitionData );
294- // Map to store the remote fetch metadata corresponding to the topic partitions for which we need to perform remote fetch.
295- LinkedHashMap < TopicIdPartition , RemoteStorageFetchInfo > remoteStorageFetchInfoMap = maybePrepareRemoteStorageFetchInfo (topicPartitionData , replicaManagerReadResponse );
294+ // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
295+ Optional < TopicPartitionRemoteFetchInfo > topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo (topicPartitionData , replicaManagerReadResponse );
296296
297- if (! remoteStorageFetchInfoMap . isEmpty ()) {
298- return maybeProcessRemoteFetch (topicPartitionData , remoteStorageFetchInfoMap , replicaManagerReadResponse );
297+ if (topicPartitionRemoteFetchInfoOpt . isPresent ()) {
298+ return maybeProcessRemoteFetch (topicPartitionData , topicPartitionRemoteFetchInfoOpt . get () , replicaManagerReadResponse );
299299 }
300300 maybeUpdateFetchOffsetMetadata (topicPartitionData , replicaManagerReadResponse );
301301 if (anyPartitionHasLogReadError (replicaManagerReadResponse ) || isMinBytesSatisfied (topicPartitionData , partitionMaxBytesStrategy .maxBytes (shareFetch .fetchParams ().maxBytes , topicPartitionData .keySet (), topicPartitionData .size ()))) {
@@ -592,33 +592,40 @@ Meter expiredRequestMeter() {
592592 return expiredRequestMeter ;
593593 }
594594
595- private LinkedHashMap < TopicIdPartition , RemoteStorageFetchInfo > maybePrepareRemoteStorageFetchInfo (
595+ private Optional < TopicPartitionRemoteFetchInfo > maybePrepareRemoteStorageFetchInfo (
596596 LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
597597 LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse
598598 ) {
599- LinkedHashMap <TopicIdPartition , RemoteStorageFetchInfo > remoteStorageFetchMetadataMap = new LinkedHashMap <>();
600- replicaManagerReadResponse .forEach ((topicIdPartition , logReadResult ) -> {
599+ Optional <TopicPartitionRemoteFetchInfo > remoteStorageFetchMetadataMap = Optional .empty ();
600+ for (Map .Entry <TopicIdPartition , LogReadResult > entry : replicaManagerReadResponse .entrySet ()) {
601+ TopicIdPartition topicIdPartition = entry .getKey ();
602+ LogReadResult logReadResult = entry .getValue ();
601603 if (logReadResult .info ().delayedRemoteStorageFetch .isPresent ()) {
602- remoteStorageFetchMetadataMap .put (topicIdPartition , logReadResult .info ().delayedRemoteStorageFetch .get ());
604+ // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
605+ // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
606+ // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
607+ // fetch for multiple remote fetch topic partition in a single share fetch request
608+ remoteStorageFetchMetadataMap = Optional .of (new TopicPartitionRemoteFetchInfo (topicIdPartition , logReadResult .info ().delayedRemoteStorageFetch .get ()));
603609 partitionsAcquired .put (topicIdPartition , topicPartitionData .get (topicIdPartition ));
610+ break ;
604611 }
605- });
612+ }
606613 return remoteStorageFetchMetadataMap ;
607614 }
608615
609616 private boolean maybeProcessRemoteFetch (
610617 LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
611- LinkedHashMap < TopicIdPartition , RemoteStorageFetchInfo > remoteStorageFetchInfoMap ,
618+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo ,
612619 LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse
613620 ) throws Exception {
614621 topicPartitionData .keySet ().forEach (topicIdPartition -> {
615- // topic partitions for which fetching would be happening from local log and not remote storage .
616- if (!remoteStorageFetchInfoMap . containsKey (topicIdPartition )) {
622+ // topic partitions for which fetch would not be happening in this share fetch request .
623+ if (!topicPartitionRemoteFetchInfo . topicIdPartition (). equals (topicIdPartition )) {
617624 // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
618625 releasePartitionLocks (Set .of (topicIdPartition ));
619626 }
620627 });
621- Optional <Exception > exceptionOpt = processRemoteFetchOrException (remoteStorageFetchInfoMap , replicaManagerReadResponse );
628+ Optional <Exception > exceptionOpt = processRemoteFetchOrException (topicPartitionRemoteFetchInfo , replicaManagerReadResponse );
622629 if (exceptionOpt .isPresent ()) {
623630 remoteStorageFetchException = exceptionOpt ;
624631 throw exceptionOpt .get ();
@@ -629,25 +636,15 @@ private boolean maybeProcessRemoteFetch(
629636
630637 /**
631638 * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
632- * @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map
639+ * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
633640 * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
634641 */
635642 private Optional <Exception > processRemoteFetchOrException (
636- LinkedHashMap < TopicIdPartition , RemoteStorageFetchInfo > remoteStorageFetchInfoMap ,
643+ TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo ,
637644 LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse
638645 ) {
639- // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
640- // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
641- // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
642- // fetch for multiple remote fetch topic partition in a single share fetch request
643- TopicIdPartition remoteFetchTopicIdPartition = remoteFetchTopicIdPartition (remoteStorageFetchInfoMap );
644- RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap .get (remoteFetchTopicIdPartition );
645-
646- LinkedHashMap <TopicIdPartition , LogOffsetMetadata > fetchOffsetMetadataMap = new LinkedHashMap <>();
647- remoteStorageFetchInfoMap .forEach ((topicIdPartition , logReadResult ) -> fetchOffsetMetadataMap .put (
648- topicIdPartition ,
649- replicaManagerReadResponse .get (topicIdPartition ).info ().fetchOffsetMetadata
650- ));
646+ TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo .topicIdPartition ();
647+ RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo .remoteStorageFetchInfo ();
651648 LogReadResult logReadResult = replicaManagerReadResponse .get (remoteFetchTopicIdPartition );
652649
653650 Future <Void > remoteFetchTask ;
@@ -667,28 +664,10 @@ private Optional<Exception> processRemoteFetchOrException(
667664 } catch (Exception e ) {
668665 return Optional .of (e );
669666 }
670- remoteFetchOpt = Optional .of (new RemoteFetch (remoteFetchTopicIdPartition , logReadResult , remoteFetchTask , remoteFetchResult , remoteStorageFetchInfo , fetchOffsetMetadataMap ));
667+ remoteFetchOpt = Optional .of (new RemoteFetch (remoteFetchTopicIdPartition , logReadResult , remoteFetchTask , remoteFetchResult , remoteStorageFetchInfo ));
671668 return Optional .empty ();
672669 }
673670
674- /**
675- * This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the
676- * other partitions that can have a remote storage fetch for further processing and release the fetch locks for them.
677- * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
678- * @return the first topic partition for which we need to perform remote storage fetch
679- */
680- private TopicIdPartition remoteFetchTopicIdPartition (LinkedHashMap <TopicIdPartition , RemoteStorageFetchInfo > remoteStorageFetchInfoMap ) {
681- Map .Entry <TopicIdPartition , RemoteStorageFetchInfo > firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap .entrySet ().iterator ().next ();
682- TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo .getKey ();
683- remoteStorageFetchInfoMap .keySet ().forEach (topicIdPartition -> {
684- if (!topicIdPartition .equals (remoteFetchTopicIdPartition )) {
685- partitionsAcquired .remove (topicIdPartition );
686- releasePartitionLocks (Set .of (topicIdPartition ));
687- }
688- });
689- return remoteFetchTopicIdPartition ;
690- }
691-
692671 /**
693672 * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
694673 * The operation can be completed if:
@@ -701,25 +680,18 @@ private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartit
701680 private boolean maybeCompletePendingRemoteFetch () {
702681 boolean canComplete = false ;
703682
704- for (Map .Entry <TopicIdPartition , LogOffsetMetadata > entry : remoteFetchOpt .get ().fetchOffsetMetadataMap ().entrySet ()) {
705- TopicIdPartition topicIdPartition = entry .getKey ();
706- LogOffsetMetadata fetchOffsetMetadata = entry .getValue ();
707- try {
708- if (fetchOffsetMetadata != LogOffsetMetadata .UNKNOWN_OFFSET_METADATA ) {
709- replicaManager .getPartitionOrException (topicIdPartition .topicPartition ());
710- }
711- } catch (KafkaStorageException e ) { // Case a
712- log .debug ("TopicPartition {} is in an offline log directory, satisfy {} immediately" , topicIdPartition , shareFetch .fetchParams ());
713- canComplete = true ;
714- } catch (UnknownTopicOrPartitionException e ) { // Case b
715- log .debug ("Broker no longer knows of topicPartition {}, satisfy {} immediately" , topicIdPartition , shareFetch .fetchParams ());
716- canComplete = true ;
717- } catch (NotLeaderOrFollowerException e ) { // Case c
718- log .debug ("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately" , topicIdPartition , shareFetch .fetchParams ());
719- canComplete = true ;
720- }
721- if (canComplete )
722- break ;
683+ TopicIdPartition topicIdPartition = remoteFetchOpt .get ().topicIdPartition ();
684+ try {
685+ replicaManager .getPartitionOrException (topicIdPartition .topicPartition ());
686+ } catch (KafkaStorageException e ) { // Case a
687+ log .debug ("TopicPartition {} is in an offline log directory, satisfy {} immediately" , topicIdPartition , shareFetch .fetchParams ());
688+ canComplete = true ;
689+ } catch (UnknownTopicOrPartitionException e ) { // Case b
690+ log .debug ("Broker no longer knows of topicPartition {}, satisfy {} immediately" , topicIdPartition , shareFetch .fetchParams ());
691+ canComplete = true ;
692+ } catch (NotLeaderOrFollowerException e ) { // Case c
693+ log .debug ("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately" , topicIdPartition , shareFetch .fetchParams ());
694+ canComplete = true ;
723695 }
724696
725697 if (canComplete || remoteFetchOpt .get ().remoteFetchResult ().isDone ()) { // Case d
@@ -813,7 +785,7 @@ private void completeRemoteStorageShareFetchRequest() {
813785 // Get the local log read based topic partitions.
814786 LinkedHashMap <TopicIdPartition , SharePartition > nonRemoteFetchSharePartitions = new LinkedHashMap <>();
815787 sharePartitions .forEach ((topicIdPartition , sharePartition ) -> {
816- if (!partitionsAcquired .containsKey (topicIdPartition ) && ! remoteFetchOpt . get (). fetchOffsetMetadataMap (). containsKey ( topicIdPartition ) ) {
788+ if (!partitionsAcquired .containsKey (topicIdPartition )) {
817789 nonRemoteFetchSharePartitions .put (topicIdPartition , sharePartition );
818790 }
819791 });
@@ -880,8 +852,7 @@ public record RemoteFetch(
880852 LogReadResult logReadResult ,
881853 Future <Void > remoteFetchTask ,
882854 CompletableFuture <RemoteLogReadResult > remoteFetchResult ,
883- RemoteStorageFetchInfo remoteFetchInfo ,
884- LinkedHashMap <TopicIdPartition , LogOffsetMetadata > fetchOffsetMetadataMap
855+ RemoteStorageFetchInfo remoteFetchInfo
885856 ) {
886857 @ Override
887858 public String toString () {
@@ -891,7 +862,19 @@ public String toString() {
891862 ", remoteFetchTask=" + remoteFetchTask +
892863 ", remoteFetchResult=" + remoteFetchResult +
893864 ", remoteFetchInfo=" + remoteFetchInfo +
894- ", fetchOffsetMetadataMap=" + fetchOffsetMetadataMap +
865+ ")" ;
866+ }
867+ }
868+
869+ public record TopicPartitionRemoteFetchInfo (
870+ TopicIdPartition topicIdPartition ,
871+ RemoteStorageFetchInfo remoteStorageFetchInfo
872+ ) {
873+ @ Override
874+ public String toString () {
875+ return "TopicPartitionRemoteFetchInfo(" +
876+ "topicIdPartition=" + topicIdPartition +
877+ ", remoteStorageFetchInfo=" + remoteStorageFetchInfo +
895878 ")" ;
896879 }
897880 }
0 commit comments