88import java .util .Queue ;
99import java .util .concurrent .CompletableFuture ;
1010import java .util .concurrent .TimeUnit ;
11+ import java .util .concurrent .locks .Condition ;
12+ import java .util .concurrent .locks .ReentrantLock ;
1113import java .util .function .Consumer ;
1214
1315import javax .annotation .Nullable ;
@@ -34,6 +36,8 @@ public class SyncReaderImpl extends ReaderImpl implements SyncReader {
3436 private static final Logger logger = LoggerFactory .getLogger (SyncReaderImpl .class );
3537 private static final int POLL_INTERVAL_SECONDS = 5 ;
3638 private final Queue <MessageBatchWrapper > batchesInQueue = new LinkedList <>();
39+ private final ReentrantLock queueLock = new ReentrantLock ();
40+ private final Condition queueIsNotEmptyCondition = queueLock .newCondition ();
3741 private int currentMessageIndex = 0 ;
3842
3943 public SyncReaderImpl (TopicRpc topicRpc , ReaderSettings settings ) {
@@ -66,22 +70,21 @@ public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, Ti
6670 if (isStopped .get ()) {
6771 throw new RuntimeException ("Reader was stopped" );
6872 }
69- synchronized (batchesInQueue ) {
73+
74+ queueLock .lock ();
75+
76+ try {
7077 if (batchesInQueue .isEmpty ()) {
7178 long millisToWait = TimeUnit .MILLISECONDS .convert (timeout , unit );
7279 Instant deadline = Instant .now ().plusMillis (millisToWait );
73- while (true ) {
74- if (!batchesInQueue .isEmpty ()) {
75- break ;
76- }
77- Instant now = Instant .now ();
78- if (now .isAfter (deadline )) {
80+ while (batchesInQueue .isEmpty ()) {
81+ millisToWait = Duration .between (Instant .now (), deadline ).toMillis ();
82+ if (millisToWait <= 0 ) {
7983 break ;
8084 }
81- // Using Math.max to prevent rounding duration to 0 which would lead to infinite wait
82- millisToWait = Math .max (1 , Duration .between (now , deadline ).toMillis ());
85+
8386 logger .trace ("No messages in queue. Waiting for {} ms..." , millisToWait );
84- batchesInQueue . wait (millisToWait );
87+ queueIsNotEmptyCondition . await (millisToWait , TimeUnit . MILLISECONDS );
8588 }
8689
8790 if (batchesInQueue .isEmpty ()) {
@@ -112,6 +115,8 @@ public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, Ti
112115 }
113116 }
114117 return result ;
118+ } finally {
119+ queueLock .unlock ();
115120 }
116121 }
117122
@@ -143,10 +148,14 @@ protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent even
143148 return resultFuture ;
144149 }
145150
146- synchronized (batchesInQueue ) {
151+ queueLock .lock ();
152+
153+ try {
147154 logger .debug ("Putting a message batch into queue and notifying in case receive method is waiting" );
148155 batchesInQueue .add (new MessageBatchWrapper (event .getMessages (), resultFuture ));
149- batchesInQueue .notify ();
156+ queueIsNotEmptyCondition .signal ();
157+ } finally {
158+ queueLock .unlock ();
150159 }
151160 return resultFuture ;
152161 }
0 commit comments