Skip to content

Commit 5ae1203

Browse files
Addressed Jun's round 1 review comments - part 1
1 parent e39508c commit 5ae1203

File tree

2 files changed

+35
-25
lines changed

2 files changed

+35
-25
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,19 @@ public DelayedShareFetch(
131131
);
132132
}
133133

134-
// The direct usage of this constructor is only from tests.
134+
/**
135+
* This function constructs an instance of delayed share fetch operation for completing share fetch
136+
* requests instantaneously or with delay. The direct usage of this constructor is only from tests.
137+
*
138+
* @param shareFetch The share fetch parameters of the share fetch request.
139+
* @param replicaManager The replica manager instance used to read from log/complete the request.
140+
* @param exceptionHandler The handler to complete share fetch requests with exception.
141+
* @param sharePartitions The share partitions referenced in the share fetch request.
142+
* @param partitionMaxBytesStrategy The strategy to identify the max bytes for topic partitions in the share fetch request.
143+
* @param shareGroupMetrics The share group metrics to record the metrics.
144+
* @param time The system time.
145+
* @param remoteFetchOpt Optional containing an in-flight remote fetch object or an empty optional.
146+
*/
135147
DelayedShareFetch(
136148
ShareFetch shareFetch,
137149
ReplicaManager replicaManager,
@@ -162,9 +174,6 @@ public DelayedShareFetch(
162174

163175
@Override
164176
public void onExpiration() {
165-
if (remoteFetchOpt.isPresent()) {
166-
cancelRemoteFetchTask();
167-
}
168177
expiredRequestMeter.mark();
169178
}
170179

@@ -224,10 +233,10 @@ private void completeLocalLogShareFetchRequest() {
224233
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
225234
topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
226235

227-
processAcquiredTopicPartitions(topicPartitionData);
236+
processAcquiredTopicPartitionsForLocalLogFetch(topicPartitionData);
228237
}
229238

230-
private void processAcquiredTopicPartitions(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
239+
private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
231240
try {
232241
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
233242
if (localPartitionsAlreadyFetched.isEmpty())
@@ -269,8 +278,6 @@ private void processAcquiredTopicPartitions(LinkedHashMap<TopicIdPartition, Long
269278
@Override
270279
public boolean tryComplete() {
271280
// Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first.
272-
// This will help to prevent starving remote storage partitions and wasting the significant upfront work involved with
273-
// kicking off a fetch from remote storage.
274281
if (remoteFetchOpt.isPresent()) {
275282
return maybeCompletePendingRemoteFetch();
276283
}
@@ -594,7 +601,6 @@ private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemo
594601
if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
595602
remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
596603
partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
597-
localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult);
598604
}
599605
});
600606
return remoteStorageFetchMetadataMap;
@@ -605,15 +611,13 @@ private boolean maybeProcessRemoteFetch(
605611
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
606612
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
607613
) throws Exception {
608-
// topic partitions for which fetching would be happening from local log and not remote storage.
609-
Set<TopicIdPartition> localFetchTopicPartitions = new LinkedHashSet<>();
610614
topicPartitionData.keySet().forEach(topicIdPartition -> {
615+
// topic partitions for which fetching would be happening from local log and not remote storage.
611616
if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
612-
localFetchTopicPartitions.add(topicIdPartition);
617+
// Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
618+
releasePartitionLocks(Set.of(topicIdPartition));
613619
}
614620
});
615-
// Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
616-
releasePartitionLocks(localFetchTopicPartitions);
617621
Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
618622
if (exceptionOpt.isPresent()) {
619623
remoteStorageFetchException = exceptionOpt;
@@ -626,6 +630,7 @@ private boolean maybeProcessRemoteFetch(
626630
/**
627631
* Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
628632
* @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map
633+
* @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
629634
*/
630635
private Optional<Exception> processRemoteFetchOrException(
631636
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
@@ -635,14 +640,15 @@ private Optional<Exception> processRemoteFetchOrException(
635640
// a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
636641
// we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
637642
// fetch for multiple remote fetch topic partition in a single share fetch request
638-
TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
643+
TopicIdPartition remoteFetchTopicIdPartition = remoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
639644
RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
640645

641646
LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
642647
remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
643648
topicIdPartition,
644649
replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
645650
));
651+
LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
646652

647653
Future<Void> remoteFetchTask;
648654
CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
@@ -661,7 +667,7 @@ private Optional<Exception> processRemoteFetchOrException(
661667
} catch (Exception e) {
662668
return Optional.of(e);
663669
}
664-
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
670+
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
665671
return Optional.empty();
666672
}
667673

@@ -671,7 +677,7 @@ private Optional<Exception> processRemoteFetchOrException(
671677
* @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
672678
* @return the first topic partition for which we need to perform remote storage fetch
673679
*/
674-
private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
680+
private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
675681
Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
676682
TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
677683
remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
@@ -764,8 +770,9 @@ private void completeRemoteStorageShareFetchRequest() {
764770
int readableBytes = 0;
765771
if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
766772
RemoteFetch remoteFetch = remoteFetchOpt.get();
767-
if (remoteFetch.remoteFetchResult().get().error.isPresent()) {
768-
Throwable error = remoteFetch.remoteFetchResult().get().error.get();
773+
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
774+
if (remoteLogReadResult.error.isPresent()) {
775+
Throwable error = remoteLogReadResult.error.get();
769776
// If there is any error for the remote fetch topic partition, we populate the error accordingly.
770777
shareFetchPartitionData.add(
771778
new ShareFetchPartitionData(
@@ -775,9 +782,9 @@ private void completeRemoteStorageShareFetchRequest() {
775782
)
776783
);
777784
} else {
778-
FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get();
785+
FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get();
779786
TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
780-
LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition);
787+
LogReadResult logReadResult = remoteFetch.logReadResult();
781788
shareFetchPartitionData.add(
782789
new ShareFetchPartitionData(
783790
topicIdPartition,
@@ -833,9 +840,9 @@ private void completeRemoteStorageShareFetchRequest() {
833840
}
834841

835842
// Update metric to record acquired to requested partitions.
836-
double requestTopicToAcquired = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size();
837-
if (requestTopicToAcquired > 0)
838-
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
843+
double acquiredRatio = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size();
844+
if (acquiredRatio > 0)
845+
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100));
839846

840847
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse(
841848
shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler);
@@ -870,6 +877,7 @@ private void cancelRemoteFetchTask() {
870877

871878
public record RemoteFetch(
872879
TopicIdPartition topicIdPartition,
880+
LogReadResult logReadResult,
873881
Future<Void> remoteFetchTask,
874882
CompletableFuture<RemoteLogReadResult> remoteFetchResult,
875883
RemoteStorageFetchInfo remoteFetchInfo,
@@ -879,6 +887,7 @@ public record RemoteFetch(
879887
public String toString() {
880888
return "RemoteFetch(" +
881889
"topicIdPartition=" + topicIdPartition +
890+
", logReadResult=" + logReadResult +
882891
", remoteFetchTask=" + remoteFetchTask +
883892
", remoteFetchResult=" + remoteFetchResult +
884893
", remoteFetchInfo=" + remoteFetchInfo +

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1228,7 +1228,8 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() {
12281228
// Remote fetch object gets created for delayed share fetch object.
12291229
assertNotNull(delayedShareFetch.remoteFetch());
12301230
// Verify the locks are released for local log read topic partitions tp0 and tp1.
1231-
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
1231+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
1232+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
12321233
assertTrue(delayedShareFetch.lock().tryLock());
12331234
delayedShareFetch.lock().unlock();
12341235
}

0 commit comments

Comments
 (0)