1212import java .util .concurrent .ConcurrentSkipListMap ;
1313import java .util .concurrent .Executor ;
1414import java .util .concurrent .atomic .AtomicBoolean ;
15+ import java .util .concurrent .locks .ReentrantLock ;
1516import java .util .function .Consumer ;
1617import java .util .function .Function ;
1718import java .util .stream .Collectors ;
@@ -44,11 +45,13 @@ public class PartitionSessionImpl {
4445 private final AtomicBoolean isWorking = new AtomicBoolean (true );
4546
4647 private final Queue <Batch > decodingBatches = new LinkedList <>();
48+ private final ReentrantLock decodingBatchesLock = new ReentrantLock ();
4749 private final Queue <Batch > readingQueue = new ConcurrentLinkedQueue <>();
4850 private final Function <DataReceivedEvent , CompletableFuture <Void >> dataEventCallback ;
4951 private final AtomicBoolean isReadingNow = new AtomicBoolean ();
5052 private final Consumer <List <OffsetsRange >> commitFunction ;
5153 private final NavigableMap <Long , CompletableFuture <Void >> commitFutures = new ConcurrentSkipListMap <>();
54+ private final ReentrantLock commitFuturesLock = new ReentrantLock ();
5255 // Offset of the last read message + 1
5356 private long lastReadOffset ;
5457 private long lastCommittedOffset ;
@@ -149,14 +152,21 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
149152 );
150153 });
151154 batchFutures .add (newBatch .getReadFuture ());
152- synchronized (decodingBatches ) {
155+
156+ decodingBatchesLock .lock ();
157+
158+ try {
153159 decodingBatches .add (newBatch );
160+ } finally {
161+ decodingBatchesLock .unlock ();
154162 }
155163
156164 CompletableFuture .runAsync (() -> decode (newBatch ), decompressionExecutor )
157165 .thenRun (() -> {
158166 boolean haveNewBatchesReady = false ;
159- synchronized (decodingBatches ) {
167+ decodingBatchesLock .lock ();
168+
169+ try {
160170 // Taking all encoded messages to sending queue
161171 while (true ) {
162172 Batch decodingBatch = decodingBatches .peek ();
@@ -176,7 +186,10 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
176186 break ;
177187 }
178188 }
189+ } finally {
190+ decodingBatchesLock .unlock ();
179191 }
192+
180193 if (haveNewBatchesReady ) {
181194 sendDataToReadersIfNeeded ();
182195 }
@@ -185,10 +198,12 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
185198 return CompletableFuture .allOf (batchFutures .toArray (new CompletableFuture <?>[0 ]));
186199 }
187200
188- // Сommit single offset range with result future
201+ // Commit single offset range with result future
189202 public CompletableFuture <Void > commitOffsetRange (OffsetsRange rangeToCommit ) {
190203 CompletableFuture <Void > resultFuture = new CompletableFuture <>();
191- synchronized (commitFutures ) {
204+ commitFuturesLock .lock ();
205+
206+ try {
192207 if (isWorking .get ()) {
193208 if (logger .isDebugEnabled ()) {
194209 logger .debug ("[{}] Offset range [{}, {}) is requested to be committed for partition session {} " +
@@ -205,6 +220,8 @@ public CompletableFuture<Void> commitOffsetRange(OffsetsRange rangeToCommit) {
205220 partitionId + ") for " + path + " is already closed" ));
206221 return resultFuture ;
207222 }
223+ } finally {
224+ commitFuturesLock .unlock ();
208225 }
209226 List <OffsetsRange > rangeWrapper = new ArrayList <>(1 );
210227 rangeWrapper .add (rangeToCommit );
@@ -334,16 +351,25 @@ private void sendDataToReadersIfNeeded() {
334351 }
335352
336353 public void shutdown () {
337- synchronized (commitFutures ) {
354+ commitFuturesLock .lock ();
355+
356+ try {
338357 isWorking .set (false );
339358 logger .info ("[{}] Partition session {} (partition {}) is shutting down. Failing {} commit futures..." , path ,
340359 id , partitionId , commitFutures .size ());
341360 commitFutures .values ().forEach (f -> f .completeExceptionally (new RuntimeException ("Partition session " + id +
342361 " (partition " + partitionId + ") for " + path + " is closed" )));
362+ } finally {
363+ commitFuturesLock .unlock ();
343364 }
344- synchronized (decodingBatches ) {
365+
366+ decodingBatchesLock .lock ();
367+
368+ try {
345369 decodingBatches .forEach (Batch ::complete );
346370 readingQueue .forEach (Batch ::complete );
371+ } finally {
372+ decodingBatchesLock .unlock ();
347373 }
348374 }
349375
0 commit comments