Skip to content

Commit fe3af55

Browse files
authored
Merge pull request #341 from Myllyenko/replacing_synchronized_with_locks
Replacing synchronized blocks with ReentrantLocks in core part of SDK
2 parents 9a7adfa + 863193b commit fe3af55

File tree

2 files changed

+31
-17
lines changed

2 files changed

+31
-17
lines changed

core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import java.util.concurrent.Future;
88
import java.util.concurrent.ScheduledExecutorService;
99
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.locks.Condition;
11+
import java.util.concurrent.locks.ReentrantLock;
1012
import java.util.stream.Collectors;
1113

1214
import org.slf4j.Logger;
@@ -52,7 +54,8 @@ public interface Handler {
5254
private final ScheduledExecutorService scheduler;
5355
private final String discoveryDatabase;
5456
private final Duration discoveryTimeout;
55-
private final Object readyObj = new Object();
57+
private final ReentrantLock readyLock = new ReentrantLock();
58+
private final Condition readyCondition = readyLock.newCondition();
5659

5760
private volatile Instant lastUpdateTime;
5861
private volatile Future<?> currentSchedule = null;
@@ -90,18 +93,20 @@ public void waitReady(long millis) throws IllegalStateException {
9093
return;
9194
}
9295

93-
synchronized (readyObj) {
94-
try {
95-
if (isStarted) {
96-
return;
97-
}
98-
99-
long timeout = millis > 0 ? millis : discoveryTimeout.toMillis();
100-
readyObj.wait(timeout);
101-
} catch (InterruptedException ex) {
102-
Thread.currentThread().interrupt();
103-
lastException = new IllegalStateException("Discovery waiting interrupted", ex);
96+
readyLock.lock();
97+
98+
try {
99+
if (isStarted) {
100+
return;
104101
}
102+
103+
long timeout = millis > 0 ? millis : discoveryTimeout.toMillis();
104+
readyCondition.await(timeout, TimeUnit.MILLISECONDS);
105+
} catch (InterruptedException ex) {
106+
Thread.currentThread().interrupt();
107+
lastException = new IllegalStateException("Discovery waiting interrupted", ex);
108+
} finally {
109+
readyLock.unlock();
105110
}
106111

107112
if (!isStarted) {
@@ -170,19 +175,27 @@ private void runDiscovery() {
170175
}
171176

172177
private void handleThrowable(Throwable th) {
173-
synchronized (readyObj) {
178+
readyLock.lock();
179+
180+
try {
174181
lastException = th;
175182
scheduleNextTick();
176-
readyObj.notifyAll();
183+
readyCondition.signalAll();
184+
} finally {
185+
readyLock.unlock();
177186
}
178187
}
179188

180189
private void handleOk(String selfLocation, List<EndpointRecord> endpoints) {
181-
synchronized (readyObj) {
190+
readyLock.lock();
191+
192+
try {
182193
isStarted = true;
183194
lastException = null;
184195
handler.handleEndpoints(endpoints, selfLocation).whenComplete((res, th) -> scheduleNextTick());
185-
readyObj.notifyAll();
196+
readyCondition.signalAll();
197+
} finally {
198+
readyLock.unlock();
186199
}
187200
}
188201

core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* @param <W> type of message to be sent to the server
2929
*/
3030
public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements GrpcReadWriteStream<R, W> {
31+
// GrpcTransport's logger is used intentionally
3132
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);
3233

3334
private final String traceId;
@@ -159,7 +160,7 @@ public void close() {
159160
@Override
160161
public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
161162
if (logger.isTraceEnabled()) {
162-
logger.trace("ReadWriteStreamCall[{}] closed with status {}", status);
163+
logger.trace("ReadWriteStreamCall[{}] closed with status {}", traceId, status);
163164
}
164165
statusConsumer.accept(status, trailers);
165166

0 commit comments

Comments
 (0)