Skip to content

Commit dfc0366

Browse files
Addressed Jun's round 3 review comments
1 parent ccf3e06 commit dfc0366

File tree

2 files changed

+13
-21
lines changed

2 files changed

+13
-21
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -618,16 +618,18 @@ private boolean maybeProcessRemoteFetch(
618618
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
619619
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
620620
) throws Exception {
621+
Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
621622
topicPartitionData.keySet().forEach(topicIdPartition -> {
622623
// topic partitions for which fetch would not be happening in this share fetch request.
623624
if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
624-
// Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
625-
releasePartitionLocksAndAddToActionQueue(Set.of(topicIdPartition));
625+
nonRemoteFetchTopicPartitions.add(topicIdPartition);
626626
}
627627
});
628+
// Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
629+
// them to the delayed actions queue.
630+
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
628631
Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
629632
if (exceptionOpt.isPresent()) {
630-
remoteStorageFetchException = exceptionOpt;
631633
throw exceptionOpt.get();
632634
}
633635
// Check if remote fetch can be completed.
@@ -657,8 +659,10 @@ private Optional<Exception> processRemoteFetchOrException(
657659
} catch (RejectedExecutionException e) {
658660
// Return the error if any in scheduling the remote fetch task.
659661
log.warn("Unable to fetch data from remote storage", e);
662+
remoteStorageFetchException = Optional.of(e);
660663
return Optional.of(e);
661664
} catch (Exception e) {
665+
remoteStorageFetchException = Optional.of(e);
662666
return Optional.of(e);
663667
}
664668
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
@@ -730,7 +734,7 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
730734

731735
/**
732736
* This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
733-
* Note - This function should only be called when we know that there is remote fetch in-flight/completed/expired.
737+
* Note - This function should only be called when we know that there is remote fetch.
734738
*/
735739
private void completeRemoteStorageShareFetchRequest() {
736740
LinkedHashMap<TopicIdPartition, Long> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
@@ -842,7 +846,7 @@ private void handleExceptionInCompletingRemoteStorageShareFetchRequest(
842846
/**
843847
* Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
844848
* already running as it may force closing opened/cached resources as transaction index.
845-
* Note - This function should only be called when we know that there is a remote fetch in-flight/expired.
849+
* Note - This function should only be called when we know that there is remote fetch.
846850
*/
847851
private void cancelRemoteFetchTask() {
848852
boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,8 +1228,7 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() {
12281228
// Remote fetch object gets created for delayed share fetch object.
12291229
assertNotNull(delayedShareFetch.remoteFetch());
12301230
// Verify the locks are released for local log read topic partitions tp0 and tp1.
1231-
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
1232-
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
1231+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
12331232
assertTrue(delayedShareFetch.lock().tryLock());
12341233
delayedShareFetch.lock().unlock();
12351234
}
@@ -1459,24 +1458,19 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
14591458
public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
14601459
ReplicaManager replicaManager = mock(ReplicaManager.class);
14611460
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
1462-
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
14631461

14641462
SharePartition sp0 = mock(SharePartition.class);
1465-
SharePartition sp1 = mock(SharePartition.class);
14661463

14671464
// sp0 is acquirable, sp1 is not acquirable.
14681465
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
1469-
when(sp1.maybeAcquireFetchLock()).thenReturn(false);
14701466
when(sp0.canAcquireRecords()).thenReturn(true);
1471-
when(sp1.canAcquireRecords()).thenReturn(false);
14721467

14731468
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
14741469
sharePartitions.put(tp0, sp0);
1475-
sharePartitions.put(tp1, sp1);
14761470

14771471
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
14781472
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
1479-
future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
1473+
future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
14801474
BROKER_TOPIC_STATS);
14811475

14821476
when(sp0.nextFetchOffset()).thenReturn(10L);
@@ -1504,7 +1498,7 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfull
15041498
.withShareFetchData(shareFetch)
15051499
.withSharePartitions(sharePartitions)
15061500
.withReplicaManager(replicaManager)
1507-
.withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
1501+
.withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0)))
15081502
.build());
15091503

15101504
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
@@ -1531,18 +1525,15 @@ public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
15311525
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
15321526
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
15331527
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
1534-
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3));
15351528

15361529
SharePartition sp0 = mock(SharePartition.class);
15371530
SharePartition sp1 = mock(SharePartition.class);
15381531
SharePartition sp2 = mock(SharePartition.class);
1539-
SharePartition sp3 = mock(SharePartition.class);
15401532

15411533
// Except tp3, all the topic partitions are acquirable.
15421534
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
15431535
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
15441536
when(sp2.maybeAcquireFetchLock()).thenReturn(true);
1545-
when(sp3.maybeAcquireFetchLock()).thenReturn(false);
15461537
when(sp0.canAcquireRecords()).thenReturn(true);
15471538
when(sp1.canAcquireRecords()).thenReturn(true);
15481539
when(sp2.canAcquireRecords()).thenReturn(true);
@@ -1551,11 +1542,10 @@ public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
15511542
sharePartitions.put(tp0, sp0);
15521543
sharePartitions.put(tp1, sp1);
15531544
sharePartitions.put(tp2, sp2);
1554-
sharePartitions.put(tp3, sp3);
15551545

15561546
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
15571547
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
1558-
future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS,
1548+
future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
15591549
BROKER_TOPIC_STATS);
15601550

15611551
when(sp0.nextFetchOffset()).thenReturn(10L);
@@ -1679,8 +1669,6 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
16791669

16801670
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
16811671
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
1682-
when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
1683-
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
16841672

16851673
assertFalse(delayedShareFetch.isCompleted());
16861674
assertTrue(delayedShareFetch.tryComplete());

0 commit comments

Comments
 (0)