Skip to content

Commit 7c41728

Browse files
committed
feat:optimize heartbeat and re-register.
1 parent 98a300f commit 7c41728

File tree

3 files changed

+292
-24
lines changed

3 files changed

+292
-24
lines changed

polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,23 @@
1818
package com.tencent.polaris.discovery.client.flow;
1919

2020
import com.tencent.polaris.api.exception.PolarisException;
21+
import com.tencent.polaris.api.exception.ServerCodes;
2122
import com.tencent.polaris.api.rpc.InstanceHeartbeatRequest;
2223
import com.tencent.polaris.api.rpc.InstanceRegisterRequest;
2324
import com.tencent.polaris.api.rpc.InstanceRegisterResponse;
2425
import com.tencent.polaris.client.api.SDKContext;
2526
import com.tencent.polaris.client.util.NamedThreadFactory;
2627
import com.tencent.polaris.discovery.client.flow.RegisterStateManager.RegisterState;
2728
import com.tencent.polaris.logging.LoggerFactory;
29+
import com.tencent.polaris.logging.LoggingConsts;
30+
import org.slf4j.Logger;
31+
2832
import java.util.HashMap;
2933
import java.util.Map;
34+
import java.util.Random;
3035
import java.util.concurrent.ScheduledThreadPoolExecutor;
3136
import java.util.concurrent.TimeUnit;
3237

33-
import com.tencent.polaris.logging.LoggingConsts;
34-
import org.slf4j.Logger;
35-
3638
/**
3739
* 异步注册流
3840
*
@@ -75,19 +77,30 @@ public InstanceRegisterResponse registerInstance(InstanceRegisterRequest request
7577
return instanceRegisterResponse;
7678
}
7779

78-
private void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction,
80+
void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction,
7981
HeartbeatFunction heartbeatFunction) {
8082
InstanceRegisterRequest registerRequest = registerState.getInstanceRegisterRequest();
8183
LOG.info("[AsyncHeartbeat]Instance heartbeat task started, namespace:{}, service:{}, host:{}, port:{}",
8284
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
8385
registerRequest.getPort());
8486
try {
8587
heartbeatFunction.doHeartbeat(buildHeartbeatRequest(registerRequest));
86-
LOG.info("[AsyncHeartbeat]Instance heartbeat success, namespace:{}, service:{}, host:{}, port:{}",
88+
registerState.resetFailCount();
89+
LOG.info("[AsyncHeartbeat]Instance heartbeat success. Reset fail count. namespace:{}, service:{}, host:{}, port:{}",
8790
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
8891
registerRequest.getPort());
8992
} catch (PolarisException e) {
90-
registerState.incrementFailCount();
93+
if (e.getServerErrCode() == ServerCodes.NOT_FOUND_RESOURCE) {
94+
registerState.incrementFailCount();
95+
LOG.debug("[AsyncHeartbeat]Instance heartbeat failed because of NOT_FOUND_RESOURCE. Increase fail count. namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
96+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
97+
registerRequest.getPort(), registerState.getHeartbeatFailCounter());
98+
} else {
99+
registerState.resetFailCount();
100+
LOG.debug("[AsyncHeartbeat]Instance heartbeat failed not because of NOT_FOUND_RESOURCE. Reset fail count. namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
101+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
102+
registerRequest.getPort(), registerState.getHeartbeatFailCounter());
103+
}
91104
LOG.error(
92105
"[AsyncHeartbeat]Instance heartbeat failed, namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
93106
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
@@ -100,17 +113,35 @@ private void doRunHeartbeat(RegisterState registerState, RegisterFunction regist
100113
|| registerState.getHeartbeatFailCounter() < HEARTBEAT_FAIL_COUNT_THRESHOLD) {
101114
return;
102115
}
103-
try {
104-
registerFunction.doRegister(registerRequest, createRegisterV2Header());
105-
LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}",
106-
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
107-
registerRequest.getPort());
108-
registerState.resetFailCount();
109-
} catch (PolarisException e) {
110-
LOG.error(
111-
"[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, err:{}",
112-
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
113-
registerRequest.getPort(), e);
116+
117+
synchronized (registerState) {
118+
if (registerState.getReRegisterFuture() == null
119+
|| registerState.getReRegisterFuture().isDone()
120+
|| registerState.getReRegisterFuture().isCancelled()) {
121+
int reRegisterCounter = registerState.getReRegisterCounter();
122+
double base = reRegisterCounter == 0 ? 0 : registerRequest.getTtl() * Math.pow(2, reRegisterCounter - 1);
123+
int offset = reRegisterCounter == 0 ? 0 : new Random().nextInt(registerRequest.getTtl());
124+
long delay = (long) Math.min(base + offset, 60);
125+
LOG.info("[AsyncHeartbeat]Re-register instance, namespace:{}, service:{}, host:{}, port:{}, count:{}, delay:{}s",
126+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
127+
registerRequest.getPort(), reRegisterCounter, delay);
128+
registerState.setReRegisterFuture(registerState.getReRegisterExecutor().schedule(() -> {
129+
try {
130+
registerFunction.doRegister(registerRequest, createRegisterV2Header());
131+
LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}",
132+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
133+
registerRequest.getPort());
134+
registerState.resetFailCount();
135+
registerState.resetReRegisterCounter();
136+
} catch (PolarisException e) {
137+
LOG.error(
138+
"[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, re-register count:{}",
139+
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
140+
registerRequest.getPort(), reRegisterCounter, e);
141+
}
142+
}, delay, TimeUnit.SECONDS));
143+
registerState.incrementReRegisterCounter();
144+
}
114145
}
115146
}
116147

polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@
2121
import com.tencent.polaris.api.rpc.InstanceDeregisterRequest;
2222
import com.tencent.polaris.api.rpc.InstanceRegisterRequest;
2323
import com.tencent.polaris.client.api.SDKContext;
24+
import com.tencent.polaris.logging.LoggerFactory;
25+
import org.slf4j.Logger;
26+
2427
import java.util.Map;
2528
import java.util.Optional;
2629
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
2732
import java.util.concurrent.ScheduledFuture;
33+
import java.util.concurrent.atomic.AtomicInteger;
2834

2935
/**
3036
* 注册状态管理器
@@ -33,6 +39,8 @@
3339
*/
3440
public class RegisterStateManager {
3541

42+
private static final Logger LOG = LoggerFactory.getLogger(RegisterStateManager.class);
43+
3644
private final static Map<String, Map<String, RegisterState>> REGISTER_STATES = new ConcurrentHashMap<>();
3745

3846
/**
@@ -69,15 +77,15 @@ public static void removeRegisterState(SDKContext sdkContext, InstanceDeregister
6977
.ifPresent(sdkRegisterStates -> {
7078
String registerStateKey = buildRegisterStateKey(instanceDeregisterRequest);
7179
Optional.ofNullable(sdkRegisterStates.remove(registerStateKey))
72-
.ifPresent(registerState -> registerState.getTaskFuture().cancel(false));
80+
.ifPresent(RegisterState::destroy);
7381
});
7482
}
7583

7684
public static void destroy(SDKContext sdkContext) {
7785
Optional.ofNullable(REGISTER_STATES.remove(sdkContext.getValueContext().getClientId()))
7886
.ifPresent(sdkRegisterStates -> {
7987
for (RegisterState registerState : sdkRegisterStates.values()) {
80-
registerState.getTaskFuture().cancel(false);
88+
registerState.destroy();
8189
}
8290
sdkRegisterStates.clear();
8391
});
@@ -88,26 +96,33 @@ private static String buildRegisterStateKey(CommonProviderBaseEntity baseEntity)
8896
baseEntity.getPort());
8997
}
9098

91-
public static final class RegisterState {
99+
public static class RegisterState {
92100

93101
private InstanceRegisterRequest instanceRegisterRequest;
94102
private long firstRegisterTime;
95103
private ScheduledFuture<?> taskFuture;
96-
private int heartbeatFailCounter = 0;
104+
private final AtomicInteger heartbeatFailCounter = new AtomicInteger(0);
105+
private ScheduledFuture<?> reRegisterFuture;
106+
private final ScheduledExecutorService reRegisterExecutor;
107+
private final AtomicInteger reRegisterCounter = new AtomicInteger(0);
108+
109+
public RegisterState() {
110+
this.reRegisterExecutor = Executors.newSingleThreadScheduledExecutor();
111+
}
97112

98113
/**
99114
* Increment fail count by one
100115
*/
101116
public void incrementFailCount() {
102-
heartbeatFailCounter += 1;
117+
heartbeatFailCounter.incrementAndGet();
103118
}
104119

105120
public int getHeartbeatFailCounter() {
106-
return heartbeatFailCounter;
121+
return heartbeatFailCounter.get();
107122
}
108123

109124
public void resetFailCount() {
110-
heartbeatFailCounter = 0;
125+
heartbeatFailCounter.set(0);
111126
}
112127

113128
public InstanceRegisterRequest getInstanceRegisterRequest() {
@@ -133,5 +148,41 @@ public ScheduledFuture<?> getTaskFuture() {
133148
public void setTaskFuture(ScheduledFuture<?> taskFuture) {
134149
this.taskFuture = taskFuture;
135150
}
151+
152+
public ScheduledFuture<?> getReRegisterFuture() {
153+
return reRegisterFuture;
154+
}
155+
156+
public void setReRegisterFuture(ScheduledFuture<?> reRegisterFuture) {
157+
this.reRegisterFuture = reRegisterFuture;
158+
}
159+
160+
public ScheduledExecutorService getReRegisterExecutor() {
161+
return reRegisterExecutor;
162+
}
163+
164+
public int getReRegisterCounter() {
165+
return reRegisterCounter.get();
166+
}
167+
168+
public void incrementReRegisterCounter() {
169+
reRegisterCounter.incrementAndGet();
170+
}
171+
172+
public void resetReRegisterCounter() {
173+
reRegisterCounter.set(0);
174+
}
175+
176+
public void destroy() {
177+
try {
178+
getTaskFuture().cancel(false);
179+
getReRegisterFuture().cancel(false);
180+
getReRegisterExecutor().shutdownNow();
181+
} catch (Throwable throwable) {
182+
LOG.warn("[RegisterState] destroy error. namespace:{}, service:{}, host:{}, port:{}.",
183+
getInstanceRegisterRequest().getNamespace(), getInstanceRegisterRequest().getService(),
184+
getInstanceRegisterRequest().getHost(), getInstanceRegisterRequest().getPort(), throwable);
185+
}
186+
}
136187
}
137188
}

0 commit comments

Comments
 (0)