File tree Expand file tree Collapse file tree 1 file changed +11
-2
lines changed
topic/src/main/java/tech/ydb/topic/read/impl Expand file tree Collapse file tree 1 file changed +11
-2
lines changed Original file line number Diff line number Diff line change 33import java .util .List ;
44import java .util .Map ;
55import java .util .concurrent .ConcurrentHashMap ;
6+ import java .util .concurrent .locks .ReentrantLock ;
67
78import org .slf4j .Logger ;
89import org .slf4j .LoggerFactory ;
@@ -24,15 +25,20 @@ public class DeferredCommitterImpl implements DeferredCommitter {
2425 private static class PartitionRanges {
2526 private final PartitionSessionImpl partitionSession ;
2627 private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet ();
28+ private final ReentrantLock rangesLock = new ReentrantLock ();
2729
2830 private PartitionRanges (PartitionSessionImpl partitionSession ) {
2931 this .partitionSession = partitionSession ;
3032 }
3133
3234 private void add (OffsetsRange offsetRange ) {
3335 try {
34- synchronized (ranges ) {
36+ rangesLock .lock ();
37+
38+ try {
3539 ranges .add (offsetRange );
40+ } finally {
41+ rangesLock .unlock ();
3642 }
3743 } catch (RuntimeException exception ) {
3844 String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
@@ -45,8 +51,11 @@ private void add(OffsetsRange offsetRange) {
4551
4652 private void commit () {
4753 List <OffsetsRange > rangesToCommit ;
48- synchronized (ranges ) {
54+ rangesLock .lock ();
55+ try {
4956 rangesToCommit = ranges .getRangesAndClear ();
57+ } finally {
58+ rangesLock .unlock ();
5059 }
5160 partitionSession .commitOffsetRanges (rangesToCommit );
5261 }
You can’t perform that action at this time.
0 commit comments