Skip to content

Commit 7b017fa

Browse files
MINOR: Correcting the throttling condition for batch in Share Partition (#20978)
The PR adds a check to the throttling condition so that batches and offsets with an ongoing state transition are not considered. Tests have been added and corrected to verify the behaviour. Reviewers: Andrew Schofield <[email protected]>
1 parent 8fe1157 commit 7b017fa

File tree

2 files changed

+125
-6
lines changed

2 files changed

+125
-6
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2027,7 +2027,14 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
20272027
*/
20282028
private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, long requestFirstOffset, long requestLastOffset) {
20292029
if (inFlightBatch.offsetState() == null) {
2030-
return inFlightBatch.batchDeliveryCount() >= throttleRecordsDeliveryLimit;
2030+
// If offsetState is null, it means the batch is not split and represents a single batch.
2031+
// Check if the batch is in AVAILABLE state and has no ongoing transition.
2032+
// The requested batch will always be within the request's first and last offsets, as only
2033+
// the batches from the sub map are fetched for consideration.
2034+
if (inFlightBatch.batchState() == RecordState.AVAILABLE && !inFlightBatch.batchHasOngoingStateTransition()) {
2035+
return inFlightBatch.batchDeliveryCount() >= throttleRecordsDeliveryLimit;
2036+
}
2037+
return false;
20312038
}
20322039

20332040
return inFlightBatch.offsetState().entrySet().stream().filter(entry -> {
@@ -2037,10 +2044,7 @@ private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, long
20372044
if (entry.getKey() > requestLastOffset) {
20382045
return false;
20392046
}
2040-
if (entry.getValue().state() != RecordState.AVAILABLE) {
2041-
return false;
2042-
}
2043-
return true;
2047+
return entry.getValue().state() == RecordState.AVAILABLE && !entry.getValue().hasOngoingStateTransition();
20442048
}).mapToInt(entry -> entry.getValue().deliveryCount()).max().orElse(0) >= throttleRecordsDeliveryLimit;
20452049
}
20462050

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11221,7 +11221,7 @@ public void testThrottleRecordsWhenPendingDeliveriesExist() {
1122111221
PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
1122211222
List.of(
1122311223
new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2),
11224-
new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
11224+
new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 3),
1122511225
new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
1122611226
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
1122711227
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
@@ -11610,6 +11610,121 @@ public void testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() {
1161011610
assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
1161111611
}
1161211612

11613+
@Test
11614+
public void testAcquisitionThrottlingWithOngoingStateTransition() {
11615+
Persister persister = Mockito.mock(Persister.class);
11616+
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
11617+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
11618+
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
11619+
PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(),
11620+
List.of(
11621+
new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 1),
11622+
// Batch 20-24 has a delivery count of 2, so after the next acquisition it will be 3.
11623+
// After that it would normally be throttled, but because of the pending state transition
11624+
// it should not be throttled.
11625+
new PersisterStateBatch(20L, 24L, RecordState.AVAILABLE.id, (short) 2),
11626+
new PersisterStateBatch(25L, 29L, RecordState.AVAILABLE.id, (short) 2),
11627+
new PersisterStateBatch(30L, 34L, RecordState.AVAILABLE.id, (short) 2),
11628+
// Similarly, batch 35-39 has a delivery count of 2, so after the next offset-based
11629+
// acquisition some offsets will have a delivery count of 3. After that those offsets would
11630+
// normally be throttled, but because of the pending state transition they will not be.
11631+
new PersisterStateBatch(35, 39L, RecordState.AVAILABLE.id, (short) 2),
11632+
new PersisterStateBatch(40, 44L, RecordState.ARCHIVED.id, (short) 5),
11633+
new PersisterStateBatch(45, 49L, RecordState.AVAILABLE.id, (short) 1)))))));
11634+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
11635+
SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
11636+
11637+
CompletableFuture<Void> result = sharePartition.maybeInitialize();
11638+
assertTrue(result.isDone());
11639+
assertFalse(result.isCompletedExceptionally());
11640+
11641+
// Acquire batches 20-24 and 36-37 (offset based) and create a pending state transition.
11642+
fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
11643+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState());
11644+
fetchAcquiredRecords(sharePartition, memoryRecords(36, 2), 2);
11645+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state());
11646+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
11647+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
11648+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state());
11649+
11650+
// Create a pending future which will block state updates.
11651+
CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
11652+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
11653+
11654+
// Release batch 20-24 and offsets 36-37, which will then have a pending state transition.
11655+
sharePartition.acknowledge(
11656+
MEMBER_ID,
11657+
List.of(new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id)),
11658+
new ShareAcknowledgementBatch(36, 37, List.of(AcknowledgeType.RELEASE.id))));
11659+
11660+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState());
11661+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
11662+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
11663+
11664+
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
11665+
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(35L).hasOngoingStateTransition());
11666+
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(36L).hasOngoingStateTransition());
11667+
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(37L).hasOngoingStateTransition());
11668+
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(38L).hasOngoingStateTransition());
11669+
11670+
ByteBuffer buffer = ByteBuffer.allocate(4096);
11671+
memoryRecordsBuilder(buffer, 15, 5).close();
11672+
memoryRecordsBuilder(buffer, 20, 5).close();
11673+
memoryRecordsBuilder(buffer, 25, 5).close();
11674+
memoryRecordsBuilder(buffer, 30, 5).close();
11675+
memoryRecordsBuilder(buffer, 35, 5).close();
11676+
memoryRecordsBuilder(buffer, 40, 5).close();
11677+
memoryRecordsBuilder(buffer, 45, 5).close();
11678+
buffer.flip();
11679+
MemoryRecords records = MemoryRecords.readableRecords(buffer);
11680+
11681+
// Acquire batches; batches 15-19 and 25-29 will be acquired as batch 20-24 has a pending state transition.
11682+
// Without pending transition, the acquisition would have happened only for 20-24 batch as the batch
11683+
// 20-24 would have been marked to be throttled but eventually couldn't be acquired because of state transition.
11684+
fetchAcquiredRecords(sharePartition.acquire(
11685+
MEMBER_ID,
11686+
ShareAcquireMode.BATCH_OPTIMIZED,
11687+
BATCH_SIZE,
11688+
10,
11689+
15,
11690+
fetchPartitionData(records),
11691+
FETCH_ISOLATION_HWM),
11692+
10);
11693+
11694+
assertEquals(7, sharePartition.cachedState().size());
11695+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
11696+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState());
11697+
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
11698+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(25L).batchState());
11699+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(30L).batchState());
11700+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(35L).state());
11701+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(36L).state());
11702+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(37L).state());
11703+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(35L).offsetState().get(38L).state());
11704+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(40L).batchState());
11705+
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(45L).batchState());
11706+
11707+
// Re-trigger the acquisition and all the remaining records will be acquired, including the offset-based
11708+
// ones. The throttling should not happen because of pending state transition.
11709+
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
11710+
MEMBER_ID,
11711+
ShareAcquireMode.BATCH_OPTIMIZED,
11712+
BATCH_SIZE,
11713+
500,
11714+
15,
11715+
fetchPartitionData(records),
11716+
FETCH_ISOLATION_HWM),
11717+
13);
11718+
11719+
List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(30, 34, 3));
11720+
expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3));
11721+
expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3));
11722+
expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3));
11723+
expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2));
11724+
11725+
assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
11726+
}
11727+
1161311728
/**
1161411729
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
1161511730
*/

0 commit comments

Comments
 (0)