@@ -884,8 +884,7 @@ public CompletableFuture<Void> acknowledge(
884
884
885
885
CompletableFuture <Void > future = new CompletableFuture <>();
886
886
Throwable throwable = null ;
887
- List <InFlightState > updatedStates = new ArrayList <>();
888
- List <PersisterStateBatch > stateBatches = new ArrayList <>();
887
+ List <PersisterBatch > persisterBatches = new ArrayList <>();
889
888
lock .writeLock ().lock ();
890
889
try {
891
890
// Avoided using enhanced for loop as need to check if the last batch have offsets
@@ -925,8 +924,7 @@ public CompletableFuture<Void> acknowledge(
925
924
batch ,
926
925
recordStateMap ,
927
926
subMap ,
928
- updatedStates ,
929
- stateBatches
927
+ persisterBatches
930
928
);
931
929
932
930
if (ackThrowable .isPresent ()) {
@@ -939,7 +937,7 @@ public CompletableFuture<Void> acknowledge(
939
937
}
940
938
// If the acknowledgement is successful then persist state, complete the state transition
941
939
// and update the cached state for start offset. Else rollback the state transition.
942
- rollbackOrProcessStateUpdates (future , throwable , updatedStates , stateBatches );
940
+ rollbackOrProcessStateUpdates (future , throwable , persisterBatches );
943
941
return future ;
944
942
}
945
943
@@ -955,8 +953,7 @@ public CompletableFuture<Void> releaseAcquiredRecords(String memberId) {
955
953
956
954
CompletableFuture <Void > future = new CompletableFuture <>();
957
955
Throwable throwable = null ;
958
- List <InFlightState > updatedStates = new ArrayList <>();
959
- List <PersisterStateBatch > stateBatches = new ArrayList <>();
956
+ List <PersisterBatch > persisterBatches = new ArrayList <>();
960
957
961
958
lock .writeLock ().lock ();
962
959
try {
@@ -975,14 +972,14 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las
975
972
}
976
973
977
974
if (inFlightBatch .offsetState () != null ) {
978
- Optional <Throwable > releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch (memberId , inFlightBatch , recordState , updatedStates , stateBatches );
975
+ Optional <Throwable > releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch (memberId , inFlightBatch , recordState , persisterBatches );
979
976
if (releaseAcquiredRecordsThrowable .isPresent ()) {
980
977
throwable = releaseAcquiredRecordsThrowable .get ();
981
978
break ;
982
979
}
983
980
continue ;
984
981
}
985
- Optional <Throwable > releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForCompleteBatch (memberId , inFlightBatch , recordState , updatedStates , stateBatches );
982
+ Optional <Throwable > releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForCompleteBatch (memberId , inFlightBatch , recordState , persisterBatches );
986
983
if (releaseAcquiredRecordsThrowable .isPresent ()) {
987
984
throwable = releaseAcquiredRecordsThrowable .get ();
988
985
break ;
@@ -993,7 +990,7 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las
993
990
}
994
991
// If the release acquired records is successful then persist state, complete the state transition
995
992
// and update the cached state for start offset. Else rollback the state transition.
996
- rollbackOrProcessStateUpdates (future , throwable , updatedStates , stateBatches );
993
+ rollbackOrProcessStateUpdates (future , throwable , persisterBatches );
997
994
return future ;
998
995
}
999
996
@@ -1004,8 +1001,7 @@ long loadStartTimeMs() {
1004
1001
private Optional <Throwable > releaseAcquiredRecordsForPerOffsetBatch (String memberId ,
1005
1002
InFlightBatch inFlightBatch ,
1006
1003
RecordState recordState ,
1007
- List <InFlightState > updatedStates ,
1008
- List <PersisterStateBatch > stateBatches ) {
1004
+ List <PersisterBatch > persisterBatches ) {
1009
1005
1010
1006
log .trace ("Offset tracked batch record found, batch: {} for the share partition: {}-{}" , inFlightBatch ,
1011
1007
groupId , topicIdPartition );
@@ -1032,10 +1028,9 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
1032
1028
return Optional .of (new InvalidRecordStateException ("Unable to release acquired records for the offset" ));
1033
1029
}
1034
1030
1035
- // Successfully updated the state of the offset.
1036
- updatedStates .add (updateResult );
1037
- stateBatches .add (new PersisterStateBatch (offsetState .getKey (), offsetState .getKey (),
1038
- updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1031
+ // Successfully updated the state of the offset and created a persister state batch for write to persister.
1032
+ persisterBatches .add (new PersisterBatch (updateResult , new PersisterStateBatch (offsetState .getKey (),
1033
+ offsetState .getKey (), updateResult .state ().id (), (short ) updateResult .deliveryCount ())));
1039
1034
// Do not update the next fetch offset as the offset has not completed the transition yet.
1040
1035
}
1041
1036
}
@@ -1045,8 +1040,7 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
1045
1040
private Optional <Throwable > releaseAcquiredRecordsForCompleteBatch (String memberId ,
1046
1041
InFlightBatch inFlightBatch ,
1047
1042
RecordState recordState ,
1048
- List <InFlightState > updatedStates ,
1049
- List <PersisterStateBatch > stateBatches ) {
1043
+ List <PersisterBatch > persisterBatches ) {
1050
1044
1051
1045
// Check if member id is the owner of the batch.
1052
1046
if (!inFlightBatch .batchMemberId ().equals (memberId ) && !inFlightBatch .batchMemberId ().equals (EMPTY_MEMBER_ID )) {
@@ -1072,10 +1066,9 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
1072
1066
return Optional .of (new InvalidRecordStateException ("Unable to release acquired records for the batch" ));
1073
1067
}
1074
1068
1075
- // Successfully updated the state of the batch.
1076
- updatedStates .add (updateResult );
1077
- stateBatches .add (new PersisterStateBatch (inFlightBatch .firstOffset (), inFlightBatch .lastOffset (),
1078
- updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1069
+ // Successfully updated the state of the batch and created a persister state batch for write to persister.
1070
+ persisterBatches .add (new PersisterBatch (updateResult , new PersisterStateBatch (inFlightBatch .firstOffset (),
1071
+ inFlightBatch .lastOffset (), updateResult .state ().id (), (short ) updateResult .deliveryCount ())));
1079
1072
// Do not update the next fetch offset as the batch has not completed the transition yet.
1080
1073
}
1081
1074
return Optional .empty ();
@@ -1826,8 +1819,7 @@ private Optional<Throwable> acknowledgeBatchRecords(
1826
1819
ShareAcknowledgementBatch batch ,
1827
1820
Map <Long , RecordState > recordStateMap ,
1828
1821
NavigableMap <Long , InFlightBatch > subMap ,
1829
- final List <InFlightState > updatedStates ,
1830
- List <PersisterStateBatch > stateBatches
1822
+ List <PersisterBatch > persisterBatches
1831
1823
) {
1832
1824
Optional <Throwable > throwable ;
1833
1825
lock .writeLock ().lock ();
@@ -1889,11 +1881,11 @@ private Optional<Throwable> acknowledgeBatchRecords(
1889
1881
}
1890
1882
1891
1883
throwable = acknowledgePerOffsetBatchRecords (memberId , batch , inFlightBatch ,
1892
- recordStateMap , updatedStates , stateBatches );
1884
+ recordStateMap , persisterBatches );
1893
1885
} else {
1894
1886
// The in-flight batch is a full match hence change the state of the complete batch.
1895
1887
throwable = acknowledgeCompleteBatch (batch , inFlightBatch ,
1896
- recordStateMap .get (batch .firstOffset ()), updatedStates , stateBatches );
1888
+ recordStateMap .get (batch .firstOffset ()), persisterBatches );
1897
1889
}
1898
1890
1899
1891
if (throwable .isPresent ()) {
@@ -1930,8 +1922,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
1930
1922
ShareAcknowledgementBatch batch ,
1931
1923
InFlightBatch inFlightBatch ,
1932
1924
Map <Long , RecordState > recordStateMap ,
1933
- List <InFlightState > updatedStates ,
1934
- List <PersisterStateBatch > stateBatches
1925
+ List <PersisterBatch > persisterBatches
1935
1926
) {
1936
1927
lock .writeLock ().lock ();
1937
1928
try {
@@ -1995,10 +1986,9 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
1995
1986
return Optional .of (new InvalidRecordStateException (
1996
1987
"Unable to acknowledge records for the batch" ));
1997
1988
}
1998
- // Successfully updated the state of the offset.
1999
- updatedStates .add (updateResult );
2000
- stateBatches .add (new PersisterStateBatch (offsetState .getKey (), offsetState .getKey (),
2001
- updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1989
+ // Successfully updated the state of the offset and created a persister state batch for write to persister.
1990
+ persisterBatches .add (new PersisterBatch (updateResult , new PersisterStateBatch (offsetState .getKey (),
1991
+ offsetState .getKey (), updateResult .state ().id (), (short ) updateResult .deliveryCount ())));
2002
1992
// Do not update the nextFetchOffset as the offset has not completed the transition yet.
2003
1993
}
2004
1994
} finally {
@@ -2011,8 +2001,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
2011
2001
ShareAcknowledgementBatch batch ,
2012
2002
InFlightBatch inFlightBatch ,
2013
2003
RecordState recordState ,
2014
- List <InFlightState > updatedStates ,
2015
- List <PersisterStateBatch > stateBatches
2004
+ List <PersisterBatch > persisterBatches
2016
2005
) {
2017
2006
lock .writeLock ().lock ();
2018
2007
try {
@@ -2044,11 +2033,9 @@ private Optional<Throwable> acknowledgeCompleteBatch(
2044
2033
new InvalidRecordStateException ("Unable to acknowledge records for the batch" ));
2045
2034
}
2046
2035
2047
- // Successfully updated the state of the batch.
2048
- updatedStates .add (updateResult );
2049
- stateBatches .add (
2050
- new PersisterStateBatch (inFlightBatch .firstOffset (), inFlightBatch .lastOffset (),
2051
- updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
2036
+ // Successfully updated the state of the batch and created a persister state batch for write to persister.
2037
+ persisterBatches .add (new PersisterBatch (updateResult , new PersisterStateBatch (inFlightBatch .firstOffset (),
2038
+ inFlightBatch .lastOffset (), updateResult .state ().id (), (short ) updateResult .deliveryCount ())));
2052
2039
// Do not update the next fetch offset as the batch has not completed the transition yet.
2053
2040
} finally {
2054
2041
lock .writeLock ().unlock ();
@@ -2090,74 +2077,74 @@ SharePartitionState partitionState() {
2090
2077
void rollbackOrProcessStateUpdates (
2091
2078
CompletableFuture <Void > future ,
2092
2079
Throwable throwable ,
2093
- List <InFlightState > updatedStates ,
2094
- List <PersisterStateBatch > stateBatches
2080
+ List <PersisterBatch > persisterBatches
2095
2081
) {
2096
2082
lock .writeLock ().lock ();
2097
2083
try {
2098
2084
if (throwable != null ) {
2099
2085
// Log in DEBUG to avoid flooding of logs for a faulty client.
2100
2086
log .debug ("Request failed for updating state, rollback any changed state"
2101
2087
+ " for the share partition: {}-{}" , groupId , topicIdPartition );
2102
- updatedStates .forEach (state -> {
2103
- state .completeStateTransition (false );
2104
- if (state .state () == RecordState .AVAILABLE ) {
2088
+ persisterBatches .forEach (persisterBatch -> {
2089
+ persisterBatch . updatedState .completeStateTransition (false );
2090
+ if (persisterBatch . updatedState .state () == RecordState .AVAILABLE ) {
2105
2091
updateFindNextFetchOffset (true );
2106
2092
}
2107
2093
});
2108
2094
future .completeExceptionally (throwable );
2109
2095
return ;
2110
2096
}
2111
2097
2112
- if (stateBatches . isEmpty () && updatedStates .isEmpty ()) {
2098
+ if (persisterBatches .isEmpty ()) {
2113
2099
future .complete (null );
2114
2100
return ;
2115
2101
}
2116
2102
} finally {
2117
2103
lock .writeLock ().unlock ();
2118
2104
}
2119
2105
2120
- writeShareGroupState (stateBatches ).whenComplete ((result , exception ) -> {
2121
- // There can be a pending delayed share fetch requests for the share partition which are waiting
2122
- // on the startOffset to move ahead, hence track if the state is updated in the cache. If
2123
- // yes, then notify the delayed share fetch purgatory to complete the pending requests.
2124
- boolean cacheStateUpdated = false ;
2125
- lock .writeLock ().lock ();
2126
- try {
2127
- if (exception != null ) {
2128
- log .debug ("Failed to write state to persister for the share partition: {}-{}" ,
2129
- groupId , topicIdPartition , exception );
2130
- // In case of failure when transition state is rolled back then it should be rolled
2131
- // back to ACQUIRED state, unless acquisition lock for the state has expired.
2132
- updatedStates .forEach (state -> {
2133
- state .completeStateTransition (false );
2134
- if (state .state () == RecordState .AVAILABLE ) {
2106
+ writeShareGroupState (persisterBatches .stream ().map (PersisterBatch ::stateBatch ).toList ())
2107
+ .whenComplete ((result , exception ) -> {
2108
+ // There can be a pending delayed share fetch requests for the share partition which are waiting
2109
+ // on the startOffset to move ahead, hence track if the state is updated in the cache. If
2110
+ // yes, then notify the delayed share fetch purgatory to complete the pending requests.
2111
+ boolean cacheStateUpdated = false ;
2112
+ lock .writeLock ().lock ();
2113
+ try {
2114
+ if (exception != null ) {
2115
+ log .debug ("Failed to write state to persister for the share partition: {}-{}" ,
2116
+ groupId , topicIdPartition , exception );
2117
+ // In case of failure when transition state is rolled back then it should be rolled
2118
+ // back to ACQUIRED state, unless acquisition lock for the state has expired.
2119
+ persisterBatches .forEach (persisterBatch -> {
2120
+ persisterBatch .updatedState .completeStateTransition (false );
2121
+ if (persisterBatch .updatedState .state () == RecordState .AVAILABLE ) {
2122
+ updateFindNextFetchOffset (true );
2123
+ }
2124
+ });
2125
+ future .completeExceptionally (exception );
2126
+ return ;
2127
+ }
2128
+
2129
+ log .trace ("State change request successful for share partition: {}-{}" ,
2130
+ groupId , topicIdPartition );
2131
+ persisterBatches .forEach (persisterBatch -> {
2132
+ persisterBatch .updatedState .completeStateTransition (true );
2133
+ if (persisterBatch .updatedState .state () == RecordState .AVAILABLE ) {
2135
2134
updateFindNextFetchOffset (true );
2136
2135
}
2137
2136
});
2138
- future .completeExceptionally (exception );
2139
- return ;
2137
+ // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
2138
+ cacheStateUpdated = maybeUpdateCachedStateAndOffsets ();
2139
+ future .complete (null );
2140
+ } finally {
2141
+ lock .writeLock ().unlock ();
2142
+ // Maybe complete the delayed share fetch request if the state has been changed in cache
2143
+ // which might have moved start offset ahead. Hence, the pending delayed share fetch
2144
+ // request can be completed. The call should be made outside the lock to avoid deadlock.
2145
+ maybeCompleteDelayedShareFetchRequest (cacheStateUpdated );
2140
2146
}
2141
-
2142
- log .trace ("State change request successful for share partition: {}-{}" ,
2143
- groupId , topicIdPartition );
2144
- updatedStates .forEach (state -> {
2145
- state .completeStateTransition (true );
2146
- if (state .state () == RecordState .AVAILABLE ) {
2147
- updateFindNextFetchOffset (true );
2148
- }
2149
- });
2150
- // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
2151
- cacheStateUpdated = maybeUpdateCachedStateAndOffsets ();
2152
- future .complete (null );
2153
- } finally {
2154
- lock .writeLock ().unlock ();
2155
- // Maybe complete the delayed share fetch request if the state has been changed in cache
2156
- // which might have moved start offset ahead. Hence, the pending delayed share fetch
2157
- // request can be completed. The call should be made outside the lock to avoid deadlock.
2158
- maybeCompleteDelayedShareFetchRequest (cacheStateUpdated );
2159
- }
2160
- });
2147
+ });
2161
2148
}
2162
2149
2163
2150
private boolean maybeUpdateCachedStateAndOffsets () {
@@ -2929,6 +2916,15 @@ void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) {
2929
2916
}
2930
2917
}
2931
2918
2919
+ /**
2920
+ * PersisterBatch class is used to record the state updates for a batch or an offset.
2921
+ * It contains the updated in-flight state and the persister state batch to be sent to persister.
2922
+ */
2923
+ private record PersisterBatch (
2924
+ InFlightState updatedState ,
2925
+ PersisterStateBatch stateBatch
2926
+ ) { }
2927
+
2932
2928
/**
2933
2929
* LastOffsetAndMaxRecords class is used to track the last offset to acquire and the maximum number
2934
2930
* of records that can be acquired in a fetch request.
0 commit comments