Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public static Instance commonGetOneInstance(Extensions extensions, ServiceKey se
ServiceConfig serviceConfig = extensions.getConfiguration().getProvider().getService();
RouteInfo routeInfo = new RouteInfo(
null, null, dstSvcInfo, null, "", serviceConfig);
routeInfo.putRouterMetadata("metadataRoute", metadata);
ResourcesResponse resourcesResponse = BaseFlow
.syncGetResources(extensions, false, provider, flowControlParam);
LOG.debug("[ConnectionManager]success to discover service {}", svcEventKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
import com.tencent.polaris.api.plugin.common.PluginTypes;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.plugin.loadbalance.LoadBalancer;
import com.tencent.polaris.api.pojo.DefaultInstance;
import com.tencent.polaris.api.pojo.DefaultServiceInstances;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.pojo.*;
import com.tencent.polaris.api.rpc.Criteria;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.IPAddressUtils;
Expand Down Expand Up @@ -190,11 +186,8 @@ public int nodeListSize() {
}

private Instance getDiscoverInstance() throws PolarisException {
Instance instance = BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol,
return BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol,
clientId);
LOG.info("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(),
instance.getPort());
return instance;
}

@JustForTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ public Map<String, String> getRouterMetadata(String routerType) {
return Collections.unmodifiableMap(metadata);
}

public void putRouterMetadata(String routerType, Map<String, String> metadata) {
Map<String, String> tempMetadata = routerMetadata.get(routerType);
if (tempMetadata == null || tempMetadata.isEmpty()) {
tempMetadata = new HashMap<>();
routerMetadata.put(routerType, tempMetadata);
}
tempMetadata.putAll(metadata);
}

public void setRouterArguments(Map<String, Set<RouteArgument>> routerArguments) {
Map<String, Map<String, String>> routerMetadata = this.routerMetadata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ public StreamCounterSet getStreamCounterSet(Extensions extensions, ServiceKey re
}
if (null != streamCounterSet) {
//切换了节点,去掉初始化记录
streamCounterSet.deleteInitRecord(serviceIdentifier);
InitializeRecord removedRecord = streamCounterSet.deleteInitRecord(serviceIdentifier);
if (removedRecord != null) {
RateLimitWindow rateLimitWindow = removedRecord.getRateLimitWindow();
uniqueKey = rateLimitWindow != null ? rateLimitWindow.getUniqueKey() : null;
LOG.info("[getStreamCounterSet] host switched, and initRecord removed serviceIdentifier: {}, window "
+ "{} {}", serviceIdentifier, rateLimitWindow, uniqueKey);
}
//切换了节点,老的不再使用
if (streamCounterSet.decreaseReference()) {
nodeToStream.remove(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ public void init(Extensions extensions) throws PolarisException {
FlowCache flowCache = extensions.getFlowCache();
return flowCache.loadPluginCacheObject(API_ID, key, path -> TrieUtil.buildSimpleApiTrieNode((String) path));
};
rateLimitExtension.submitExpireJob(() -> {
try {
for (Map.Entry<ServiceKey, RateLimitWindowSet> entry : svcToWindowSet.entrySet()) {
entry.getValue().cleanupContainers();
}
} catch (Throwable e) {
LOG.error("Failed to cleanup expired rate limit window", e);
}
});

// init tsf rate limit master utils if need
Map<String, String> metadata = rateLimitConfig.getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,22 @@
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.api.utils.ThreadPoolUtils;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.ratelimit.client.sync.RemoteSyncTask;
import com.tencent.polaris.ratelimit.client.utils.RateLimitConstants;
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;

import static com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter.*;

public class RateLimitExtension extends Destroyable {

private static final Logger LOG = LoggerFactory.getLogger(RateLimitExtension.class);

private final Extensions extensions;

private final Map<String, ServiceRateLimiter> rateLimiters = new HashMap<>();
Expand Down Expand Up @@ -113,9 +115,17 @@ private String getRateLimiterName(RateLimitProto.Rule.Resource resource, String
* @param task 任务
*/
public void submitSyncTask(RemoteSyncTask task, long initialDelay, long delay) {
if (scheduledTasks.containsKey(task.getWindow().getUniqueKey())) {
LOG.warn("task has exist, ignore, task {}, window {}, uniqueKey {} ", task, task.getWindow(),
task.getWindow().getUniqueKey());
task.getWindow().setStatus(RateLimitWindow.WindowStatus.CREATED.ordinal());
return;
}
ScheduledFuture<?> scheduledFuture = syncExecutor
.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS);
.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS);
scheduledTasks.put(task.getWindow().getUniqueKey(), scheduledFuture);
LOG.info("submit sync task success, task {}, future {}, window {}, uniqueKey {} ", task, scheduledFuture,
task.getWindow(), task.getWindow().getUniqueKey());
}

private static final int EXPIRE_INTERVAL_SECOND = 5;
Expand All @@ -130,8 +140,33 @@ public void submitExpireJob(Runnable task) {
.scheduleWithFixedDelay(task, EXPIRE_INTERVAL_SECOND, EXPIRE_INTERVAL_SECOND, TimeUnit.SECONDS);
}

public void stopSyncTask(String uniqueKey) {
/**
* 停止同步任务
*
* @param uniqueKey 窗口唯一标识
* @param window 限流窗口
*/
public void stopSyncTask(String uniqueKey, RateLimitWindow window) {
// 从connector初始化列表清理
Runnable cleanTask = () -> {
try {
AsyncRateLimitConnector connector = window.getWindowSet().getAsyncRateLimitConnector();
ServiceIdentifier identifier = new ServiceIdentifier(window.getSvcKey().getService(),
window.getSvcKey().getNamespace(), window.getLabels());
StreamCounterSet streamCounterSet = connector.getStreamCounterSet(
window.getWindowSet().getRateLimitExtension().getExtensions(),
window.getRemoteCluster(), window.getServiceAddressRepository(), window.getUniqueKey(), identifier);
if (streamCounterSet != null) {
streamCounterSet.deleteInitRecord(identifier, window);
}
LOG.info("clean task run success, window {}", window);
} catch (Throwable e) {
LOG.error("clean task run failed, window {}", window.getUniqueKey(), e);
}
};
syncExecutor.schedule(cleanTask, 10, TimeUnit.MILLISECONDS);
ScheduledFuture<?> future = scheduledTasks.remove(uniqueKey);
LOG.info("scheduledTasks remove uniqueKey {}, future {}", uniqueKey, future);
if (null != future) {
future.cancel(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package com.tencent.polaris.ratelimit.client.flow;

import com.tencent.polaris.api.config.consumer.LoadBalanceConfig;
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
import com.tencent.polaris.api.config.provider.RateLimitConfig;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.plugin.ratelimiter.InitCriteria;
import com.tencent.polaris.api.plugin.ratelimiter.QuotaBucket;
import com.tencent.polaris.api.plugin.ratelimiter.QuotaResult;
import com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter;
import com.tencent.polaris.api.pojo.*;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.flow.FlowControlParam;
import com.tencent.polaris.client.remote.ServiceAddressRepository;
Expand All @@ -39,11 +40,12 @@
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Amount;
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.RateLimitCluster;
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Rule;
import java.util.Random;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -97,6 +99,8 @@ public enum WindowStatus {

private final AtomicLong lastInitTimeMs = new AtomicLong();

private final AtomicLong lastSyncTimeMs = new AtomicLong();

// 执行正式分配的令牌桶
private final QuotaBucket allocatingBucket;

Expand Down Expand Up @@ -144,16 +148,18 @@ public RateLimitWindow(RateLimitWindowSet windowSet, CommonQuotaRequest quotaReq
this.syncParam = quotaRequest.getFlowControlParam();
remoteCluster = getLimiterClusterService(rule.getCluster(), rateLimitConfig);
serviceAddressRepository = buildServiceAddressRepository(rateLimitConfig.getLimiterAddresses(),
uniqueKey, windowSet.getExtensions(), remoteCluster, null, LoadBalanceConfig.LOAD_BALANCE_RING_HASH, "grpc");
uniqueKey, windowSet.getExtensions(), remoteCluster);
allocatingBucket = getQuotaBucket(initCriteria, windowSet.getRateLimitExtension());
lastAccessTimeMs.set(System.currentTimeMillis());
this.rateLimitConfig = rateLimitConfig;
buildRemoteConfigMode();
}

private ServiceAddressRepository buildServiceAddressRepository(List<String> addresses, String hash, Extensions extensions,
ServiceKey remoteCluster, List<String> routers, String lbPolicy, String protocol) {
return new ServiceAddressRepository(addresses, hash, extensions, remoteCluster, routers, lbPolicy, protocol);
ServiceKey remoteCluster) {
List<String> routers = new ArrayList<>();
routers.add(ServiceRouterConfig.DEFAULT_ROUTER_METADATA);
return new ServiceAddressRepository(addresses, hash, extensions, remoteCluster, routers, LoadBalanceConfig.LOAD_BALANCE_RING_HASH, "grpc");
}


Expand Down Expand Up @@ -249,10 +255,12 @@ public void init() {
}
if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE && !isTsfCluster) {
//本地限流,则直接可用
LOG.info("[RateLimitWindow] local window {} initiated", this);
status.set(WindowStatus.INITIALIZED.ordinal());
return;
}
//加入轮询队列,走异步调度
LOG.info("[RateLimitWindow] remote window {} first init", this);
if (rule.getMetadataMap().containsKey("limiter")
&& StringUtils.equalsIgnoreCase("tsf", rule.getMetadataMap().get("limiter"))) {
windowSet.getRateLimitExtension().submitSyncTask(new TsfRemoteSyncTask(this), 0L, 1000L);
Expand All @@ -270,8 +278,13 @@ public void unInit() {
return;
}
status.set(WindowStatus.DELETED.ordinal());
LOG.info("[RateLimitWindow] window {} {} is set to DELETED", uniqueKey, this);
//从轮询队列中剔除
windowSet.getRateLimitExtension().stopSyncTask(uniqueKey);
if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE) {
return;
}
LOG.info("[RateLimitWindow] stopSyncTask( uniqueKey {}, window {} ) ", uniqueKey, this);
windowSet.getRateLimitExtension().stopSyncTask(uniqueKey, this);
}
}

Expand Down Expand Up @@ -301,16 +314,21 @@ public void returnQuota(CommonQuotaRequest request) {

/**
* 窗口已经过期
* TSF 设置为不过期
*
* @return boolean
*/
public boolean isExpired() {
long curTimeMs = System.currentTimeMillis();
boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs;
if (expired) {
LOG.info("[RateLimit]window has expired, expireDurationMs {}, uniqueKey {}", expireDurationMs, uniqueKey);
if (!isTsfCluster) {
long curTimeMs = System.currentTimeMillis();
boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs;
if (expired) {
LOG.info("[RateLimit] window has expired, expireDurationMs {}, uniqueKey {}, window {}", expireDurationMs,
uniqueKey, this);
}
return expired;
}
return expired;
return false;
}

public long getLastInitTimeMs() {
Expand All @@ -321,6 +339,14 @@ public void setLastInitTimeMs(long lastInitTimeMs) {
this.lastInitTimeMs.set(lastInitTimeMs);
}

public long getLastSyncTimeMs() {
return lastSyncTimeMs.get();
}

public void setLastSyncTimeMs(long lastSyncTimeMs) {
this.lastSyncTimeMs.set(lastSyncTimeMs);
}

/**
* 获取当前窗口的状态
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class RateLimitWindowSet {
Expand Down Expand Up @@ -133,6 +134,26 @@ public void deleteRules(Set<String> rules) {
}
}

/**
* 过期清理单个rule下所有WindowContainer
*/
public void cleanupContainers() {
AtomicInteger rulesExpired = new AtomicInteger(0);
windowByRule.entrySet().removeIf(entry -> {
boolean expired = entry.getValue().checkAndCleanExpiredWindows();
if (expired) {
rulesExpired.incrementAndGet();
LOG.info("[RateLimitWindowSet] rule {} for service {} has been expired, window container {}",
entry.getKey(), serviceKey, entry.getValue());
}
return expired;
});
if (rulesExpired.get() > 0) {
LOG.info("[RateLimitWindowSet] {} rules have been cleaned up due to expiration, service {}",
rulesExpired, serviceKey);
}
}

public AsyncRateLimitConnector getAsyncRateLimitConnector() {
return asyncRateLimitConnector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@

import com.tencent.polaris.client.pojo.Node;
import com.tencent.polaris.logging.LoggerFactory;
import org.slf4j.Logger;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;

/**
* 计数器对象
*/
Expand Down Expand Up @@ -97,11 +96,20 @@ public boolean decreaseReference() {
return false;
}

public void deleteInitRecord(ServiceIdentifier serviceIdentifier) {
public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier) {
StreamResource streamResource = currentStreamResource.get();
if (null != streamResource) {
return streamResource.deleteInitRecord(serviceIdentifier);
}
return null;
}

public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier, RateLimitWindow window) {
StreamResource streamResource = currentStreamResource.get();
if (null != streamResource) {
streamResource.deleteInitRecord(serviceIdentifier);
return streamResource.deleteInitRecord(serviceIdentifier, window);
}
return null;
}


Expand Down
Loading