Skip to content

Commit 43a2504

Browse files
KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete method (#20280)
In the current implementation, some delayed share fetch operations get trapped in the delayed share fetch purgatory when partition leadership changes during share consumption. This is because there is no check in the code to make sure the current broker is still the leader of the partitions corresponding to the share partitions. So, when leadership changes, the share partitions cannot be acquired, because they have already been fenced, and tryComplete returns false. Although the operation does get completed when its timer expires, it is too late by then: the operation stays stuck in the watchers list until it gets purged, which only happens once the estimated number of operations exceeds 1000. This PR resolves this by adding the required check so that if partition leadership changes, the delayed share fetches waiting on it get completed immediately. Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
1 parent 8deb6c6 commit 43a2504

File tree

4 files changed

+195
-5
lines changed

4 files changed

+195
-5
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.common.protocol.Errors;
3030
import org.apache.kafka.common.requests.FetchRequest;
3131
import org.apache.kafka.common.utils.Time;
32+
import org.apache.kafka.raft.errors.NotLeaderException;
3233
import org.apache.kafka.server.LogReadResult;
3334
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
3435
import org.apache.kafka.server.purgatory.DelayedOperation;
@@ -368,6 +369,14 @@ public boolean tryComplete() {
368369
"topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
369370
sharePartitions.keySet());
370371
}
372+
// At this point, there could be delayed requests sitting in the purgatory which are waiting on
373+
// DelayedShareFetchPartitionKeys corresponding to partitions, whose leader has been changed to a different broker.
374+
// In that case, such partitions would not be able to get acquired, and the tryComplete will keep on returning false.
375+
// Eventually the operation will get timed out and completed, but it might not get removed from the purgatory.
376+
// This has been left like this because the purging mechanism will trigger whenever the number of completed
377+
// but still being watched operations is larger than the purge interval. This purge interval is defined by the config
378+
// share.fetch.purgatory.purge.interval.requests and is 1000 by default, thereby ensuring that such stale operations do not
379+
// grow indefinitely.
371380
return false;
372381
} catch (Exception e) {
373382
log.error("Error processing delayed share fetch request", e);
@@ -757,30 +766,37 @@ private void processRemoteFetchOrException(
757766
* Case a: The partition is in an offline log directory on this broker
758767
* Case b: This broker does not know the partition it tries to fetch
759768
* Case c: This broker is no longer the leader of the partition it tries to fetch
760-
* Case d: All remote storage read requests completed
769+
* Case d: This broker is no longer the leader or follower of the partition it tries to fetch
770+
* Case e: All remote storage read requests completed
761771
* @return boolean representing whether the remote fetch is completed or not.
762772
*/
763773
private boolean maybeCompletePendingRemoteFetch() {
764774
boolean canComplete = false;
765775

766776
for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
767777
try {
768-
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
778+
Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
779+
if (!partition.isLeader()) {
780+
throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + topicIdPartition);
781+
}
769782
} catch (KafkaStorageException e) { // Case a
770783
log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
771784
canComplete = true;
772785
} catch (UnknownTopicOrPartitionException e) { // Case b
773786
log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
774787
canComplete = true;
775-
} catch (NotLeaderOrFollowerException e) { // Case c
788+
} catch (NotLeaderException e) { // Case c
789+
log.debug("Broker is no longer the leader of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
790+
canComplete = true;
791+
} catch (NotLeaderOrFollowerException e) { // Case d
776792
log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
777793
canComplete = true;
778794
}
779795
if (canComplete)
780796
break;
781797
}
782798

783-
if (canComplete || pendingRemoteFetchesOpt.get().isDone()) { // Case d
799+
if (canComplete || pendingRemoteFetchesOpt.get().isDone()) { // Case e
784800
return forceComplete();
785801
} else
786802
return false;

core/src/main/java/kafka/server/share/SharePartitionManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,7 @@ private static void removeSharePartitionFromCache(
774774
if (sharePartition != null) {
775775
sharePartition.markFenced();
776776
replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener());
777+
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition()));
777778
}
778779
}
779780

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,24 @@ public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit
153153
when(sp0.canAcquireRecords()).thenReturn(false);
154154
when(sp1.canAcquireRecords()).thenReturn(false);
155155

156+
Partition p0 = mock(Partition.class);
157+
when(p0.isLeader()).thenReturn(true);
158+
159+
Partition p1 = mock(Partition.class);
160+
when(p1.isLeader()).thenReturn(true);
161+
162+
ReplicaManager replicaManager = mock(ReplicaManager.class);
163+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
164+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
165+
156166
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(new MockTime());
157167
Uuid fetchId = Uuid.randomUuid();
158168
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
159169
.withShareFetchData(shareFetch)
160170
.withSharePartitions(sharePartitions)
161171
.withShareGroupMetrics(shareGroupMetrics)
162172
.withFetchId(fetchId)
173+
.withReplicaManager(replicaManager)
163174
.build());
164175

165176
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -218,6 +229,15 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
218229

219230
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0));
220231

232+
Partition p0 = mock(Partition.class);
233+
when(p0.isLeader()).thenReturn(true);
234+
235+
Partition p1 = mock(Partition.class);
236+
when(p1.isLeader()).thenReturn(true);
237+
238+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
239+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
240+
221241
Time time = mock(Time.class);
222242
when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L);
223243
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
@@ -287,6 +307,15 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
287307
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
288308
BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
289309

310+
Partition p0 = mock(Partition.class);
311+
when(p0.isLeader()).thenReturn(true);
312+
313+
Partition p1 = mock(Partition.class);
314+
when(p1.isLeader()).thenReturn(true);
315+
316+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
317+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
318+
290319
Uuid fetchId = Uuid.randomUuid();
291320
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
292321
.withShareFetchData(shareFetch)
@@ -580,6 +609,19 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
580609
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
581610
topicIdPartitions1.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
582611

612+
Partition p0 = mock(Partition.class);
613+
when(p0.isLeader()).thenReturn(true);
614+
615+
Partition p1 = mock(Partition.class);
616+
when(p1.isLeader()).thenReturn(true);
617+
618+
Partition p2 = mock(Partition.class);
619+
when(p2.isLeader()).thenReturn(true);
620+
621+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
622+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
623+
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenReturn(p2);
624+
583625
Uuid fetchId1 = Uuid.randomUuid();
584626
DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
585627
.withShareFetchData(shareFetch1)
@@ -737,6 +779,12 @@ public void testExceptionInMinBytesCalculation() {
737779
when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L).thenReturn(170L);
738780
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
739781
Uuid fetchId = Uuid.randomUuid();
782+
783+
Partition p0 = mock(Partition.class);
784+
when(p0.isLeader()).thenReturn(true);
785+
786+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
787+
740788
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
741789
.withShareFetchData(shareFetch)
742790
.withSharePartitions(sharePartitions)
@@ -881,10 +929,18 @@ public void testLocksReleasedAcquireException() {
881929
BROKER_TOPIC_STATS);
882930

883931
Uuid fetchId = Uuid.randomUuid();
932+
933+
Partition p0 = mock(Partition.class);
934+
when(p0.isLeader()).thenReturn(true);
935+
936+
ReplicaManager replicaManager = mock(ReplicaManager.class);
937+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
938+
884939
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
885940
.withShareFetchData(shareFetch)
886941
.withSharePartitions(sharePartitions)
887942
.withFetchId(fetchId)
943+
.withReplicaManager(replicaManager)
888944
.build();
889945

890946
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -1263,6 +1319,19 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() {
12631319
when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class));
12641320
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
12651321

1322+
Partition p0 = mock(Partition.class);
1323+
when(p0.isLeader()).thenReturn(true);
1324+
1325+
Partition p1 = mock(Partition.class);
1326+
when(p1.isLeader()).thenReturn(true);
1327+
1328+
Partition p2 = mock(Partition.class);
1329+
when(p2.isLeader()).thenReturn(true);
1330+
1331+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
1332+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
1333+
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenReturn(p2);
1334+
12661335
Uuid fetchId = Uuid.randomUuid();
12671336
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
12681337
.withShareFetchData(shareFetch)
@@ -1288,6 +1357,70 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() {
12881357
delayedShareFetch.lock().unlock();
12891358
}
12901359

1360+
@Test
1361+
public void testRemoteStorageFetchPartitionLeaderChanged() {
1362+
ReplicaManager replicaManager = mock(ReplicaManager.class);
1363+
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
1364+
1365+
SharePartition sp0 = mock(SharePartition.class);
1366+
1367+
when(sp0.canAcquireRecords()).thenReturn(true);
1368+
1369+
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
1370+
sharePartitions.put(tp0, sp0);
1371+
1372+
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
1373+
new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
1374+
BROKER_TOPIC_STATS);
1375+
1376+
when(sp0.nextFetchOffset()).thenReturn(10L);
1377+
1378+
// Fetch offset does not match with the cached entry for sp0, hence, a replica manager fetch will happen for sp0.
1379+
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
1380+
1381+
// Mocking remote storage read result for tp0.
1382+
doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
1383+
1384+
// Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock.
1385+
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
1386+
when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class));
1387+
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
1388+
1389+
Partition p0 = mock(Partition.class);
1390+
when(p0.isLeader()).thenReturn(false);
1391+
1392+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
1393+
1394+
Uuid fetchId = Uuid.randomUuid();
1395+
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
1396+
.withShareFetchData(shareFetch)
1397+
.withSharePartitions(sharePartitions)
1398+
.withReplicaManager(replicaManager)
1399+
.withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0)))
1400+
.withFetchId(fetchId)
1401+
.build());
1402+
1403+
// All the topic partitions are acquirable.
1404+
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
1405+
1406+
// Mock the behaviour of replica manager such that remote storage fetch completion timer task completes on adding it to the watch queue.
1407+
doAnswer(invocationOnMock -> {
1408+
TimerTask timerTask = invocationOnMock.getArgument(0);
1409+
timerTask.run();
1410+
return null;
1411+
}).when(replicaManager).addShareFetchTimerRequest(any());
1412+
1413+
assertFalse(delayedShareFetch.isCompleted());
1414+
assertTrue(delayedShareFetch.tryComplete());
1415+
assertTrue(delayedShareFetch.isCompleted());
1416+
// Remote fetch object gets created for delayed share fetch object.
1417+
assertNotNull(delayedShareFetch.pendingRemoteFetches());
1418+
// Verify the locks are released for topic partition tp0.
1419+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
1420+
assertTrue(delayedShareFetch.lock().tryLock());
1421+
delayedShareFetch.lock().unlock();
1422+
}
1423+
12911424
@Test
12921425
public void testRemoteStorageFetchTryCompleteThrowsException() {
12931426
ReplicaManager replicaManager = mock(ReplicaManager.class);
@@ -1516,6 +1649,16 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
15161649
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
15171650

15181651
Uuid fetchId = Uuid.randomUuid();
1652+
1653+
Partition p0 = mock(Partition.class);
1654+
when(p0.isLeader()).thenReturn(true);
1655+
1656+
Partition p1 = mock(Partition.class);
1657+
when(p1.isLeader()).thenReturn(true);
1658+
1659+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
1660+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
1661+
15191662
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
15201663
.withShareFetchData(shareFetch)
15211664
.withSharePartitions(sharePartitions)
@@ -1586,6 +1729,12 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfull
15861729
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
15871730

15881731
Uuid fetchId = Uuid.randomUuid();
1732+
1733+
Partition p0 = mock(Partition.class);
1734+
when(p0.isLeader()).thenReturn(true);
1735+
1736+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
1737+
15891738
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
15901739
.withShareFetchData(shareFetch)
15911740
.withSharePartitions(sharePartitions)
@@ -1679,6 +1828,19 @@ public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
16791828
}).when(remoteLogManager).asyncRead(any(), any());
16801829
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
16811830

1831+
Partition p0 = mock(Partition.class);
1832+
when(p0.isLeader()).thenReturn(true);
1833+
1834+
Partition p1 = mock(Partition.class);
1835+
when(p1.isLeader()).thenReturn(true);
1836+
1837+
Partition p2 = mock(Partition.class);
1838+
when(p2.isLeader()).thenReturn(true);
1839+
1840+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
1841+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
1842+
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenReturn(p2);
1843+
16821844
Uuid fetchId = Uuid.randomUuid();
16831845
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
16841846
.withShareFetchData(shareFetch)
@@ -1761,6 +1923,16 @@ public void testRemoteStorageFetchHappensForAllTopicPartitions() {
17611923
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
17621924

17631925
Uuid fetchId = Uuid.randomUuid();
1926+
1927+
Partition p0 = mock(Partition.class);
1928+
when(p0.isLeader()).thenReturn(true);
1929+
1930+
Partition p1 = mock(Partition.class);
1931+
when(p1.isLeader()).thenReturn(true);
1932+
1933+
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
1934+
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
1935+
17641936
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
17651937
.withShareFetchData(shareFetch)
17661938
.withSharePartitions(sharePartitions)

core/src/test/java/kafka/server/share/SharePartitionManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2622,7 +2622,8 @@ public void testSharePartitionPartialInitializationFailure() throws Exception {
26222622
assertEquals(Errors.FENCED_STATE_EPOCH.code(), partitionDataMap.get(tp2).errorCode());
26232623
assertEquals("Fenced state epoch", partitionDataMap.get(tp2).errorMessage());
26242624

2625-
Mockito.verify(replicaManager, times(0)).completeDelayedShareFetchRequest(any());
2625+
Mockito.verify(replicaManager, times(1)).completeDelayedShareFetchRequest(
2626+
new DelayedShareFetchGroupKey(groupId, tp2));
26262627
Mockito.verify(replicaManager, times(1)).readFromLog(
26272628
any(), any(), any(ReplicaQuota.class), anyBoolean());
26282629
// Should have 1 fetch recorded and 1 failure as single topic has multiple partition fetch

0 commit comments

Comments
 (0)