@@ -803,7 +803,7 @@ public ShareAcquiredRecords acquire(
803803 }
804804
805805 InFlightState updateResult = inFlightBatch .tryUpdateBatchState (RecordState .ACQUIRED , DeliveryCountOps .INCREASE , maxDeliveryCount , memberId );
806- if (updateResult == null ) {
806+ if (updateResult == null || updateResult . state () != RecordState . ACQUIRED ) {
807807 log .info ("Unable to acquire records for the batch: {} in share partition: {}-{}" ,
808808 inFlightBatch , groupId , topicIdPartition );
809809 continue ;
@@ -1009,12 +1009,7 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
10091009 updatedStates .add (updateResult );
10101010 stateBatches .add (new PersisterStateBatch (offsetState .getKey (), offsetState .getKey (),
10111011 updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1012-
1013- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1014- // This should not change the next fetch offset because the record is not available for acquisition
1015- if (updateResult .state () != RecordState .ARCHIVED ) {
1016- updateFindNextFetchOffset (true );
1017- }
1012+ // Do not update the next fetch offset as the offset has not completed the transition yet.
10181013 }
10191014 }
10201015 return Optional .empty ();
@@ -1054,12 +1049,7 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
10541049 updatedStates .add (updateResult );
10551050 stateBatches .add (new PersisterStateBatch (inFlightBatch .firstOffset (), inFlightBatch .lastOffset (),
10561051 updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1057-
1058- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1059- // This should not change the next fetch offset because the record is not available for acquisition
1060- if (updateResult .state () != RecordState .ARCHIVED ) {
1061- updateFindNextFetchOffset (true );
1062- }
1052+ // Do not update the next fetch offset as the batch has not completed the transition yet.
10631053 }
10641054 return Optional .empty ();
10651055 }
@@ -1641,7 +1631,7 @@ private int acquireSubsetBatchRecords(
16411631
16421632 InFlightState updateResult = offsetState .getValue ().tryUpdateState (RecordState .ACQUIRED , DeliveryCountOps .INCREASE ,
16431633 maxDeliveryCount , memberId );
1644- if (updateResult == null ) {
1634+ if (updateResult == null || updateResult . state () != RecordState . ACQUIRED ) {
16451635 log .trace ("Unable to acquire records for the offset: {} in batch: {}"
16461636 + " for the share partition: {}-{}" , offsetState .getKey (), inFlightBatch ,
16471637 groupId , topicIdPartition );
@@ -1941,12 +1931,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
19411931 updatedStates .add (updateResult );
19421932 stateBatches .add (new PersisterStateBatch (offsetState .getKey (), offsetState .getKey (),
19431933 updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1944- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1945- // This should not change the next fetch offset because the record is not available for acquisition
1946- if (recordState == RecordState .AVAILABLE
1947- && updateResult .state () != RecordState .ARCHIVED ) {
1948- updateFindNextFetchOffset (true );
1949- }
1934+ // Do not update the nextFetchOffset as the offset has not completed the transition yet.
19501935 }
19511936 } finally {
19521937 lock .writeLock ().unlock ();
@@ -1996,13 +1981,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
19961981 stateBatches .add (
19971982 new PersisterStateBatch (inFlightBatch .firstOffset (), inFlightBatch .lastOffset (),
19981983 updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1999-
2000- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
2001- // This should not change the nextFetchOffset because the record is not available for acquisition
2002- if (recordState == RecordState .AVAILABLE
2003- && updateResult .state () != RecordState .ARCHIVED ) {
2004- updateFindNextFetchOffset (true );
2005- }
1984+ // Do not update the next fetch offset as the batch has not completed the transition yet.
20061985 } finally {
20071986 lock .writeLock ().unlock ();
20081987 }
0 commit comments