
Commit c8bd9d5

fix: use ReentrantLock instead of synchronized in Connection. (#650)

Co-authored-by: shedfreewu <[email protected]>
1 parent b406670 commit c8bd9d5

File tree

2 files changed: +90 -32 lines
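The substantive change: Connection previously guarded acquire() and closeConnection() with a synchronized block, so a thread could block indefinitely while another thread held the monitor (for example while close was awaiting channel termination). The commit switches the guard to a java.util.concurrent.locks.ReentrantLock acquired via tryLock(1, TimeUnit.SECONDS), so contended callers fail fast and retry instead of hanging. A minimal sketch of that pattern follows; the class and field names (GuardedResource, closed) are illustrative, not taken from the repository.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative only: a tryLock-with-timeout guard in place of a synchronized block.
    public class GuardedResource {

        private final ReentrantLock lock = new ReentrantLock();
        private boolean closed = false;

        // Returns false instead of blocking forever when the lock is contended
        // or the resource is already closed; the caller decides whether to retry.
        public boolean acquire() {
            boolean locked = false;
            try {
                locked = lock.tryLock(1, TimeUnit.SECONDS);
                if (!locked || closed) {
                    return false;
                }
                // ... bump a reference count, hand out the resource, etc. ...
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } finally {
                if (locked) {
                    lock.unlock();
                }
            }
        }
    }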

polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/Connection.java

Lines changed: 70 additions & 31 deletions
@@ -28,6 +28,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.slf4j.Logger;

 /**
@@ -74,7 +76,7 @@ public class Connection {
     /**
      * Acquire lock.
      */
-    private final Object lock = new Object();
+    private final ReentrantLock lock = new ReentrantLock();

     /**
      * Whether the connection has been closed.
@@ -108,53 +110,90 @@ public static boolean isAvailableConnection(Connection connection) {
      */
     public boolean acquire(String opKey) {
         if (lazyDestroy.get()) {
+            LOG.warn("connection {}: acquired for op {} need destroy, curRef is {}", connID, opKey, ref.get());
             return false;
         }
-        synchronized (lock) {
-            if (lazyDestroy.get()) {
+        boolean locked = false;
+        try {
+            // retry outside.
+            locked = lock.tryLock(1, TimeUnit.SECONDS);
+            if (locked) {
+                if (lazyDestroy.get()) {
+                    return false;
+                }
+                int curRef = ref.incrementAndGet();
+                LOG.debug("connection {}: acquired for op {}, curRef is {}", connID, opKey, curRef);
+                return true;
+            } else {
+                LOG.warn("connection {}: acquired for op {} timeout, curRef is {}", connID, opKey, ref.get());
                 return false;
             }
-            int curRef = ref.incrementAndGet();
-            LOG.debug("connection {}: acquired for op {}, curRef is {}", connID, opKey, curRef);
-            return true;
+        } catch (Exception e) {
+            LOG.warn("connection {}: acquired for op {} occur exception, curRef is {}, msg:{}", connID, opKey, ref.get(), e.getMessage());
+            return false;
+        } finally {
+            if (locked) {
+                lock.unlock();
+            }
         }
     }

     /**
      * Close the connection.
      */
     public void closeConnection() {
-        synchronized (lock) {
-            if (ref.get() <= 0 && !closed) {
-                LOG.info("connection {}: closed", connID);
-                closed = true;
-                // Gracefully shutdown the gRPC managed-channel.
-                if (channel != null && !channel.isShutdown()) {
-                    try {
-                        channel.shutdown();
-                        if (!channel.awaitTermination(1, TimeUnit.SECONDS)) {
-                            LOG.warn("Timed out gracefully shutting down connection: {}. ", connID);
-                        }
-                    } catch (Exception e) {
-                        LOG.error("Unexpected exception while waiting for channel {} gracefully termination", connID, e);
+        while (true) {
+            boolean locked = false;
+            try {
+                locked = lock.tryLock(1, TimeUnit.SECONDS);
+                if (locked) {
+                    doCloseConnection();
+                    break;
+                } else {
+                    LOG.warn("connection {}: get lock timeout, retry, curRef is {}", connID, ref.get());
+                }
+            } catch (Exception e) {
+                LOG.warn("connection {}: get lock occur exception, retry, curRef is {}, msg:{}", connID, ref.get(), e.getMessage());
+            } finally {
+                if (locked) {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+
+    private void doCloseConnection() {
+        if (ref.get() <= 0 && !closed) {
+            LOG.info("[doCloseConnection] connection {}: closed", connID);
+            closed = true;
+            // Gracefully shutdown the gRPC managed-channel.
+            if (channel != null && !channel.isShutdown()) {
+                try {
+                    channel.shutdown();
+                    if (!channel.awaitTermination(1, TimeUnit.SECONDS)) {
+                        LOG.warn("Timed out gracefully shutting down connection: {}. ", connID);
                     }
+                } catch (Exception e) {
+                    LOG.error("Unexpected exception while waiting for channel {} gracefully termination", connID, e);
                 }
+            }

-                // Forcefully shutdown if still not terminated.
-                if (channel != null && !channel.isTerminated()) {
-                    try {
-                        channel.shutdownNow();
-                        if (!channel.awaitTermination(100, TimeUnit.MILLISECONDS)) {
-                            LOG.warn("Timed out forcefully shutting down connection: {}. ", connID);
-                        }
-                        LOG.debug("Success to forcefully shutdown connection: {}. ", connID);
-                    } catch (Exception e) {
-                        LOG.error("Unexpected exception while waiting for channel {} forcefully termination", connID, e);
+            // Forcefully shutdown if still not terminated.
+            if (channel != null && !channel.isTerminated()) {
+                try {
+                    channel.shutdownNow();
+                    if (!channel.awaitTermination(100, TimeUnit.MILLISECONDS)) {
+                        LOG.warn("Timed out forcefully shutting down connection: {}. ", connID);
                    }
-                } else {
-                    LOG.debug("Success to gracefully shutdown connection: {}. ", connID);
+                    LOG.debug("Success to forcefully shutdown connection: {}. ", connID);
+                } catch (Exception e) {
+                    LOG.error("Unexpected exception while waiting for channel {} forcefully termination", connID, e);
                }
+            } else {
+                LOG.debug("Success to gracefully shutdown connection: {}. ", connID);
            }
+        } else {
+            LOG.info("[doCloseConnection] connection {}: ref is {}, closed is {}, skip close", connID, ref.get(), closed);
        }
    }
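The "// retry outside." comment signals that acquire() now reports failure (timeout, pending destroy, or an exception) instead of blocking, leaving retries to the caller, and callers such as GrpcConnector pair a successful acquire with release() in a finally block. A caller-side sketch under that assumption; the retry policy (three attempts, no backoff) is invented for illustration.

    import com.tencent.polaris.plugins.connector.grpc.Connection;

    // Illustrative caller-side helper: acquire may time out, so retry a bounded
    // number of times and always release after a successful acquire.
    public final class ConnectionCallExample {

        public static boolean callWithRetry(Connection connection, String opKey, Runnable call) {
            for (int attempt = 0; attempt < 3; attempt++) {
                if (!connection.acquire(opKey)) {
                    continue; // lock timed out or the connection is marked for destroy
                }
                try {
                    call.run(); // e.g. a blocking gRPC call over connection.getChannel()
                    return true;
                } finally {
                    connection.release(opKey);
                }
            }
            return false;
        }
    }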

polaris-plugins/polaris-plugins-connector/connector-polaris-grpc/src/main/java/com/tencent/polaris/plugins/connector/grpc/GrpcConnector.java

Lines changed: 20 additions & 1 deletion
@@ -504,20 +504,34 @@ public void heartbeat(CommonProviderRequest req) throws PolarisException {
         ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
         long startTimestamp = 0L;
         try {
+            long t1 = System.currentTimeMillis();
             waitDiscoverReady();
+            long t2 = System.currentTimeMillis();
             connection = connectionManager
                     .getConnection(GrpcUtil.OP_KEY_INSTANCE_HEARTBEAT, ClusterType.HEALTH_CHECK_CLUSTER);
+            long t3 = System.currentTimeMillis();
             req.setTargetServer(connectionToTargetNode(connection));
+            long t4 = System.currentTimeMillis();
             PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub(connection.getChannel());
+            long t5 = System.currentTimeMillis();
             stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextHeartbeatReqId());
+            long t6 = System.currentTimeMillis();
             stub = GrpcUtil.attachAccessToken(connectorConfig.getToken(), stub);
             startTimestamp = System.currentTimeMillis();
+            if (startTimestamp - t1 > 1000) {
+                LOG.warn("waitDiscoverReady cost {} ms, getConnection cost {} ms, connectionToTargetNode cost {} ms, newBlockingStub cost {} ms, attachRequestHeader cost {} ms, attachAccessToken cost {} ms",
+                        t2 - t1, t3 - t2, t4 - t3, t5 - t4, t6 - t5, startTimestamp - t6);
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug("waitDiscoverReady cost {} ms, getConnection cost {} ms, connectionToTargetNode cost {} ms, newBlockingStub cost {} ms, attachRequestHeader cost {} ms, attachAccessToken cost {} ms",
+                        t2 - t1, t3 - t2, t4 - t3, t5 - t4, t6 - t5, startTimestamp - t6);
+            }
             LOG.debug("start heartbeat at {} ms.", startTimestamp);
             ResponseProto.Response heartbeatResponse = stub.withDeadlineAfter(req.getTimeoutMs(),
                     TimeUnit.MILLISECONDS).heartbeat(buildHeartbeatRequest(req));
             GrpcUtil.checkResponse(heartbeatResponse);
             LOG.debug("received heartbeat response {}", heartbeatResponse);
         } catch (Throwable t) {
+            LOG.warn("heartbeat fail, req: {}, msg: {}", req, t.getMessage());
             if (t instanceof PolarisException) {
                 // Do not retry on server-side exceptions.
                 throw t;
@@ -531,7 +545,12 @@ public void heartbeat(CommonProviderRequest req) throws PolarisException {
                     req.getInstanceID(), req.getHost(), req.getPort(), serviceKey), t);
         } finally {
             long endTimestamp = System.currentTimeMillis();
-            LOG.debug("end heartbeat at {} ms. Diff {} ms", endTimestamp, endTimestamp - startTimestamp);
+            long cost = endTimestamp - startTimestamp;
+            if (cost > 1000) {
+                LOG.warn("end heartbeat at {} ms. Diff {} ms", endTimestamp, cost);
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug("end heartbeat at {} ms. Diff {} ms", endTimestamp, cost);
+            }
             if (null != connection) {
                 connection.release(GrpcUtil.OP_KEY_INSTANCE_HEARTBEAT);
             }
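The GrpcConnector change also records per-stage timestamps around the heartbeat setup and escalates from DEBUG to WARN when the setup or the heartbeat round trip exceeds one second. A compact sketch of that escalate-when-slow logging idea; CostLogger and logCost are hypothetical helpers, not part of the repository.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical helper mirroring the "warn when slow, debug otherwise" logging
    // that this commit adds around the heartbeat path.
    public final class CostLogger {

        private static final Logger LOG = LoggerFactory.getLogger(CostLogger.class);
        private static final long SLOW_THRESHOLD_MS = 1000L;

        public static void logCost(String stage, long startMillis) {
            long cost = System.currentTimeMillis() - startMillis;
            if (cost > SLOW_THRESHOLD_MS) {
                LOG.warn("{} cost {} ms", stage, cost);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("{} cost {} ms", stage, cost);
            }
        }
    }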
