Skip to content

Commit baa8dea

Browse files
author
Igor Melnichenko
committed
synchronized blocks were replaced with ReentrantLock in SessionBase
1 parent 959538e commit baa8dea

File tree

1 file changed

+56
-37
lines changed

1 file changed

+56
-37
lines changed

topic/src/main/java/tech/ydb/topic/impl/SessionBase.java

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.Objects;
44
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.concurrent.locks.ReentrantLock;
67

78
import org.slf4j.Logger;
89

@@ -19,6 +20,7 @@ public abstract class SessionBase<R, W> implements Session {
1920

2021
protected final GrpcReadWriteStream<R, W> streamConnection;
2122
protected final AtomicBoolean isWorking = new AtomicBoolean(true);
23+
private final ReentrantLock lock = new ReentrantLock();
2224
private String token;
2325

2426
public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {
@@ -32,57 +34,74 @@ public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {
3234

3335
protected abstract void onStop();
3436

35-
protected synchronized CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObserver) {
36-
getLogger().info("Session start");
37-
return streamConnection.start(message -> {
38-
if (getLogger().isTraceEnabled()) {
39-
getLogger().trace("Message received:\n{}", message);
40-
} else {
41-
getLogger().debug("Message received");
42-
}
37+
protected CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObserver) {
38+
lock.lock();
39+
40+
try {
41+
getLogger().info("Session start");
42+
return streamConnection.start(message -> {
43+
if (getLogger().isTraceEnabled()) {
44+
getLogger().trace("Message received:\n{}", message);
45+
} else {
46+
getLogger().debug("Message received");
47+
}
48+
49+
if (isWorking.get()) {
50+
streamObserver.onNext(message);
51+
}
52+
});
53+
} finally {
54+
lock.unlock();
55+
}
56+
}
57+
58+
public void send(W request) {
59+
lock.lock();
4360

44-
if (isWorking.get()) {
45-
streamObserver.onNext(message);
61+
try {
62+
if (!isWorking.get()) {
63+
if (getLogger().isTraceEnabled()) {
64+
getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request);
65+
}
66+
return;
67+
}
68+
String currentToken = streamConnection.authToken();
69+
if (!Objects.equals(token, currentToken)) {
70+
token = currentToken;
71+
getLogger().info("Sending new token");
72+
sendUpdateTokenRequest(token);
4673
}
47-
});
48-
}
4974

50-
public synchronized void send(W request) {
51-
if (!isWorking.get()) {
5275
if (getLogger().isTraceEnabled()) {
53-
getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request);
76+
getLogger().trace("Sending request:\n{}", request);
77+
} else {
78+
getLogger().debug("Sending request");
5479
}
55-
return;
80+
streamConnection.sendNext(request);
81+
} finally {
82+
lock.unlock();
5683
}
57-
String currentToken = streamConnection.authToken();
58-
if (!Objects.equals(token, currentToken)) {
59-
token = currentToken;
60-
getLogger().info("Sending new token");
61-
sendUpdateTokenRequest(token);
62-
}
63-
64-
if (getLogger().isTraceEnabled()) {
65-
getLogger().trace("Sending request:\n{}", request);
66-
} else {
67-
getLogger().debug("Sending request");
68-
}
69-
streamConnection.sendNext(request);
7084
}
7185

7286
private boolean stop() {
7387
getLogger().info("Session stop");
7488
return isWorking.compareAndSet(true, false);
7589
}
7690

77-
7891
@Override
79-
public synchronized boolean shutdown() {
80-
getLogger().info("Session shutdown");
81-
if (stop()) {
82-
onStop();
83-
streamConnection.close();
84-
return true;
92+
public boolean shutdown() {
93+
lock.lock();
94+
95+
try {
96+
getLogger().info("Session shutdown");
97+
if (stop()) {
98+
onStop();
99+
streamConnection.close();
100+
return true;
101+
}
102+
return false;
103+
} finally {
104+
lock.unlock();
85105
}
86-
return false;
87106
}
88107
}

0 commit comments

Comments
 (0)