Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@
package com.tencent.polaris.discovery.client.flow;

import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.exception.ServerCodes;
import com.tencent.polaris.api.rpc.InstanceHeartbeatRequest;
import com.tencent.polaris.api.rpc.InstanceRegisterRequest;
import com.tencent.polaris.api.rpc.InstanceRegisterResponse;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.discovery.client.flow.RegisterStateManager.RegisterState;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.logging.LoggingConsts;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.tencent.polaris.logging.LoggingConsts;
import org.slf4j.Logger;

/**
* 异步注册流
*
Expand Down Expand Up @@ -75,23 +77,32 @@ public InstanceRegisterResponse registerInstance(InstanceRegisterRequest request
return instanceRegisterResponse;
}

private void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction,
void doRunHeartbeat(RegisterState registerState, RegisterFunction registerFunction,
HeartbeatFunction heartbeatFunction) {
InstanceRegisterRequest registerRequest = registerState.getInstanceRegisterRequest();
LOG.info("[AsyncHeartbeat]Instance heartbeat task started, namespace:{}, service:{}, host:{}, port:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort());
try {
heartbeatFunction.doHeartbeat(buildHeartbeatRequest(registerRequest));
LOG.info("[AsyncHeartbeat]Instance heartbeat success, namespace:{}, service:{}, host:{}, port:{}",
registerState.resetFailCount();
LOG.info("[AsyncHeartbeat]Instance heartbeat success. Reset fail count. namespace:{}, service:{}, host:{}, port:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort());
} catch (PolarisException e) {
registerState.incrementFailCount();
LOG.error(
"[AsyncHeartbeat]Instance heartbeat failed, namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e);
if (e.getServerErrCode() == ServerCodes.NOT_FOUND_RESOURCE) {
registerState.incrementFailCount();
LOG.error("[AsyncHeartbeat]Instance heartbeat failed because of NOT_FOUND_RESOURCE. Increase fail count. " +
"namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e);
} else {
registerState.resetFailCount();
LOG.error("[AsyncHeartbeat]Instance heartbeat failed not because of NOT_FOUND_RESOURCE. Reset fail count. " +
"namespace:{}, service:{}, host:{}, port:{}, heartbeat fail count:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort(), registerState.getHeartbeatFailCounter(), e);
}
}

long minRegisterInterval = sdkContext.getConfig().getProvider().getMinRegisterInterval();
Expand All @@ -100,17 +111,35 @@ private void doRunHeartbeat(RegisterState registerState, RegisterFunction regist
|| registerState.getHeartbeatFailCounter() < HEARTBEAT_FAIL_COUNT_THRESHOLD) {
return;
}
try {
registerFunction.doRegister(registerRequest, createRegisterV2Header());
LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort());
registerState.resetFailCount();
} catch (PolarisException e) {
LOG.error(
"[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, err:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort(), e);

synchronized (registerState) {
if (registerState.getReRegisterFuture() == null
|| registerState.getReRegisterFuture().isDone()
|| registerState.getReRegisterFuture().isCancelled()) {
int reRegisterCounter = registerState.getReRegisterCounter();
double base = reRegisterCounter == 0 ? 0 : registerRequest.getTtl() * Math.pow(2, reRegisterCounter - 1);
int offset = reRegisterCounter == 0 ? 0 : new Random().nextInt(registerRequest.getTtl());
long delay = (long) Math.min(base + offset, 60);
LOG.info("[AsyncHeartbeat]Re-register instance, namespace:{}, service:{}, host:{}, port:{}, count:{}, delay:{}s",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort(), reRegisterCounter, delay);
registerState.setReRegisterFuture(registerState.getReRegisterExecutor().schedule(() -> {
try {
registerFunction.doRegister(registerRequest, createRegisterV2Header());
LOG.info("[AsyncHeartbeat]Re-register instance success, namespace:{}, service:{}, host:{}, port:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort());
registerState.resetFailCount();
registerState.resetReRegisterCounter();
} catch (PolarisException e) {
LOG.error(
"[AsyncHeartbeat]Re-register instance failed, namespace:{}, service:{}, host:{}, port:{}, re-register count:{}",
registerRequest.getNamespace(), registerRequest.getService(), registerRequest.getHost(),
registerRequest.getPort(), reRegisterCounter, e);
}
}, delay, TimeUnit.SECONDS));
registerState.incrementReRegisterCounter();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@
import com.tencent.polaris.api.rpc.InstanceDeregisterRequest;
import com.tencent.polaris.api.rpc.InstanceRegisterRequest;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.logging.LoggerFactory;
import org.slf4j.Logger;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 注册状态管理器
Expand All @@ -33,6 +39,8 @@
*/
public class RegisterStateManager {

private static final Logger LOG = LoggerFactory.getLogger(RegisterStateManager.class);

private final static Map<String, Map<String, RegisterState>> REGISTER_STATES = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -69,15 +77,15 @@ public static void removeRegisterState(SDKContext sdkContext, InstanceDeregister
.ifPresent(sdkRegisterStates -> {
String registerStateKey = buildRegisterStateKey(instanceDeregisterRequest);
Optional.ofNullable(sdkRegisterStates.remove(registerStateKey))
.ifPresent(registerState -> registerState.getTaskFuture().cancel(false));
.ifPresent(RegisterState::destroy);
});
}

public static void destroy(SDKContext sdkContext) {
Optional.ofNullable(REGISTER_STATES.remove(sdkContext.getValueContext().getClientId()))
.ifPresent(sdkRegisterStates -> {
for (RegisterState registerState : sdkRegisterStates.values()) {
registerState.getTaskFuture().cancel(false);
registerState.destroy();
}
sdkRegisterStates.clear();
});
Expand All @@ -88,26 +96,33 @@ private static String buildRegisterStateKey(CommonProviderBaseEntity baseEntity)
baseEntity.getPort());
}

public static final class RegisterState {
public static class RegisterState {

private InstanceRegisterRequest instanceRegisterRequest;
private long firstRegisterTime;
private ScheduledFuture<?> taskFuture;
private int heartbeatFailCounter = 0;
private final AtomicInteger heartbeatFailCounter = new AtomicInteger(0);
private ScheduledFuture<?> reRegisterFuture;
private final ScheduledExecutorService reRegisterExecutor;
private final AtomicInteger reRegisterCounter = new AtomicInteger(0);

public RegisterState() {
this.reRegisterExecutor = Executors.newSingleThreadScheduledExecutor();
}

/**
* Increment fail count by one
*/
public void incrementFailCount() {
heartbeatFailCounter += 1;
heartbeatFailCounter.incrementAndGet();
}

public int getHeartbeatFailCounter() {
return heartbeatFailCounter;
return heartbeatFailCounter.get();
}

public void resetFailCount() {
heartbeatFailCounter = 0;
heartbeatFailCounter.set(0);
}

public InstanceRegisterRequest getInstanceRegisterRequest() {
Expand All @@ -133,5 +148,41 @@ public ScheduledFuture<?> getTaskFuture() {
public void setTaskFuture(ScheduledFuture<?> taskFuture) {
this.taskFuture = taskFuture;
}

public ScheduledFuture<?> getReRegisterFuture() {
return reRegisterFuture;
}

public void setReRegisterFuture(ScheduledFuture<?> reRegisterFuture) {
this.reRegisterFuture = reRegisterFuture;
}

public ScheduledExecutorService getReRegisterExecutor() {
return reRegisterExecutor;
}

public int getReRegisterCounter() {
return reRegisterCounter.get();
}

public void incrementReRegisterCounter() {
reRegisterCounter.incrementAndGet();
}

public void resetReRegisterCounter() {
reRegisterCounter.set(0);
}

public void destroy() {
try {
getTaskFuture().cancel(false);
getReRegisterFuture().cancel(false);
getReRegisterExecutor().shutdownNow();
} catch (Throwable throwable) {
LOG.warn("[RegisterState] destroy error. namespace:{}, service:{}, host:{}, port:{}.",
getInstanceRegisterRequest().getNamespace(), getInstanceRegisterRequest().getService(),
getInstanceRegisterRequest().getHost(), getInstanceRegisterRequest().getPort(), throwable);
}
}
}
}
Loading
Loading