Skip to content

Commit 9b5082a

Browse files
committed
fix: use ReentrantLock instead of synchronized in Connection.
1 parent b406670 commit 9b5082a

File tree

2 files changed

+48
-8
lines changed

2 files changed

+48
-8
lines changed

polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/Connection.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicBoolean;
3030
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.concurrent.locks.ReentrantLock;
32+
3133
import org.slf4j.Logger;
3234

3335
/**
@@ -74,7 +76,7 @@ public class Connection {
7476
/**
7577
* 申请锁
7678
*/
77-
private final Object lock = new Object();
79+
private final ReentrantLock lock = new ReentrantLock();
7880

7981
/**
8082
* 连接是否已经关闭
@@ -108,23 +110,40 @@ public static boolean isAvailableConnection(Connection connection) {
108110
*/
109111
public boolean acquire(String opKey) {
110112
if (lazyDestroy.get()) {
113+
LOG.warn("connection {}: acquired for op {} need destroy, curRef is {}", connID, opKey, ref.get());
111114
return false;
112115
}
113-
synchronized (lock) {
114-
if (lazyDestroy.get()) {
116+
boolean locked = false;
117+
try {
118+
// retry outside.
119+
locked = lock.tryLock(1, TimeUnit.SECONDS);
120+
if (locked) {
121+
if (lazyDestroy.get()) {
122+
return false;
123+
}
124+
int curRef = ref.incrementAndGet();
125+
LOG.debug("connection {}: acquired for op {}, curRef is {}", connID, opKey, curRef);
126+
return true;
127+
} else {
128+
LOG.warn("connection {}: acquired for op {} timeout, curRef is {}", connID, opKey, ref.get());
115129
return false;
116130
}
117-
int curRef = ref.incrementAndGet();
118-
LOG.debug("connection {}: acquired for op {}, curRef is {}", connID, opKey, curRef);
119-
return true;
131+
} catch (Exception e) {
132+
LOG.warn("connection {}: acquired for op {} occur exception, curRef is {}, msg:{}", connID, opKey, ref.get(), e.getMessage());
133+
return false;
134+
} finally {
135+
if (locked) {
136+
lock.unlock();
137+
}
120138
}
121139
}
122140

123141
/**
124142
* 关闭连接
125143
*/
126144
public void closeConnection() {
127-
synchronized (lock) {
145+
lock.lock();
146+
try {
128147
if (ref.get() <= 0 && !closed) {
129148
LOG.info("connection {}: closed", connID);
130149
closed = true;
@@ -155,6 +174,8 @@ public void closeConnection() {
155174
LOG.debug("Success to gracefully shutdown connection: {}. ", connID);
156175
}
157176
}
177+
} finally {
178+
lock.unlock();
158179
}
159180
}
160181

polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcConnector.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,20 +504,34 @@ public void heartbeat(CommonProviderRequest req) throws PolarisException {
504504
ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
505505
long startTimestamp = 0L;
506506
try {
507+
long t1 = System.currentTimeMillis();
507508
waitDiscoverReady();
509+
long t2 = System.currentTimeMillis();
508510
connection = connectionManager
509511
.getConnection(GrpcUtil.OP_KEY_INSTANCE_HEARTBEAT, ClusterType.HEALTH_CHECK_CLUSTER);
512+
long t3 = System.currentTimeMillis();
510513
req.setTargetServer(connectionToTargetNode(connection));
514+
long t4 = System.currentTimeMillis();
511515
PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub(connection.getChannel());
516+
long t5 = System.currentTimeMillis();
512517
stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextHeartbeatReqId());
518+
long t6 = System.currentTimeMillis();
513519
stub = GrpcUtil.attachAccessToken(connectorConfig.getToken(), stub);
514520
startTimestamp = System.currentTimeMillis();
521+
if (startTimestamp - t1 > 1000) {
522+
LOG.warn("waitDiscoverReady cost {} ms, getConnection cost {} ms, connectionToTargetNode cost {} ms, newBlockingStub cost {} ms, attachRequestHeader cost {} ms, attachAccessToken cost {} ms",
523+
t2 - t1, t3 - t2, t4 - t3, t5 - t4, t6 - t5, startTimestamp - t6);
524+
} else if (LOG.isDebugEnabled()) {
525+
LOG.debug("waitDiscoverReady cost {} ms, getConnection cost {} ms, connectionToTargetNode cost {} ms, newBlockingStub cost {} ms, attachRequestHeader cost {} ms, attachAccessToken cost {} ms",
526+
t2 - t1, t3 - t2, t4 - t3, t5 - t4, t6 - t5, startTimestamp - t6);
527+
}
515528
LOG.debug("start heartbeat at {} ms.", startTimestamp);
516529
ResponseProto.Response heartbeatResponse = stub.withDeadlineAfter(req.getTimeoutMs(),
517530
TimeUnit.MILLISECONDS).heartbeat(buildHeartbeatRequest(req));
518531
GrpcUtil.checkResponse(heartbeatResponse);
519532
LOG.debug("received heartbeat response {}", heartbeatResponse);
520533
} catch (Throwable t) {
534+
LOG.warn("heartbeat fail, req: {}, msg: {}", req, t.getMessage());
521535
if (t instanceof PolarisException) {
522536
//服务端异常不进行重试
523537
throw t;
@@ -531,7 +545,12 @@ public void heartbeat(CommonProviderRequest req) throws PolarisException {
531545
req.getInstanceID(), req.getHost(), req.getPort(), serviceKey), t);
532546
} finally {
533547
long endTimestamp = System.currentTimeMillis();
534-
LOG.debug("end heartbeat at {} ms. Diff {} ms", endTimestamp, endTimestamp - startTimestamp);
548+
long cost = endTimestamp - startTimestamp;
549+
if (cost > 1000) {
550+
LOG.warn("end heartbeat at {} ms. Diff {} ms", endTimestamp, cost);
551+
} else if (LOG.isDebugEnabled()) {
552+
LOG.debug("end heartbeat at {} ms. Diff {} ms", endTimestamp, cost);
553+
}
535554
if (null != connection) {
536555
connection.release(GrpcUtil.OP_KEY_INSTANCE_HEARTBEAT);
537556
}

0 commit comments

Comments
 (0)