Skip to content

Commit af9e6ca

Browse files
Code refactor
1 parent 7cadffd commit af9e6ca

File tree

2 files changed

+36
-14
lines changed

2 files changed

+36
-14
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,13 @@ private Optional<Exception> processRemoteFetchOrException(
631631
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
632632
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
633633
) {
634+
// TODO: There is a limitation in remote storage fetch for consumer groups: we can only perform a remote fetch for
// a single topic partition in a fetch request. Since the fetch logic is largely based on how consumer groups work,
// we follow the same logic. However, this problem should be addressed as part of KAFKA-19133, which should help us
// perform fetches for multiple remote-fetch topic partitions in a single share fetch request.
638+
TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
639+
RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
640+
634641
LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
635642
remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
636643
topicIdPartition,
@@ -639,16 +646,12 @@ private Optional<Exception> processRemoteFetchOrException(
639646

640647
Future<Void> remoteFetchTask;
641648
CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
642-
// TODO: This is a limitation in remote storage fetch that there will be fetch only for a single topic partition.
643-
Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
644-
TopicIdPartition topicIdPartition = firstRemoteStorageFetchInfo.getKey();
645-
RemoteStorageFetchInfo remoteStorageFetchInfo = firstRemoteStorageFetchInfo.getValue();
646649
try {
647650
remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
648651
remoteStorageFetchInfo,
649652
result -> {
650653
remoteFetchResult.complete(result);
651-
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
654+
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
652655
}
653656
);
654657
} catch (RejectedExecutionException e) {
@@ -658,10 +661,28 @@ private Optional<Exception> processRemoteFetchOrException(
658661
} catch (Exception e) {
659662
return Optional.of(e);
660663
}
661-
remoteFetchOpt = Optional.of(new RemoteFetch(topicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
664+
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
662665
return Optional.empty();
663666
}
664667

668+
/**
669+
* This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the
670+
* other partitions that can have a remote storage fetch for further processing and release the fetch locks for them.
671+
* @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
672+
* @return the first topic partition for which we need to perform remote storage fetch
673+
*/
674+
private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
675+
Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
676+
TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
677+
remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
678+
if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
679+
partitionsAcquired.remove(topicIdPartition);
680+
releasePartitionLocks(Set.of(topicIdPartition));
681+
}
682+
});
683+
return remoteFetchTopicIdPartition;
684+
}
685+
665686
/**
666687
* This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
667688
* The operation can be completed if:
@@ -741,26 +762,26 @@ private void completeRemoteStorageShareFetchRequest() {
741762
try {
742763
List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
743764
int readableBytes = 0;
744-
if (remoteFetchOpt.get().remoteFetchResult.isDone()) {
765+
if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
745766
RemoteFetch remoteFetch = remoteFetchOpt.get();
746767
if (remoteFetch.remoteFetchResult().get().error.isPresent()) {
747768
Throwable error = remoteFetch.remoteFetchResult().get().error.get();
748769
// If there is any error for the remote fetch topic partition, we populate the error accordingly.
749770
shareFetchPartitionData.add(
750771
new ShareFetchPartitionData(
751-
remoteFetch.topicIdPartition,
752-
partitionsAcquired.get(remoteFetch.topicIdPartition),
772+
remoteFetch.topicIdPartition(),
773+
partitionsAcquired.get(remoteFetch.topicIdPartition()),
753774
ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
754775
)
755776
);
756777
} else {
757778
FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get();
758-
TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition;
779+
TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
759780
LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition);
760781
shareFetchPartitionData.add(
761782
new ShareFetchPartitionData(
762783
topicIdPartition,
763-
partitionsAcquired.get(remoteFetch.topicIdPartition),
784+
partitionsAcquired.get(remoteFetch.topicIdPartition()),
764785
new FetchPartitionData(
765786
logReadResult.error(),
766787
logReadResult.highWatermark(),
@@ -785,7 +806,7 @@ private void completeRemoteStorageShareFetchRequest() {
785806
// Get the local log read based topic partitions.
786807
LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
787808
sharePartitions.forEach((topicIdPartition, sharePartition) -> {
788-
if (!partitionsAcquired.containsKey(topicIdPartition)) {
809+
if (!partitionsAcquired.containsKey(topicIdPartition) && !remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) {
789810
nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
790811
}
791812
});

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1687,8 +1687,9 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
16871687
assertTrue(delayedShareFetch.isCompleted());
16881688
// Pending remote fetch object gets created for delayed share fetch.
16891689
assertNotNull(delayedShareFetch.remoteFetch());
1690-
// Verify the locks are released for tp0 and tp1.
1691-
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
1690+
// Verify the locks are released separately for tp0 (from onComplete) and tp1 (from tryComplete).
1691+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
1692+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
16921693
assertTrue(shareFetch.isCompleted());
16931694
// Share fetch response only contains the first remote storage fetch topic partition - tp0.
16941695
assertEquals(Set.of(tp0), future.join().keySet());

0 commit comments

Comments
 (0)