From 3f1fde4496e21d48f4ba10872a9817af5d023a0d Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Mon, 31 Mar 2025 20:02:22 +0800 Subject: [PATCH 1/2] feat:optimize heartbeat and re-register. --- .../discovery/client/flow/RegisterFlow.java | 65 ++++-- .../client/flow/RegisterStateManager.java | 65 +++++- .../client/flow/RegisterFlowTest.java | 186 ++++++++++++++++++ 3 files changed, 292 insertions(+), 24 deletions(-) create mode 100644 polaris-discovery/polaris-discovery-client/src/test/java/com/tencent/polaris/discovery/client/flow/RegisterFlowTest.java diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java index dedbac247..4e731607f 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java @@ -18,6 +18,7 @@ package com.tencent.polaris.discovery.client.flow; import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.exception.ServerCodes; import com.tencent.polaris.api.rpc.InstanceHeartbeatRequest; import com.tencent.polaris.api.rpc.InstanceRegisterRequest; import com.tencent.polaris.api.rpc.InstanceRegisterResponse; @@ -25,14 +26,15 @@ import com.tencent.polaris.client.util.NamedThreadFactory; import com.tencent.polaris.discovery.client.flow.RegisterStateManager.RegisterState; import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.logging.LoggingConsts; +import org.slf4j.Logger; + import java.util.HashMap; import java.util.Map; +import java.util.Random; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.tencent.polaris.logging.LoggingConsts; -import org.slf4j.Logger; - /** * 异步注册流 * @@ -75,7 +77,7 @@ public InstanceRegisterResponse registerInstance(InstanceRegisterRequest request return instanceRegisterResponse; } - private void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction, + void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction, HeartbeatFunction heartbeatFunction) { InstanceRegisterRequest registerRequest = registerState.getInstanceRegisterRequest(); LOG.info("[AsyncHeartbeat]Instance heartbeat task started, namespace:{}, service:{}, host:{}, port:{}", @@ -83,11 +85,22 @@ private void doRunHeartbeat(RegisterState registerState, RegisterFunction regist registerRequest.getPort()); try { heartbeatFunction.doHeartbeat(buildHeartbeatRequest(registerRequest)); - LOG.info("[AsyncHeartbeat]Instance heartbeat success, namespace:{}, service:{}, host:{}, port:{}", + registerState.resetFailCount(); + LOG.info("[AsyncHeartbeat]Instance heartbeat success. Reset fail count. namespace:{}, service:{}, host:{}, port:{}", registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), registerRequest.getPort()); } catch (PolarisException e) { - registerState.incrementFailCount(); + if (e.getServerErrCode() == ServerCodes.NOT_FOUND_RESOURCE) { + registerState.incrementFailCount(); + LOG.debug("[AsyncHeartbeat]Instance heartbeat failed because of NOT_FOUND_RESOURCE. Increase fail count. namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", + registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), + registerRequest.getPort(), registerState.getHeartbeatFailCounter()); + } else { + registerState.resetFailCount(); + LOG.debug("[AsyncHeartbeat]Instance heartbeat failed not because of NOT_FOUND_RESOURCE. Reset fail count. namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", + registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), + registerRequest.getPort(), registerState.getHeartbeatFailCounter()); + } LOG.error( "[AsyncHeartbeat]Instance heartbeat failed, namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), @@ -100,17 +113,35 @@ private void doRunHeartbeat(RegisterState registerState, RegisterFunction regist || registerState.getHeartbeatFailCounter() < HEARTBEAT_FAIL_COUNT_THRESHOLD) { return; } - try { - registerFunction.doRegister(registerRequest, createRegisterV2Header()); - LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}", - registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), - registerRequest.getPort()); - registerState.resetFailCount(); - } catch (PolarisException e) { - LOG.error( - "[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, err:{}", - registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), - registerRequest.getPort(), e); + + synchronized (registerState) { + if (registerState.getReRegisterFuture() == null + || registerState.getReRegisterFuture().isDone() + || registerState.getReRegisterFuture().isCancelled()) { + int reRegisterCounter = registerState.getReRegisterCounter(); + double base = reRegisterCounter == 0 ? 0 : registerRequest.getTtl() * Math.pow(2, reRegisterCounter - 1); + int offset = reRegisterCounter == 0 ? 0 : new Random().nextInt(registerRequest.getTtl()); + long delay = (long) Math.min(base + offset, 60); + LOG.info("[AsyncHeartbeat]Re-register instance, namespace:{}, service:{}, host:{}, port:{}, count:{}, delay:{}s", + registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), + registerRequest.getPort(), reRegisterCounter, delay); + registerState.setReRegisterFuture(registerState.getReRegisterExecutor().schedule(() -> { + try { + registerFunction.doRegister(registerRequest, createRegisterV2Header()); + LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}", + registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), + registerRequest.getPort()); + registerState.resetFailCount(); + registerState.resetReRegisterCounter(); + } catch (PolarisException e) { + LOG.error( + "[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, re-register count:{}", + registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), + registerRequest.getPort(), reRegisterCounter, e); + } + }, delay, TimeUnit.SECONDS)); + registerState.incrementReRegisterCounter(); + } } } diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java index 97e26096b..46917cd76 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterStateManager.java @@ -21,10 +21,16 @@ import com.tencent.polaris.api.rpc.InstanceDeregisterRequest; import com.tencent.polaris.api.rpc.InstanceRegisterRequest; import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.logging.LoggerFactory; +import org.slf4j.Logger; + import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; /** * 注册状态管理器 @@ -33,6 +39,8 @@ */ public class RegisterStateManager { + private static final Logger LOG = LoggerFactory.getLogger(RegisterStateManager.class); + private final static Map> REGISTER_STATES = new ConcurrentHashMap<>(); /** @@ -69,7 +77,7 @@ public static void removeRegisterState(SDKContext sdkContext, InstanceDeregister .ifPresent(sdkRegisterStates -> { String registerStateKey = buildRegisterStateKey(instanceDeregisterRequest); Optional.ofNullable(sdkRegisterStates.remove(registerStateKey)) - .ifPresent(registerState -> registerState.getTaskFuture().cancel(false)); + .ifPresent(RegisterState::destroy); }); } @@ -77,7 +85,7 @@ public static void destroy(SDKContext sdkContext) { Optional.ofNullable(REGISTER_STATES.remove(sdkContext.getValueContext().getClientId())) .ifPresent(sdkRegisterStates -> { for (RegisterState registerState : sdkRegisterStates.values()) { - registerState.getTaskFuture().cancel(false); + registerState.destroy(); } sdkRegisterStates.clear(); }); @@ -88,26 +96,33 @@ private static String buildRegisterStateKey(CommonProviderBaseEntity baseEntity) baseEntity.getPort()); } - public static final class RegisterState { + public static class RegisterState { private InstanceRegisterRequest instanceRegisterRequest; private long firstRegisterTime; private ScheduledFuture taskFuture; - private int heartbeatFailCounter = 0; + private final AtomicInteger heartbeatFailCounter = new AtomicInteger(0); + private ScheduledFuture reRegisterFuture; + private final ScheduledExecutorService reRegisterExecutor; + private final AtomicInteger reRegisterCounter = new AtomicInteger(0); + + public RegisterState() { + this.reRegisterExecutor = Executors.newSingleThreadScheduledExecutor(); + } /** * Increment fail count by one */ public void incrementFailCount() { - heartbeatFailCounter += 1; + heartbeatFailCounter.incrementAndGet(); } public int getHeartbeatFailCounter() { - return heartbeatFailCounter; + return heartbeatFailCounter.get(); } public void resetFailCount() { - heartbeatFailCounter = 0; + heartbeatFailCounter.set(0); } public InstanceRegisterRequest getInstanceRegisterRequest() { @@ -133,5 +148,41 @@ public ScheduledFuture getTaskFuture() { public void setTaskFuture(ScheduledFuture taskFuture) { this.taskFuture = taskFuture; } + + public ScheduledFuture getReRegisterFuture() { + return reRegisterFuture; + } + + public void setReRegisterFuture(ScheduledFuture reRegisterFuture) { + this.reRegisterFuture = reRegisterFuture; + } + + public ScheduledExecutorService getReRegisterExecutor() { + return reRegisterExecutor; + } + + public int getReRegisterCounter() { + return reRegisterCounter.get(); + } + + public void incrementReRegisterCounter() { + reRegisterCounter.incrementAndGet(); + } + + public void resetReRegisterCounter() { + reRegisterCounter.set(0); + } + + public void destroy() { + try { + getTaskFuture().cancel(false); + getReRegisterFuture().cancel(false); + getReRegisterExecutor().shutdownNow(); + } catch (Throwable throwable) { + LOG.warn("[RegisterState] destroy error. namespace:{}, service:{}, host:{}, port:{}.", + getInstanceRegisterRequest().getNamespace(), getInstanceRegisterRequest().getService(), + getInstanceRegisterRequest().getHost(), getInstanceRegisterRequest().getPort(), throwable); + } + } } } diff --git a/polaris-discovery/polaris-discovery-client/src/test/java/com/tencent/polaris/discovery/client/flow/RegisterFlowTest.java b/polaris-discovery/polaris-discovery-client/src/test/java/com/tencent/polaris/discovery/client/flow/RegisterFlowTest.java new file mode 100644 index 000000000..ff1ca0cf5 --- /dev/null +++ b/polaris-discovery/polaris-discovery-client/src/test/java/com/tencent/polaris/discovery/client/flow/RegisterFlowTest.java @@ -0,0 +1,186 @@ +/* + * Tencent is pleased to support the open source community by making polaris-java available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.discovery.client.flow; + +import com.tencent.polaris.api.config.Configuration; +import com.tencent.polaris.api.config.provider.ProviderConfig; +import com.tencent.polaris.api.exception.ErrorCode; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.exception.ServerCodes; +import com.tencent.polaris.api.rpc.InstanceHeartbeatRequest; +import com.tencent.polaris.api.rpc.InstanceRegisterRequest; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.discovery.client.flow.RegisterFlow.HeartbeatFunction; +import com.tencent.polaris.discovery.client.flow.RegisterFlow.RegisterFunction; +import com.tencent.polaris.discovery.client.flow.RegisterStateManager.RegisterState; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.*; + +/** + * Test for {@link RegisterFlow}. + * + * @author Haotian Zhang + */ +public class RegisterFlowTest { + + private RegisterFlow registerFlow; + private RegisterState registerState; + private RegisterFunction registerFunction; + private HeartbeatFunction heartbeatFunction; + private ScheduledExecutorService reRegisterExecutor; + private ScheduledFuture reRegisterFuture; + + @Before + public void setUp() { + ProviderConfig providerConfig = mock(ProviderConfig.class); + when(providerConfig.getHeartbeatWorkerSize()).thenReturn(4); + when(providerConfig.getMinRegisterInterval()).thenReturn(30000L); + Configuration configuration = mock(Configuration.class); + when(configuration.getProvider()).thenReturn(providerConfig); + SDKContext sdkContext = mock(SDKContext.class); + when(sdkContext.getConfig()).thenReturn(configuration); + registerFlow = new RegisterFlow(sdkContext); + + registerState = mock(RegisterState.class); + registerFunction = mock(RegisterFunction.class); + heartbeatFunction = mock(HeartbeatFunction.class); + InstanceRegisterRequest registerRequest = mock(InstanceRegisterRequest.class); + reRegisterExecutor = mock(ScheduledExecutorService.class); + reRegisterFuture = mock(ScheduledFuture.class); + + when(registerState.getInstanceRegisterRequest()).thenReturn(registerRequest); + when(registerState.getReRegisterExecutor()).thenReturn(reRegisterExecutor); + when(registerState.getFirstRegisterTime()).thenReturn(System.currentTimeMillis()); + when(registerState.getHeartbeatFailCounter()).thenReturn(0); + when(registerState.getReRegisterCounter()).thenReturn(0); + + when(registerRequest.getNamespace()).thenReturn("test-namespace"); + when(registerRequest.getService()).thenReturn("test-service"); + when(registerRequest.getHost()).thenReturn("127.0.0.1"); + when(registerRequest.getPort()).thenReturn(8080); + when(registerRequest.getTtl()).thenReturn(5); + } + + @Test + public void testDoRunHeartbeat_Success() throws PolarisException { + // 准备 + doNothing().when(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + + // 执行 + registerFlow.doRunHeartbeat(registerState, registerFunction, heartbeatFunction); + + // 验证 + verify(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + verify(registerState).resetFailCount(); + verify(registerState, never()).incrementFailCount(); + } + + @Test + public void testDoRunHeartbeat_Failure_NotFoundResource() throws PolarisException { + // 准备 + PolarisException notFoundException = new PolarisException(ErrorCode.SERVER_USER_ERROR, "Not Found Resource"); + notFoundException.setServerErrCode(ServerCodes.NOT_FOUND_RESOURCE); + doThrow(notFoundException).when(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + + // 执行 + registerFlow.doRunHeartbeat(registerState, registerFunction, heartbeatFunction); + + // 验证 + verify(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + verify(registerState).incrementFailCount(); + verify(registerState, never()).resetFailCount(); + } + + @Test + public void testDoRunHeartbeat_Failure_OtherError() throws PolarisException { + // 准备 + PolarisException otherException = new PolarisException(ErrorCode.SERVER_USER_ERROR, "Internal error"); + doThrow(otherException).when(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + + // 执行 + registerFlow.doRunHeartbeat(registerState, registerFunction, heartbeatFunction); + + // 验证 + verify(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + verify(registerState).resetFailCount(); + verify(registerState, never()).incrementFailCount(); + } + + @Test + public void testDoRunHeartbeat_ReRegister_WhenFailCountExceedsThreshold() throws PolarisException { + // 准备 + PolarisException notFoundException = new PolarisException(ErrorCode.SERVER_USER_ERROR, "Not Found Resource"); + notFoundException.setServerErrCode(ServerCodes.NOT_FOUND_RESOURCE); + doThrow(notFoundException).when(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + when(registerState.getHeartbeatFailCounter()).thenReturn(2); + when(registerState.getReRegisterFuture()).thenReturn(null); + when(registerState.getFirstRegisterTime()).thenReturn(System.currentTimeMillis() - 30000); // 30s前注册 + when(reRegisterExecutor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))).thenReturn(reRegisterFuture); + + // 执行 + registerFlow.doRunHeartbeat(registerState, registerFunction, heartbeatFunction); + + // 验证 + verify(registerState).setReRegisterFuture(any(ScheduledFuture.class)); + verify(registerState).incrementReRegisterCounter(); + } + + @Test + public void testDoRunHeartbeat_NoReRegister_WhenMinIntervalNotReached() throws PolarisException { + // 准备 + PolarisException notFoundException = new PolarisException(ErrorCode.SERVER_USER_ERROR, "Not Found Resource"); + notFoundException.setServerErrCode(ServerCodes.NOT_FOUND_RESOURCE); + doThrow(notFoundException).when(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + when(registerState.getHeartbeatFailCounter()).thenReturn(2); + when(registerState.getFirstRegisterTime()).thenReturn(System.currentTimeMillis()); // 刚刚注册 + + // 执行 + registerFlow.doRunHeartbeat(registerState, registerFunction, heartbeatFunction); + + // 验证 + verify(registerState, never()).setReRegisterFuture(any(ScheduledFuture.class)); + verify(registerState, never()).incrementReRegisterCounter(); + } + + @Test + public void testDoRunHeartbeat_NoReRegister_WhenFutureNotDone() throws PolarisException { + // 准备 + PolarisException notFoundException = new PolarisException(ErrorCode.SERVER_USER_ERROR, "Not Found Resource"); + notFoundException.setServerErrCode(ServerCodes.NOT_FOUND_RESOURCE); + doThrow(notFoundException).when(heartbeatFunction).doHeartbeat(any(InstanceHeartbeatRequest.class)); + when(registerState.getHeartbeatFailCounter()).thenReturn(2); + when(registerState.getReRegisterFuture()).thenReturn(reRegisterFuture); + when(reRegisterFuture.isDone()).thenReturn(false); + when(reRegisterFuture.isCancelled()).thenReturn(false); + + // 执行 + registerFlow.doRunHeartbeat(registerState, registerFunction, heartbeatFunction); + + // 验证 + verify(registerState, never()).setReRegisterFuture(any(ScheduledFuture.class)); + verify(registerState, never()).incrementReRegisterCounter(); + } +} From 50f7ec5047898d6754eef88ca42dd6a4a6f346c4 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Mon, 7 Apr 2025 16:31:29 +0800 Subject: [PATCH 2/2] feat:optimize heartbeat and re-register. --- .../discovery/client/flow/RegisterFlow.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java index 4e731607f..34de38789 100644 --- a/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java +++ b/polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/RegisterFlow.java @@ -92,19 +92,17 @@ void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFuncti } catch (PolarisException e) { if (e.getServerErrCode() == ServerCodes.NOT_FOUND_RESOURCE) { registerState.incrementFailCount(); - LOG.debug("[AsyncHeartbeat]Instance heartbeat failed because of NOT_FOUND_RESOURCE. Increase fail count. namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", + LOG.error("[AsyncHeartbeat]Instance heartbeat failed because of NOT_FOUND_RESOURCE. Increase fail count. " + + "namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), - registerRequest.getPort(), registerState.getHeartbeatFailCounter()); + registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e); } else { registerState.resetFailCount(); - LOG.debug("[AsyncHeartbeat]Instance heartbeat failed not because of NOT_FOUND_RESOURCE. Reset fail count. namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", + LOG.error("[AsyncHeartbeat]Instance heartbeat failed not because of NOT_FOUND_RESOURCE. Reset fail count. " + + "namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), - registerRequest.getPort(), registerState.getHeartbeatFailCounter()); + registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e); } - LOG.error( - "[AsyncHeartbeat]Instance heartbeat failed, namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}", - registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(), - registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e); } long minRegisterInterval = sdkContext.getConfig().getProvider().getMinRegisterInterval();