Skip to content

Commit def5f16

Browse files
KAFKA-19630: Reordered OR operands in archiveRecords method for SharePartiton (#20391)
As per the current implementation in archiveRecords, when LSO is updated, if we have multiple record batches before the new LSO, then only the first one gets archived. This is because of the following lines of code -> `isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);` `isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);` The first record / batch will make `isAnyOffsetArchived` / `isAnyBatchArchived` true, after which this line of code will short-circuit and the methods `archivePerOffsetBatchRecords` / `archiveCompleteBatch` will not be called again. This PR changes the order of the expressions so that the short-circuit does not prevent from archiving all the required batches. Reviewers: Apoorv Mittal <[email protected]>
1 parent eeb6a0d commit def5f16

File tree

2 files changed

+118
-2
lines changed

2 files changed

+118
-2
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,11 +1202,11 @@ private boolean archiveRecords(long startOffset, long endOffset, NavigableMap<Lo
12021202
}
12031203
inFlightBatch.maybeInitializeOffsetStateUpdate();
12041204
}
1205-
isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);
1205+
isAnyOffsetArchived = archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState) || isAnyOffsetArchived;
12061206
continue;
12071207
}
12081208
// The in-flight batch is a full match hence change the state of the complete batch.
1209-
isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);
1209+
isAnyBatchArchived = archiveCompleteBatch(inFlightBatch, initialState) || isAnyBatchArchived;
12101210
}
12111211
return isAnyOffsetArchived || isAnyBatchArchived;
12121212
} finally {

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4493,6 +4493,122 @@ public void testLsoMovementForArchivingBatches() {
44934493
assertNotNull(sharePartition.cachedState().get(32L).batchAcquisitionLockTimeoutTask());
44944494
}
44954495

4496+
@Test
4497+
public void testLsoMovementForArchivingAllAvailableBatches() {
4498+
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
4499+
4500+
// A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50.
4501+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
4502+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10);
4503+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10);
4504+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10);
4505+
4506+
// After the acknowledgements, the state of share partition will be:
4507+
// 1. 11 -> 20: AVAILABLE
4508+
// 2. 21 -> 30: ACQUIRED
4509+
// 3. 31 -> 40: AVAILABLE
4510+
// 4. 41 -> 50: ACQUIRED
4511+
sharePartition.acknowledge(MEMBER_ID, List.of(
4512+
new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
4513+
new ShareAcknowledgementBatch(31, 40, List.of((byte) 2))
4514+
));
4515+
4516+
// Move the LSO to 41. When the LSO moves ahead, all batches that are AVAILABLE before the new LSO will be ARCHIVED.
4517+
// Thus, the state of the share partition will be:
4518+
// 1. 11 -> 20: ARCHIVED
4519+
// 2. 21 -> 30: ACQUIRED
4520+
// 3. 31 -> 40: ARCHIVED
4521+
// 4. 41 -> 50: ACQUIRED
4522+
// Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal
4523+
// state when the corresponding acquisition lock timer task expires.
4524+
sharePartition.updateCacheAndOffsets(41);
4525+
4526+
assertEquals(51, sharePartition.nextFetchOffset());
4527+
assertEquals(41, sharePartition.startOffset());
4528+
assertEquals(50, sharePartition.endOffset());
4529+
4530+
assertEquals(4, sharePartition.cachedState().size());
4531+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState());
4532+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
4533+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).batchState());
4534+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState());
4535+
4536+
// The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these
4537+
// records will remain in the ACQUIRED state.
4538+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2))));
4539+
4540+
// The batch is still in ACQUIRED state.
4541+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
4542+
4543+
// Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be
4544+
// ARCHIVED.
4545+
sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run();
4546+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState());
4547+
}
4548+
4549+
@Test
4550+
public void testLsoMovementForArchivingAllAvailableOffsets() {
4551+
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
4552+
4553+
// A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50.
4554+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
4555+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10);
4556+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10);
4557+
fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10);
4558+
4559+
// After the acknowledgements, the share partition state will be:
4560+
// 1. 11 -> 20: AVAILABLE
4561+
// 2. 21 -> 30: ACQUIRED
4562+
// 3. 31 -> 40: AVAILABLE
4563+
// 4. 41 -> 50: ACQUIRED
4564+
sharePartition.acknowledge(MEMBER_ID, List.of(
4565+
new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
4566+
new ShareAcknowledgementBatch(31, 40, List.of((byte) 2))
4567+
));
4568+
4569+
// Move the LSO to 36. When the LSO moves ahead, all records that are AVAILABLE before the new LSO will be ARCHIVED.
4570+
// Thus, the state of the share partition will be:
4571+
// 1. 11 -> 20: ARCHIVED
4572+
// 2. 21 -> 30: ACQUIRED
4573+
// 3. 31 -> 35: ARCHIVED
4574+
// 3. 36 -> 40: AVAILABLE
4575+
// 4. 41 -> 50: ACQUIRED
4576+
// Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal
4577+
// state when the corresponding acquisition lock timer task expires.
4578+
sharePartition.updateCacheAndOffsets(36);
4579+
4580+
assertEquals(36, sharePartition.nextFetchOffset());
4581+
assertEquals(36, sharePartition.startOffset());
4582+
assertEquals(50, sharePartition.endOffset());
4583+
4584+
assertEquals(4, sharePartition.cachedState().size());
4585+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState());
4586+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
4587+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(31L).state());
4588+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(32L).state());
4589+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(33L).state());
4590+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(34L).state());
4591+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(35L).state());
4592+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(36L).state());
4593+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(37L).state());
4594+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(38L).state());
4595+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(39L).state());
4596+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(40L).state());
4597+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState());
4598+
4599+
// The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these
4600+
// records will remain in the ACQUIRED state.
4601+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2))));
4602+
4603+
// The batch is still in ACQUIRED state.
4604+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState());
4605+
4606+
// Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be
4607+
// ARCHIVED.
4608+
sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run();
4609+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState());
4610+
}
4611+
44964612
@Test
44974613
public void testLsoMovementForArchivingOffsets() {
44984614
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();

0 commit comments

Comments
 (0)