Skip to content

Commit 0c8b504

Browse files
author
Igor Melnichenko
committed
synchronized blocks were replaced with ReentrantLock in WriterImpl
1 parent 83bb141 commit 0c8b504

File tree

1 file changed

+23
-5
lines changed

1 file changed

+23
-5
lines changed

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.concurrent.atomic.AtomicBoolean;
1313
import java.util.concurrent.atomic.AtomicLong;
1414
import java.util.concurrent.atomic.AtomicReference;
15+
import java.util.concurrent.locks.ReentrantLock;
1516

1617
import org.slf4j.Logger;
1718
import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
4445
private final AtomicReference<CompletableFuture<InitResult>> initResultFutureRef = new AtomicReference<>(null);
4546
// Messages that are waiting for being put into sending queue due to queue overflow
4647
private final Queue<IncomingMessage> incomingQueue = new LinkedList<>();
48+
private final ReentrantLock incomingQueueLock = new ReentrantLock();
4749
// Messages that are currently encoding
4850
private final Queue<EnqueuedMessage> encodingMessages = new LinkedList<>();
4951
// Messages that are taken into send buffer, are already compressed and are waiting for being sent
@@ -99,7 +101,9 @@ private IncomingMessage(EnqueuedMessage message) {
99101
}
100102

101103
public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean instant) {
102-
synchronized (incomingQueue) {
104+
incomingQueueLock.lock();
105+
106+
try {
103107
if (currentInFlightCount >= settings.getMaxSendBufferMessagesCount()) {
104108
if (instant) {
105109
logger.info("[{}] Rejecting a message due to reaching message queue in-flight limit of {}", id,
@@ -137,10 +141,12 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
137141
IncomingMessage incomingMessage = new IncomingMessage(message);
138142
incomingQueue.add(incomingMessage);
139143
return incomingMessage.future;
144+
} finally {
145+
incomingQueueLock.unlock();
140146
}
141147
}
142148

143-
// should be done under synchronized incomingQueue
149+
// should be done under incomingQueueLock
144150
private void acceptMessageIntoSendingQueue(EnqueuedMessage message) {
145151
this.lastAcceptedMessageFuture = message.getFuture();
146152
this.currentInFlightCount++;
@@ -187,7 +193,9 @@ private void moveEncodedMessagesToSendingQueue() {
187193
boolean haveNewMessagesToSend = false;
188194
// Working with encodingMessages under synchronized incomingQueue to prevent deadlocks
189195
// while working with free method
190-
synchronized (incomingQueue) {
196+
incomingQueueLock.lock();
197+
198+
try {
191199
// Taking all encoded messages to sending queue
192200
while (true) {
193201
EnqueuedMessage encodedMessage = encodingMessages.peek();
@@ -217,6 +225,8 @@ private void moveEncodedMessagesToSendingQueue() {
217225
break;
218226
}
219227
}
228+
} finally {
229+
incomingQueueLock.unlock();
220230
}
221231
if (haveNewMessagesToSend) {
222232
session.sendDataRequestIfNeeded();
@@ -264,15 +274,21 @@ protected CompletableFuture<Void> flushImpl() {
264274
if (this.lastAcceptedMessageFuture == null) {
265275
return CompletableFuture.completedFuture(null);
266276
}
267-
synchronized (incomingQueue) {
277+
incomingQueueLock.lock();
278+
279+
try {
268280
return this.lastAcceptedMessageFuture.isDone()
269281
? CompletableFuture.completedFuture(null)
270282
: this.lastAcceptedMessageFuture.thenApply(v -> null);
283+
} finally {
284+
incomingQueueLock.unlock();
271285
}
272286
}
273287

274288
private void free(int messageCount, long sizeBytes) {
275-
synchronized (incomingQueue) {
289+
incomingQueueLock.lock();
290+
291+
try {
276292
currentInFlightCount -= messageCount;
277293
availableSizeBytes += sizeBytes;
278294
if (logger.isTraceEnabled()) {
@@ -301,6 +317,8 @@ private void free(int messageCount, long sizeBytes) {
301317
}
302318
logger.trace("[{}] All messages from incomingQueue are accepted into send buffer", id);
303319
}
320+
} finally {
321+
incomingQueueLock.unlock();
304322
}
305323
}
306324

0 commit comments

Comments
 (0)