Skip to content

Commit 8fe1157

Browse files
MINOR: Adding check for invalid batch from persister in Share Partition (#20979)
While writing unit tests, encounterd incorrect further batches when persister provides corrupt batches. Added a check to fail early rather sending incorrect batches to clients later. Reviewers: Andrew Schofield <[email protected]>
1 parent 7d54f7b commit 8fe1157

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,15 @@ public CompletableFuture<Void> maybeInitialize() {
490490
throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
491491
return;
492492
}
493+
494+
if (stateBatch.lastOffset() < stateBatch.firstOffset()) {
495+
log.error("Invalid state batch found for the share partition: {}-{}. The first offset: {}"
496+
+ " is less than the last offset of the batch: {}.", groupId, topicIdPartition,
497+
stateBatch.firstOffset(), stateBatch.lastOffset());
498+
throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
499+
return;
500+
}
501+
493502
if (gapStartOffset == -1 && stateBatch.firstOffset() > previousBatchLastOffset + 1) {
494503
gapStartOffset = previousBatchLastOffset + 1;
495504
}

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,6 +1804,26 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesPriorStartOffset() {
18041804
assertEquals(3, sharePartition.deliveryCompleteCount());
18051805
}
18061806

1807+
@Test
1808+
public void testMaybeInitializeWithInvalidOffsetInBatch() {
1809+
Persister persister = Mockito.mock(Persister.class);
1810+
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
1811+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
1812+
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
1813+
PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
1814+
List.of(
1815+
new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2),
1816+
new PersisterStateBatch(11L, 10L, RecordState.ARCHIVED.id, (short) 3)))))));
1817+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
1818+
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
1819+
1820+
CompletableFuture<Void> result = sharePartition.maybeInitialize();
1821+
assertTrue(result.isDone());
1822+
assertTrue(result.isCompletedExceptionally());
1823+
assertFutureThrows(IllegalStateException.class, result);
1824+
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
1825+
}
1826+
18071827
@Test
18081828
public void testAcquireSingleRecord() throws InterruptedException {
18091829
SharePartition sharePartition = SharePartitionBuilder.builder()

0 commit comments

Comments
 (0)