Skip to content

Commit 136271d

Browse files
committed
feat:support rate-limit window expiration.
Signed-off-by: Haotian Zhang <[email protected]>
1 parent e368bd0 commit 136271d

File tree

11 files changed

+392
-31
lines changed

11 files changed

+392
-31
lines changed

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/AsyncRateLimitConnector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,13 @@ public StreamCounterSet getStreamCounterSet(Extensions extensions, ServiceKey re
8484
}
8585
if (null != streamCounterSet) {
8686
//切换了节点,去掉初始化记录
87-
streamCounterSet.deleteInitRecord(serviceIdentifier);
87+
InitializeRecord removedRecord = streamCounterSet.deleteInitRecord(serviceIdentifier);
88+
if (removedRecord != null) {
89+
RateLimitWindow rateLimitWindow = removedRecord.getRateLimitWindow();
90+
uniqueKey = rateLimitWindow != null ? rateLimitWindow.getUniqueKey() : null;
91+
LOG.info("[getStreamCounterSet] host switched, and initRecord removed serviceIdentifier: {}, window "
92+
+ "{} {}", serviceIdentifier, rateLimitWindow, uniqueKey);
93+
}
8894
//切换了节点,老的不再使用
8995
if (streamCounterSet.decreaseReference()) {
9096
nodeToStream.remove(node);

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/QuotaFlow.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ public void init(Extensions extensions) throws PolarisException {
9292
FlowCache flowCache = extensions.getFlowCache();
9393
return flowCache.loadPluginCacheObject(API_ID, key, path -> TrieUtil.buildSimpleApiTrieNode((String) path));
9494
};
95+
rateLimitExtension.submitExpireJob(() -> {
96+
try {
97+
for (Map.Entry<ServiceKey, RateLimitWindowSet> entry : svcToWindowSet.entrySet()) {
98+
entry.getValue().cleanupContainers();
99+
}
100+
} catch (Throwable e) {
101+
LOG.error("Failed to cleanup expired rate limit window", e);
102+
}
103+
});
95104

96105
// init tsf rate limit master utils if need
97106
Map<String, String> metadata = rateLimitConfig.getMetadata();

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitExtension.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,22 @@
2525
import com.tencent.polaris.api.utils.StringUtils;
2626
import com.tencent.polaris.api.utils.ThreadPoolUtils;
2727
import com.tencent.polaris.client.util.NamedThreadFactory;
28+
import com.tencent.polaris.logging.LoggerFactory;
2829
import com.tencent.polaris.ratelimit.client.sync.RemoteSyncTask;
29-
import com.tencent.polaris.ratelimit.client.utils.RateLimitConstants;
3030
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto;
31+
import org.slf4j.Logger;
3132

3233
import java.util.Collection;
3334
import java.util.HashMap;
3435
import java.util.Map;
35-
import java.util.Random;
3636
import java.util.concurrent.*;
3737

3838
import static com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter.*;
3939

4040
public class RateLimitExtension extends Destroyable {
4141

42+
private static final Logger LOG = LoggerFactory.getLogger(RateLimitExtension.class);
43+
4244
private final Extensions extensions;
4345

4446
private final Map<String, ServiceRateLimiter> rateLimiters = new HashMap<>();
@@ -113,9 +115,17 @@ private String getRateLimiterName(RateLimitProto.Rule.Resource resource, String
113115
* @param task 任务
114116
*/
115117
public void submitSyncTask(RemoteSyncTask task, long initialDelay, long delay) {
118+
if (scheduledTasks.containsKey(task.getWindow().getUniqueKey())) {
119+
LOG.warn("task has exist, ignore, task {}, window {}, uniqueKey {} ", task, task.getWindow(),
120+
task.getWindow().getUniqueKey());
121+
task.getWindow().setStatus(RateLimitWindow.WindowStatus.CREATED.ordinal());
122+
return;
123+
}
116124
ScheduledFuture<?> scheduledFuture = syncExecutor
117-
.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS);
125+
.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS);
118126
scheduledTasks.put(task.getWindow().getUniqueKey(), scheduledFuture);
127+
LOG.info("submit sync task success, task {}, future {}, window {}, uniqueKey {} ", task, scheduledFuture,
128+
task.getWindow(), task.getWindow().getUniqueKey());
119129
}
120130

121131
private static final int EXPIRE_INTERVAL_SECOND = 5;
@@ -130,8 +140,33 @@ public void submitExpireJob(Runnable task) {
130140
.scheduleWithFixedDelay(task, EXPIRE_INTERVAL_SECOND, EXPIRE_INTERVAL_SECOND, TimeUnit.SECONDS);
131141
}
132142

133-
public void stopSyncTask(String uniqueKey) {
143+
/**
144+
* 停止同步任务
145+
*
146+
* @param uniqueKey 窗口唯一标识
147+
* @param window 限流窗口
148+
*/
149+
public void stopSyncTask(String uniqueKey, RateLimitWindow window) {
150+
// 从connector初始化列表清理
151+
Runnable cleanTask = () -> {
152+
try {
153+
AsyncRateLimitConnector connector = window.getWindowSet().getAsyncRateLimitConnector();
154+
ServiceIdentifier identifier = new ServiceIdentifier(window.getSvcKey().getService(),
155+
window.getSvcKey().getNamespace(), window.getLabels());
156+
StreamCounterSet streamCounterSet = connector.getStreamCounterSet(
157+
window.getWindowSet().getRateLimitExtension().getExtensions(),
158+
window.getRemoteCluster(), window.getServiceAddressRepository(), window.getUniqueKey(), identifier);
159+
if (streamCounterSet != null) {
160+
streamCounterSet.deleteInitRecord(identifier, window);
161+
}
162+
LOG.info("clean task run success, window {}", window);
163+
} catch (Throwable e) {
164+
LOG.error("clean task run failed, window {}", window.getUniqueKey(), e);
165+
}
166+
};
167+
syncExecutor.schedule(cleanTask, 10, TimeUnit.MILLISECONDS);
134168
ScheduledFuture<?> future = scheduledTasks.remove(uniqueKey);
169+
LOG.info("scheduledTasks remove uniqueKey {}, future {}", uniqueKey, future);
135170
if (null != future) {
136171
future.cancel(true);
137172
}

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindow.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.tencent.polaris.api.plugin.ratelimiter.QuotaBucket;
2525
import com.tencent.polaris.api.plugin.ratelimiter.QuotaResult;
2626
import com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter;
27-
import com.tencent.polaris.api.pojo.*;
27+
import com.tencent.polaris.api.pojo.ServiceKey;
2828
import com.tencent.polaris.api.utils.StringUtils;
2929
import com.tencent.polaris.client.flow.FlowControlParam;
3030
import com.tencent.polaris.client.remote.ServiceAddressRepository;
@@ -39,11 +39,11 @@
3939
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Amount;
4040
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.RateLimitCluster;
4141
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Rule;
42-
import java.util.Random;
4342
import org.slf4j.Logger;
4443

4544
import java.util.List;
4645
import java.util.Objects;
46+
import java.util.Random;
4747
import java.util.concurrent.atomic.AtomicInteger;
4848
import java.util.concurrent.atomic.AtomicLong;
4949
import java.util.concurrent.atomic.AtomicReference;
@@ -97,6 +97,8 @@ public enum WindowStatus {
9797

9898
private final AtomicLong lastInitTimeMs = new AtomicLong();
9999

100+
private final AtomicLong lastSyncTimeMs = new AtomicLong();
101+
100102
// 执行正式分配的令牌桶
101103
private final QuotaBucket allocatingBucket;
102104

@@ -249,10 +251,12 @@ public void init() {
249251
}
250252
if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE && !isTsfCluster) {
251253
//本地限流,则直接可用
254+
LOG.info("[RateLimitWindow] local window {} initiated", this);
252255
status.set(WindowStatus.INITIALIZED.ordinal());
253256
return;
254257
}
255258
//加入轮询队列,走异步调度
259+
LOG.info("[RateLimitWindow] remote window {} first init", this);
256260
if (rule.getMetadataMap().containsKey("limiter")
257261
&& StringUtils.equalsIgnoreCase("tsf", rule.getMetadataMap().get("limiter"))) {
258262
windowSet.getRateLimitExtension().submitSyncTask(new TsfRemoteSyncTask(this), 0L, 1000L);
@@ -270,8 +274,13 @@ public void unInit() {
270274
return;
271275
}
272276
status.set(WindowStatus.DELETED.ordinal());
277+
LOG.info("[RateLimitWindow] window {} {} is set to DELETED", uniqueKey, this);
273278
//从轮询队列中剔除
274-
windowSet.getRateLimitExtension().stopSyncTask(uniqueKey);
279+
if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE) {
280+
return;
281+
}
282+
LOG.info("[RateLimitWindow] stopSyncTask( uniqueKey {}, window {} ) ", uniqueKey, this);
283+
windowSet.getRateLimitExtension().stopSyncTask(uniqueKey, this);
275284
}
276285
}
277286

@@ -301,16 +310,21 @@ public void returnQuota(CommonQuotaRequest request) {
301310

302311
/**
303312
* 窗口已经过期
313+
* TSF 设置为不过期
304314
*
305315
* @return boolean
306316
*/
307317
public boolean isExpired() {
308-
long curTimeMs = System.currentTimeMillis();
309-
boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs;
310-
if (expired) {
311-
LOG.info("[RateLimit]window has expired, expireDurationMs {}, uniqueKey {}", expireDurationMs, uniqueKey);
318+
if (!isTsfCluster) {
319+
long curTimeMs = System.currentTimeMillis();
320+
boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs;
321+
if (expired) {
322+
LOG.info("[RateLimit] window has expired, expireDurationMs {}, uniqueKey {}, window {}", expireDurationMs,
323+
uniqueKey, this);
324+
}
325+
return expired;
312326
}
313-
return expired;
327+
return false;
314328
}
315329

316330
public long getLastInitTimeMs() {
@@ -321,6 +335,14 @@ public void setLastInitTimeMs(long lastInitTimeMs) {
321335
this.lastInitTimeMs.set(lastInitTimeMs);
322336
}
323337

338+
public long getLastSyncTimeMs() {
339+
return lastSyncTimeMs.get();
340+
}
341+
342+
public void setLastSyncTimeMs(long lastSyncTimeMs) {
343+
this.lastSyncTimeMs.set(lastSyncTimeMs);
344+
}
345+
324346
/**
325347
* 获取当前窗口的状态
326348
*

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindowSet.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Set;
3131
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.function.Function;
3334

3435
public class RateLimitWindowSet {
@@ -133,6 +134,26 @@ public void deleteRules(Set<String> rules) {
133134
}
134135
}
135136

137+
/**
138+
* 过期清理单个rule下所有WindowContainer
139+
*/
140+
public void cleanupContainers() {
141+
AtomicInteger rulesExpired = new AtomicInteger(0);
142+
windowByRule.entrySet().removeIf(entry -> {
143+
boolean expired = entry.getValue().checkAndCleanExpiredWindows();
144+
if (expired) {
145+
rulesExpired.incrementAndGet();
146+
LOG.info("[RateLimitWindowSet] rule {} for service {} has been expired, window container {}",
147+
entry.getKey(), serviceKey, entry.getValue());
148+
}
149+
return expired;
150+
});
151+
if (rulesExpired.get() > 0) {
152+
LOG.info("[RateLimitWindowSet] {} rules have been cleaned up due to expiration, service {}",
153+
rulesExpired, serviceKey);
154+
}
155+
}
156+
136157
public AsyncRateLimitConnector getAsyncRateLimitConnector() {
137158
return asyncRateLimitConnector;
138159
}

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamCounterSet.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919

2020
import com.tencent.polaris.client.pojo.Node;
2121
import com.tencent.polaris.logging.LoggerFactory;
22+
import org.slf4j.Logger;
2223

2324
import java.util.concurrent.atomic.AtomicInteger;
2425
import java.util.concurrent.atomic.AtomicReference;
2526

26-
import org.slf4j.Logger;
27-
2827
/**
2928
* 计数器对象
3029
*/
@@ -97,11 +96,20 @@ public boolean decreaseReference() {
9796
return false;
9897
}
9998

100-
public void deleteInitRecord(ServiceIdentifier serviceIdentifier) {
99+
public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier) {
100+
StreamResource streamResource = currentStreamResource.get();
101+
if (null != streamResource) {
102+
return streamResource.deleteInitRecord(serviceIdentifier);
103+
}
104+
return null;
105+
}
106+
107+
public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier, RateLimitWindow window) {
101108
StreamResource streamResource = currentStreamResource.get();
102109
if (null != streamResource) {
103-
streamResource.deleteInitRecord(serviceIdentifier);
110+
return streamResource.deleteInitRecord(serviceIdentifier, window);
104111
}
112+
return null;
105113
}
106114

107115

0 commit comments

Comments
 (0)