55import java .util .Map ;
66import java .util .concurrent .CompletableFuture ;
77import java .util .concurrent .ConcurrentHashMap ;
8+ import java .util .concurrent .locks .ReentrantLock ;
89import java .util .stream .Collectors ;
910
1011import org .slf4j .Logger ;
2627 * @author Nikolay Perfilov
2728 */
2829public class TransactionMessageAccumulatorImpl implements TransactionMessageAccumulator {
29- private static final Logger logger = LoggerFactory .getLogger (DeferredCommitterImpl .class );
30+ private static final Logger logger = LoggerFactory .getLogger (TransactionMessageAccumulatorImpl .class );
3031
3132 private final AsyncReader reader ;
3233 private final Map <String , Map <PartitionSession , PartitionRanges >> rangesByTopic = new ConcurrentHashMap <>();
3334
3435 private static class PartitionRanges {
3536 private final PartitionSession partitionSession ;
3637 private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet ();
38+ private final ReentrantLock rangesLock = new ReentrantLock ();
3739
3840 private PartitionRanges (PartitionSession partitionSession ) {
3941 this .partitionSession = partitionSession ;
4042 }
4143
4244 private void add (OffsetsRange offsetRange ) {
4345 try {
44- synchronized (ranges ) {
46+ rangesLock .lock ();
47+
48+ try {
4549 ranges .add (offsetRange );
50+ } finally {
51+ rangesLock .unlock ();
4652 }
4753 } catch (RuntimeException exception ) {
4854 String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
@@ -54,8 +60,12 @@ private void add(OffsetsRange offsetRange) {
5460 }
5561
5662 private List <OffsetsRange > getOffsetsRanges () {
57- synchronized (ranges ) {
63+ rangesLock .lock ();
64+
65+ try {
5866 return ranges .getRangesAndClear ();
67+ } finally {
68+ rangesLock .unlock ();
5969 }
6070 }
6171 }
0 commit comments