1818package com .tencent .polaris .discovery .client .flow ;
1919
2020import com .tencent .polaris .api .exception .PolarisException ;
21+ import com .tencent .polaris .api .exception .ServerCodes ;
2122import com .tencent .polaris .api .rpc .InstanceHeartbeatRequest ;
2223import com .tencent .polaris .api .rpc .InstanceRegisterRequest ;
2324import com .tencent .polaris .api .rpc .InstanceRegisterResponse ;
2425import com .tencent .polaris .client .api .SDKContext ;
2526import com .tencent .polaris .client .util .NamedThreadFactory ;
2627import com .tencent .polaris .discovery .client .flow .RegisterStateManager .RegisterState ;
2728import com .tencent .polaris .logging .LoggerFactory ;
29+ import com .tencent .polaris .logging .LoggingConsts ;
30+ import org .slf4j .Logger ;
31+
2832import java .util .HashMap ;
2933import java .util .Map ;
34+ import java .util .Random ;
3035import java .util .concurrent .ScheduledThreadPoolExecutor ;
3136import java .util .concurrent .TimeUnit ;
3237
33- import com .tencent .polaris .logging .LoggingConsts ;
34- import org .slf4j .Logger ;
35-
3638/**
3739 * 异步注册流
3840 *
@@ -75,23 +77,32 @@ public InstanceRegisterResponse registerInstance(InstanceRegisterRequest request
7577 return instanceRegisterResponse ;
7678 }
7779
78- private void doRunHeartbeat (RegisterState registerState , RegisterFunction registerFunction ,
80+ void doRunHeartbeat (RegisterState registerState , RegisterFunction registerFunction ,
7981 HeartbeatFunction heartbeatFunction ) {
8082 InstanceRegisterRequest registerRequest = registerState .getInstanceRegisterRequest ();
8183 LOG .info ("[AsyncHeartbeat]Instance heartbeat task started, namespace:{}, service:{}, host:{}, port:{}" ,
8284 registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
8385 registerRequest .getPort ());
8486 try {
8587 heartbeatFunction .doHeartbeat (buildHeartbeatRequest (registerRequest ));
86- LOG .info ("[AsyncHeartbeat]Instance heartbeat success, namespace:{}, service:{}, host:{}, port:{}" ,
88+ registerState .resetFailCount ();
89+ LOG .info ("[AsyncHeartbeat]Instance heartbeat success. Reset fail count. namespace:{}, service:{}, host:{}, port:{}" ,
8790 registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
8891 registerRequest .getPort ());
8992 } catch (PolarisException e ) {
90- registerState .incrementFailCount ();
91- LOG .error (
92- "[AsyncHeartbeat]Instance heartbeat failed, namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}" ,
93- registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
94- registerRequest .getPort (), registerState .getHeartbeatFailCounter (), e );
93+ if (e .getServerErrCode () == ServerCodes .NOT_FOUND_RESOURCE ) {
94+ registerState .incrementFailCount ();
95+ LOG .error ("[AsyncHeartbeat]Instance heartbeat failed because of NOT_FOUND_RESOURCE. Increase fail count. " +
96+ "namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}" ,
97+ registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
98+ registerRequest .getPort (), registerState .getHeartbeatFailCounter (), e );
99+ } else {
100+ registerState .resetFailCount ();
101+ LOG .error ("[AsyncHeartbeat]Instance heartbeat failed not because of NOT_FOUND_RESOURCE. Reset fail count. " +
102+ "namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}" ,
103+ registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
104+ registerRequest .getPort (), registerState .getHeartbeatFailCounter (), e );
105+ }
95106 }
96107
97108 long minRegisterInterval = sdkContext .getConfig ().getProvider ().getMinRegisterInterval ();
@@ -100,17 +111,35 @@ private void doRunHeartbeat(RegisterState registerState, RegisterFunction regist
100111 || registerState .getHeartbeatFailCounter () < HEARTBEAT_FAIL_COUNT_THRESHOLD ) {
101112 return ;
102113 }
103- try {
104- registerFunction .doRegister (registerRequest , createRegisterV2Header ());
105- LOG .info ("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}" ,
106- registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
107- registerRequest .getPort ());
108- registerState .resetFailCount ();
109- } catch (PolarisException e ) {
110- LOG .error (
111- "[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, err:{}" ,
112- registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
113- registerRequest .getPort (), e );
114+
115+ synchronized (registerState ) {
116+ if (registerState .getReRegisterFuture () == null
117+ || registerState .getReRegisterFuture ().isDone ()
118+ || registerState .getReRegisterFuture ().isCancelled ()) {
119+ int reRegisterCounter = registerState .getReRegisterCounter ();
120+ double base = reRegisterCounter == 0 ? 0 : registerRequest .getTtl () * Math .pow (2 , reRegisterCounter - 1 );
121+ int offset = reRegisterCounter == 0 ? 0 : new Random ().nextInt (registerRequest .getTtl ());
122+ long delay = (long ) Math .min (base + offset , 60 );
123+ LOG .info ("[AsyncHeartbeat]Re-register instance, namespace:{}, service:{}, host:{}, port:{}, count:{}, delay:{}s" ,
124+ registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
125+ registerRequest .getPort (), reRegisterCounter , delay );
126+ registerState .setReRegisterFuture (registerState .getReRegisterExecutor ().schedule (() -> {
127+ try {
128+ registerFunction .doRegister (registerRequest , createRegisterV2Header ());
129+ LOG .info ("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}" ,
130+ registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
131+ registerRequest .getPort ());
132+ registerState .resetFailCount ();
133+ registerState .resetReRegisterCounter ();
134+ } catch (PolarisException e ) {
135+ LOG .error (
136+ "[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, re-register count:{}" ,
137+ registerRequest .getNamespace (), registerRequest .getService (), registerRequest .getHost (),
138+ registerRequest .getPort (), reRegisterCounter , e );
139+ }
140+ }, delay , TimeUnit .SECONDS ));
141+ registerState .incrementReRegisterCounter ();
142+ }
114143 }
115144 }
116145
0 commit comments