Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -2086,7 +2086,7 @@ static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch)
Map<Long, Byte> ackTypeMap = new HashMap<>();
for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
byte ackType = batch.acknowledgeTypes().get(index);
// Validate
// Validate ackType except values 0, since 0 stands for gap.
if (ackType != 0) {
AcknowledgeType.forId(ackType);
}
Expand Down Expand Up @@ -2304,7 +2304,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
// Only valid for ACQUIRED offsets; the check above ensures this.
long key = offsetState.getKey();
InFlightState state = offsetState.getValue();
log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
log.debug("Renewing acquisition lock for {}-{} with offset {} in batch {} for member {}.",
groupId, topicIdPartition, key, inFlightBatch, memberId);
state.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, key, key);
Expand All @@ -2315,7 +2315,6 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
// mapping between bytes and record state type. All ack types have been added except for RENEW which
// has been handled above.
RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
Objects.requireNonNull(recordState);

InFlightState updateResult = offsetState.getValue().startStateTransition(
recordState,
Expand Down Expand Up @@ -2372,7 +2371,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
if (ackType == AcknowledgeType.RENEW.id) {
// Renew the acquisition lock timer for the complete batch. We have already
// checked that the batchState is ACQUIRED above.
log.debug("Renewing acq lock for {}-{} with batch {}-{} for member {}.",
log.debug("Renewing acquisition lock for {}-{} with batch {}-{} for member {}.",
groupId, topicIdPartition, inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), memberId);
inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId,
Expand All @@ -2387,7 +2386,6 @@ private Optional<Throwable> acknowledgeCompleteBatch(
// either released or moved to a state where member id existence is not important. The member id
// is only important when the batch is acquired.
RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
Objects.requireNonNull(recordState);
Comment on lines 2391 to -2390
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RecordState recordState = Objects.requireNonNull(ACK_TYPE_TO_RECORD_STATE.get(ackType));

Copy link
Contributor Author

@adixitconfluent adixitconfluent Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we require Objects.requireNonNull()since this code would be covered when we perform fetchAckTypeMapForBatch and it should be covered there. Hence, I thought to remove it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now ofcourse yes, but the check was placed so one do not forget to update the ACK_TYPE_TO_RECORD_STATE map while adding new values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then maybe lets not have Objects.requireNonNull since it throws a NullPointerException, rather we throw an IllegalRequestException or InvalidRequestException, wdyt?

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything is fine but some exception should highlight the map is missing the required value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


InFlightState updateResult = inFlightBatch.startBatchStateTransition(
recordState,
Expand Down
40 changes: 40 additions & 0 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12069,6 +12069,46 @@ public void testReleaseAcquiredRecordsPerOffsetWithDifferentMemberId() {
assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
}

@Test
public void testInvalidAcknowledgeTypeInBatchAcknowledgement() {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withReplicaManager(replicaManager)
.withState(SharePartitionState.ACTIVE)
.build();

MemoryRecords records1 = memoryRecords(1, 10);
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1, 10);
assertEquals(1, acquiredRecordsList.size());

// Invalid acknowledge type 5.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(new ShareAcknowledgementBatch(1, 10, List.of((byte) 5))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRequestException.class, ackResult);
}

@Test
public void testInvalidAcknowledgeTypeInSubsetAcknowledgement() {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withReplicaManager(replicaManager)
.withState(SharePartitionState.ACTIVE)
.build();

MemoryRecords records1 = memoryRecords(1, 10);
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1, 10);
assertEquals(1, acquiredRecordsList.size());

// Invalid acknowledge type -1.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(new ShareAcknowledgementBatch(2, 3, List.of((byte) -1))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRequestException.class, ackResult);
}

/**
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
*/
Expand Down