Skip to content

Commit dc96e29

Browse files
KAFKA-19476: Correcting max delivery on write state failure and lock timeout (#20310)
Fix the max delivery check when an acquisition lock timeout coincides with a write state RPC failure. When the acquisition lock has already timed out and the write state RPC then fails, we need to check whether the records should be archived. However, this fix does not persist the resulting state, even though that state is relevant because some records may have been archived or had their delivery count bumped. The information will be persisted eventually: the persister call has already failed, so issuing another persister call in response to that failure is not correct; instead, the data is persisted by future persister calls. Reviewers: Manikumar Reddy <[email protected]>, Abhinav Dixit <[email protected]>
1 parent cfe483b commit dc96e29

File tree

2 files changed

+120
-6
lines changed

2 files changed

+120
-6
lines changed

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7878,6 +7878,105 @@ public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws Interrup
78787878
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
78797879
}
78807880

7881+
@Test
7882+
public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedException {
7883+
Persister persister = Mockito.mock(Persister.class);
7884+
SharePartition sharePartition = SharePartitionBuilder.builder()
7885+
.withState(SharePartitionState.ACTIVE)
7886+
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
7887+
.withMaxDeliveryCount(2)
7888+
.withPersister(persister)
7889+
.build();
7890+
7891+
fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
7892+
fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
7893+
7894+
// Futures which will be completed later, so the batch state has ongoing transition.
7895+
CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>();
7896+
CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>();
7897+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
7898+
7899+
// Acknowledge batches.
7900+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
7901+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
7902+
7903+
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(2L).offsetState().get(3L).state());
7904+
assertEquals(1, sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
7905+
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(7L).batchState());
7906+
assertEquals(1, sharePartition.cachedState().get(7L).batchDeliveryCount());
7907+
7908+
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
7909+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
7910+
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
7911+
PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
7912+
7913+
future1.complete(writeShareGroupStateResult);
7914+
assertEquals(12, sharePartition.nextFetchOffset());
7915+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(2L).offsetState().get(3L).state());
7916+
assertEquals(1, sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
7917+
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(7L).batchState());
7918+
assertEquals(1, sharePartition.cachedState().get(7L).batchDeliveryCount());
7919+
7920+
future2.complete(writeShareGroupStateResult);
7921+
assertEquals(12L, sharePartition.nextFetchOffset());
7922+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(2L).offsetState().get(3L).state());
7923+
assertEquals(1, sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
7924+
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState());
7925+
assertEquals(1, sharePartition.cachedState().get(7L).batchDeliveryCount());
7926+
7927+
// Allowing acquisition lock to expire. This will also ensure that acquisition lock timeout task
7928+
// is run successfully post write state RPC failure.
7929+
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
7930+
TestUtils.waitForCondition(
7931+
() -> sharePartition.cachedState().get(2L).offsetState().get(3L).state() == RecordState.AVAILABLE &&
7932+
sharePartition.cachedState().get(7L).batchState() == RecordState.AVAILABLE &&
7933+
sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount() == 1 &&
7934+
sharePartition.cachedState().get(7L).batchDeliveryCount() == 1 &&
7935+
sharePartition.timer().size() == 0,
7936+
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
7937+
() -> assertionFailedMessage(sharePartition, Map.of(2L, List.of(3L), 7L, List.of())));
7938+
// Acquisition lock timeout task has run already and next fetch offset is moved to 2.
7939+
assertEquals(2, sharePartition.nextFetchOffset());
7940+
// Send the same batches again.
7941+
fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
7942+
fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
7943+
7944+
future1 = new CompletableFuture<>();
7945+
future2 = new CompletableFuture<>();
7946+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
7947+
7948+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(3, 3, List.of(AcknowledgeType.ACCEPT.id))));
7949+
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.ACCEPT.id))));
7950+
7951+
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
7952+
// Verify the timer tasks have run and the state is archived for the offsets which are not acknowledged,
7953+
// but the acquisition lock timeout task should be just expired for acknowledged offsets, though
7954+
// the state should not be archived.
7955+
TestUtils.waitForCondition(
7956+
() -> sharePartition.cachedState().get(2L).offsetState().get(2L).state() == RecordState.ARCHIVED &&
7957+
sharePartition.cachedState().get(2L).offsetState().get(3L).state() == RecordState.ACKNOWLEDGED &&
7958+
sharePartition.cachedState().get(2L).offsetState().get(3L).acquisitionLockTimeoutTask().hasExpired() &&
7959+
sharePartition.cachedState().get(7L).batchState() == RecordState.ACKNOWLEDGED &&
7960+
sharePartition.cachedState().get(7L).batchAcquisitionLockTimeoutTask().hasExpired(),
7961+
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
7962+
() -> assertionFailedMessage(sharePartition, Map.of(2L, List.of(3L), 7L, List.of())));
7963+
7964+
future1.complete(writeShareGroupStateResult);
7965+
// Now the state should be archived for the offsets despite the write state RPC failure, as the
7966+
// delivery count has reached the max delivery count and the acquisition lock timeout task
7967+
// has already expired for the offsets which were acknowledged.
7968+
assertEquals(12, sharePartition.nextFetchOffset());
7969+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).offsetState().get(3L).state());
7970+
assertEquals(2, sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount());
7971+
assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(7L).batchState());
7972+
assertEquals(2, sharePartition.cachedState().get(7L).batchDeliveryCount());
7973+
7974+
future2.complete(writeShareGroupStateResult);
7975+
assertEquals(12L, sharePartition.nextFetchOffset());
7976+
assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).batchState());
7977+
assertEquals(2, sharePartition.cachedState().get(7L).batchDeliveryCount());
7978+
}
7979+
78817980
/**
78827981
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
78837982
*/

server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class InFlightState {
4848
private String memberId;
4949
// The state of the records before the transition. In case we need to revert an in-flight state, we revert the above
5050
// attributes of InFlightState to this state, namely - state, deliveryCount and memberId.
51-
private InFlightState rollbackState;
51+
private RollbackState rollbackState;
5252
// The timer task for the acquisition lock timeout.
5353
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
5454
// The boolean determines if the record has achieved a terminal state of ARCHIVED from which it cannot transition
@@ -205,7 +205,7 @@ public InFlightState startStateTransition(RecordState newState, DeliveryCountOps
205205
InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
206206
InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
207207
if (updatedState != null) {
208-
rollbackState = currentState;
208+
rollbackState = new RollbackState(currentState, maxDeliveryCount);
209209
}
210210
return updatedState;
211211
}
@@ -224,16 +224,23 @@ public void completeStateTransition(boolean commit) {
224224
rollbackState = null;
225225
return;
226226
}
227+
InFlightState previousState = rollbackState.state();
227228
// Check if the acquisition lock timeout task has expired; if so, mark the message as Available.
228229
if (acquisitionLockTimeoutTask != null && acquisitionLockTimeoutTask.hasExpired()) {
229-
state = RecordState.AVAILABLE;
230+
// If the acquisition lock timeout task has expired, we should mark the record as available.
231+
// However, if the delivery count has reached the maximum delivery count, we should archive the record.
232+
state = previousState.deliveryCount() >= rollbackState.maxDeliveryCount ?
233+
RecordState.ARCHIVED : RecordState.AVAILABLE;
230234
memberId = EMPTY_MEMBER_ID;
231235
cancelAndClearAcquisitionLockTimeoutTask();
232236
} else {
233-
state = rollbackState.state;
234-
memberId = rollbackState.memberId;
237+
state = previousState.state();
238+
memberId = previousState.memberId();
235239
}
236-
deliveryCount = rollbackState.deliveryCount();
240+
// Do not revert the delivery count as the delivery count should not be reverted for the failed
241+
// state transition. However, in the current implementation, the delivery count is only incremented
242+
// when the state is updated to Acquired, hence reverting the delivery count is not needed when
243+
// the state transition fails.
237244
rollbackState = null;
238245
}
239246

@@ -271,4 +278,12 @@ public String toString() {
271278
", memberId=" + memberId +
272279
")";
273280
}
281+
282+
/**
283+
* This record is used to store the state before the transition. It is used to revert the state if the transition fails.
284+
* @param state The state of the records before the transition.
285+
* @param maxDeliveryCount The maximum delivery count for the record.
286+
*/
287+
private record RollbackState(InFlightState state, int maxDeliveryCount) {
288+
}
274289
}

0 commit comments

Comments
 (0)