79
79
import java .util .Set ;
80
80
import java .util .concurrent .CompletableFuture ;
81
81
import java .util .concurrent .ConcurrentSkipListMap ;
82
- import java .util .concurrent .atomic .AtomicBoolean ;
83
82
import java .util .concurrent .atomic .AtomicReference ;
84
83
import java .util .concurrent .locks .ReadWriteLock ;
85
84
import java .util .concurrent .locks .ReentrantReadWriteLock ;
@@ -237,11 +236,6 @@ private enum DeliveryCountOps {
237
236
*/
238
237
private final ReadWriteLock lock ;
239
238
240
- /**
241
- * The find next fetch offset is used to indicate if the next fetch offset should be recomputed.
242
- */
243
- private final AtomicBoolean findNextFetchOffset ;
244
-
245
239
/**
246
240
* The lock to ensure that the same share partition does not enter a fetch queue
247
241
* while another one is being fetched within the queue. The caller's id that acquires the fetch
@@ -275,6 +269,11 @@ private enum DeliveryCountOps {
275
269
*/
276
270
private final int defaultRecordLockDurationMs ;
277
271
272
+ /**
273
+ * The find next fetch offset is used to indicate if the next fetch offset should be recomputed.
274
+ */
275
+ private boolean findNextFetchOffset ;
276
+
278
277
/**
279
278
* Timer is used to implement acquisition lock on records that guarantees the movement of records from
280
279
* acquired to available/archived state upon timeout
@@ -410,7 +409,7 @@ private enum DeliveryCountOps {
410
409
this .maxDeliveryCount = maxDeliveryCount ;
411
410
this .cachedState = new ConcurrentSkipListMap <>();
412
411
this .lock = new ReentrantReadWriteLock ();
413
- this .findNextFetchOffset = new AtomicBoolean ( false ) ;
412
+ this .findNextFetchOffset = false ;
414
413
this .fetchLock = new AtomicReference <>(null );
415
414
this .defaultRecordLockDurationMs = defaultRecordLockDurationMs ;
416
415
this .timer = timer ;
@@ -536,7 +535,7 @@ public CompletableFuture<Void> maybeInitialize() {
536
535
if (!cachedState .isEmpty ()) {
537
536
// If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records
538
537
// in the cached state are not missed
539
- findNextFetchOffset . set (true );
538
+ updateFindNextFetchOffset (true );
540
539
endOffset = cachedState .lastEntry ().getValue ().lastOffset ();
541
540
// initialReadGapOffset is not required, if there are no gaps in the read state response
542
541
if (gapStartOffset != -1 ) {
@@ -599,7 +598,7 @@ public long nextFetchOffset() {
599
598
lock .writeLock ().lock ();
600
599
try {
601
600
// When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false
602
- if (!findNextFetchOffset . get () ) {
601
+ if (!findNextFetchOffset ) {
603
602
if (cachedState .isEmpty () || startOffset > cachedState .lastEntry ().getValue ().lastOffset ()) {
604
603
// 1. When cachedState is empty, endOffset is set to the next offset of the last
605
604
// offset removed from batch, which is the next offset to be fetched.
@@ -618,7 +617,7 @@ public long nextFetchOffset() {
618
617
// If cachedState is empty, there is no need of re-computing next fetch offset in future fetch requests.
619
618
// Same case when startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO
620
619
// and the cached state is fresh.
621
- findNextFetchOffset . set (false );
620
+ updateFindNextFetchOffset (false );
622
621
log .trace ("The next fetch offset for the share partition {}-{} is {}" , groupId , topicIdPartition , endOffset );
623
622
return endOffset ;
624
623
}
@@ -663,7 +662,7 @@ public long nextFetchOffset() {
663
662
// If nextFetchOffset is -1, then no AVAILABLE records are found in the cachedState, so there is no need of
664
663
// re-computing next fetch offset in future fetch requests
665
664
if (nextFetchOffset == -1 ) {
666
- findNextFetchOffset . set (false );
665
+ updateFindNextFetchOffset (false );
667
666
nextFetchOffset = endOffset + 1 ;
668
667
}
669
668
log .trace ("The next fetch offset for the share partition {}-{} is {}" , groupId , topicIdPartition , nextFetchOffset );
@@ -1064,7 +1063,7 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
1064
1063
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1065
1064
// This should not change the next fetch offset because the record is not available for acquisition
1066
1065
if (updateResult .state != RecordState .ARCHIVED ) {
1067
- findNextFetchOffset . set (true );
1066
+ updateFindNextFetchOffset (true );
1068
1067
}
1069
1068
}
1070
1069
}
@@ -1109,7 +1108,7 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
1109
1108
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1110
1109
// This should not change the next fetch offset because the record is not available for acquisition
1111
1110
if (updateResult .state != RecordState .ARCHIVED ) {
1112
- findNextFetchOffset . set (true );
1111
+ updateFindNextFetchOffset (true );
1113
1112
}
1114
1113
}
1115
1114
return Optional .empty ();
@@ -1144,7 +1143,7 @@ void updateCacheAndOffsets(long logStartOffset) {
1144
1143
// If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED,
1145
1144
// then there is a chance that the next fetch offset can change.
1146
1145
if (anyRecordArchived ) {
1147
- findNextFetchOffset . set (true );
1146
+ updateFindNextFetchOffset (true );
1148
1147
}
1149
1148
1150
1149
// The new startOffset will be the log start offset.
@@ -1206,7 +1205,7 @@ private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
1206
1205
// If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED,
1207
1206
// then there is a chance that the next fetch offset can change.
1208
1207
if (anyRecordArchived ) {
1209
- findNextFetchOffset . set (true );
1208
+ updateFindNextFetchOffset (true );
1210
1209
}
1211
1210
} finally {
1212
1211
lock .writeLock ().unlock ();
@@ -1944,7 +1943,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
1944
1943
// This should not change the next fetch offset because the record is not available for acquisition
1945
1944
if (recordState == RecordState .AVAILABLE
1946
1945
&& updateResult .state != RecordState .ARCHIVED ) {
1947
- findNextFetchOffset . set (true );
1946
+ updateFindNextFetchOffset (true );
1948
1947
}
1949
1948
}
1950
1949
} finally {
@@ -2000,7 +1999,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
2000
1999
// This should not change the nextFetchOffset because the record is not available for acquisition
2001
2000
if (recordState == RecordState .AVAILABLE
2002
2001
&& updateResult .state != RecordState .ARCHIVED ) {
2003
- findNextFetchOffset . set (true );
2002
+ updateFindNextFetchOffset (true );
2004
2003
}
2005
2004
} finally {
2006
2005
lock .writeLock ().unlock ();
@@ -2086,11 +2085,11 @@ void rollbackOrProcessStateUpdates(
2086
2085
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
2087
2086
state .cancelAndClearAcquisitionLockTimeoutTask ();
2088
2087
if (state .state == RecordState .AVAILABLE ) {
2089
- findNextFetchOffset . set (true );
2088
+ updateFindNextFetchOffset (true );
2090
2089
}
2091
2090
});
2092
2091
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
2093
- cacheStateUpdated = maybeUpdateCachedStateAndOffsets ();
2092
+ cacheStateUpdated = maybeUpdateCachedStateAndOffsets ();
2094
2093
future .complete (null );
2095
2094
} finally {
2096
2095
lock .writeLock ().unlock ();
@@ -2467,7 +2466,7 @@ private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFli
2467
2466
// Cancel the acquisition lock timeout task for the batch since it is completed now.
2468
2467
updateResult .cancelAndClearAcquisitionLockTimeoutTask ();
2469
2468
if (updateResult .state != RecordState .ARCHIVED ) {
2470
- findNextFetchOffset . set (true );
2469
+ updateFindNextFetchOffset (true );
2471
2470
}
2472
2471
return ;
2473
2472
}
@@ -2514,7 +2513,7 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
2514
2513
// Cancel the acquisition lock timeout task for the offset since it is completed now.
2515
2514
updateResult .cancelAndClearAcquisitionLockTimeoutTask ();
2516
2515
if (updateResult .state != RecordState .ARCHIVED ) {
2517
- findNextFetchOffset . set (true );
2516
+ updateFindNextFetchOffset (true );
2518
2517
}
2519
2518
}
2520
2519
}
@@ -2748,12 +2747,22 @@ NavigableMap<Long, InFlightBatch> cachedState() {
2748
2747
2749
2748
// Visible for testing.
2750
2749
boolean findNextFetchOffset () {
2751
- return findNextFetchOffset .get ();
2750
+ lock .readLock ().lock ();
2751
+ try {
2752
+ return findNextFetchOffset ;
2753
+ } finally {
2754
+ lock .readLock ().unlock ();
2755
+ }
2752
2756
}
2753
2757
2754
- // Visible for testing. Should only be used for testing purposes.
2755
- void findNextFetchOffset (boolean findNextOffset ) {
2756
- findNextFetchOffset .getAndSet (findNextOffset );
2758
+ // Visible for testing.
2759
+ void updateFindNextFetchOffset (boolean value ) {
2760
+ lock .writeLock ().lock ();
2761
+ try {
2762
+ findNextFetchOffset = value ;
2763
+ } finally {
2764
+ lock .writeLock ().unlock ();
2765
+ }
2757
2766
}
2758
2767
2759
2768
// Visible for testing
0 commit comments