Skip to content

Commit d06ed3c

Browse files
fix:fix counter expired no recover bug. (#638)
1 parent f1a385e commit d06ed3c

File tree

3 files changed

+38
-19
lines changed

3 files changed

+38
-19
lines changed

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindow.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public enum WindowStatus {
9393

9494
private final AtomicLong lastAccessTimeMs = new AtomicLong();
9595

96+
private final AtomicLong lastInitTimeMs = new AtomicLong();
97+
9698
// 执行正式分配的令牌桶
9799
private final QuotaBucket allocatingBucket;
98100

@@ -314,6 +316,14 @@ public boolean isExpired() {
314316
return expired;
315317
}
316318

319+
public long getLastInitTimeMs() {
320+
return lastInitTimeMs.get();
321+
}
322+
323+
public void setLastInitTimeMs(long lastInitTimeMs) {
324+
this.lastInitTimeMs.set(lastInitTimeMs);
325+
}
326+
317327
/**
318328
* 获取当前窗口的状态
319329
*

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamResource.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ public void closeStream(boolean closeSend) {
142142
if (null != channel) {
143143
channel.shutdown();
144144
}
145+
// 重置窗口最后初始化时间,清初始化窗口记录、上报索引记录
146+
initRecord.forEach((serviceIdentifier, record) -> record.getRateLimitWindow().setLastInitTimeMs(0));
147+
initRecord.clear();
148+
counters.clear();
145149
}
146150
}
147151

@@ -152,7 +156,9 @@ public void onNext(RateLimitResponse rateLimitResponse) {
152156
if (RateLimitCmd.INIT.equals(rateLimitResponse.getCmd())) {
153157
handleRateLimitInitResponse(rateLimitResponse.getRateLimitInitResponse());
154158
} else if (RateLimitCmd.ACQUIRE.equals(rateLimitResponse.getCmd())) {
155-
handleRateLimitReportResponse(rateLimitResponse.getRateLimitReportResponse());
159+
if (!handleRateLimitReportResponse(rateLimitResponse.getRateLimitReportResponse())) {
160+
closeStream(true);
161+
}
156162
}
157163
}
158164

@@ -221,6 +227,7 @@ private void handleRateLimitInitResponse(RateLimitInitResponse rateLimitInitResp
221227
long remoteQuotaTimeMilli = rateLimitInitResponse.getTimestamp();
222228
long localQuotaTimeMilli = getLocalTimeMilli(remoteQuotaTimeMilli);
223229
RateLimitWindow rateLimitWindow = initializeRecord.getRateLimitWindow();
230+
rateLimitWindow.setLastInitTimeMs(0); // 重置上次初始化时间,从而在metric变更或上报失败时可再次立刻再初始化
224231
countersList.forEach(counter -> {
225232
initializeRecord.getDurationRecord().putIfAbsent(counter.getDuration(), counter.getCounterKey());
226233
counters.putIfAbsent(counter.getCounterKey(),
@@ -238,16 +245,16 @@ private void handleRateLimitInitResponse(RateLimitInitResponse rateLimitInitResp
238245
*
239246
* @param rateLimitReportResponse report的回包
240247
*/
241-
void handleRateLimitReportResponse(RateLimitReportResponse rateLimitReportResponse) {
248+
boolean handleRateLimitReportResponse(RateLimitReportResponse rateLimitReportResponse) {
242249
LOG.debug("[handleRateLimitReportRequest] response:{}", rateLimitReportResponse);
243250
if (rateLimitReportResponse.getCode() != ServerCodes.EXECUTE_SUCCESS) {
244251
LOG.error("[handleRateLimitReportRequest] failed. code is {}", rateLimitReportResponse.getCode());
245-
return;
252+
return false;
246253
}
247254
List<QuotaLeft> quotaLeftsList = rateLimitReportResponse.getQuotaLeftsList();
248255
if (CollectionUtils.isEmpty(quotaLeftsList)) {
249256
LOG.error("[handleRateLimitReportRequest] quotaLefts is empty.");
250-
return;
257+
return true;
251258
}
252259
long remoteQuotaTimeMilli = rateLimitReportResponse.getTimestamp();
253260
long localQuotaTimeMilli = getLocalTimeMilli(remoteQuotaTimeMilli);
@@ -257,6 +264,7 @@ void handleRateLimitReportResponse(RateLimitReportResponse rateLimitReportRespon
257264
localQuotaTimeMilli, callback.getDuration() * 1000);
258265
callback.getRateLimitWindow().getAllocatingBucket().onRemoteUpdate(remoteQuotaInfo);
259266
});
267+
return true;
260268
}
261269

262270
public int getClientKey() {

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/sync/PolarisRemoteSyncTask.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,6 @@ public class PolarisRemoteSyncTask implements RemoteSyncTask {
5757
*/
5858
private final ServiceIdentifier serviceIdentifier;
5959

60-
/**
61-
* 最近一次初始化时间
62-
*/
63-
private long latestInitTime = 0;
64-
6560
public PolarisRemoteSyncTask(RateLimitWindow window) {
6661
this.window = window;
6762
this.asyncRateLimitConnector = window.getWindowSet().getAsyncRateLimitConnector();
@@ -83,12 +78,7 @@ public void run() {
8378
case DELETED:
8479
break;
8580
case INITIALIZING:
86-
if (latestInitTime != 0 && System.currentTimeMillis() - latestInitTime < INIT_WAIT_RESPONSE_TIME) {
87-
LOG.debug("currentTime - latestInitTime = {}", System.currentTimeMillis() - latestInitTime);
88-
break;
89-
}
90-
latestInitTime = System.currentTimeMillis();
91-
doRemoteInit();
81+
doRemoteInit(false);
9282
break;
9383
default:
9484
doRemoteAcquire();
@@ -110,7 +100,19 @@ private boolean isInitExpired(InitializeRecord initializeRecord) {
110100
/**
111101
* 发送初始化请求
112102
*/
113-
private void doRemoteInit() {
103+
private void doRemoteInit(boolean redoInit) {
104+
// 检查是否在初始化过程中
105+
long currentTimeMs = System.currentTimeMillis();
106+
long lastInitTime = this.window.getLastInitTimeMs();
107+
if (lastInitTime != 0 && currentTimeMs - lastInitTime < INIT_WAIT_RESPONSE_TIME) {
108+
LOG.debug("currentTime - latestInitTime = {}", currentTimeMs - lastInitTime);
109+
return;
110+
}
111+
this.window.setLastInitTimeMs(currentTimeMs);
112+
if (redoInit) {
113+
LOG.warn("[doRemoteAcquire] has not init. redo init: window {}, {}", window, window.getUniqueKey());
114+
}
115+
114116
StreamCounterSet streamCounterSet = asyncRateLimitConnector
115117
.getStreamCounterSet(window.getWindowSet().getRateLimitExtension().getExtensions(),
116118
window.getRemoteCluster(), window.getRemoteAddresses(), window.getUniqueKey(),
@@ -181,8 +183,7 @@ private void doRemoteAcquire() {
181183
StreamResource streamResource = streamCounterSet.checkAndCreateResource(serviceIdentifier, window);
182184

183185
if (!streamResource.hasInit(serviceIdentifier)) {
184-
LOG.warn("[doRemoteAcquire] has not inited. serviceKey:{}", window.getSvcKey());
185-
doRemoteInit();
186+
doRemoteInit(true);
186187
return;
187188
}
188189
//调整时间
@@ -206,7 +207,7 @@ private void doRemoteAcquire() {
206207
if (null == counterKey) {
207208
LOG.warn("[doRemoteAcquire] counterKey for {}, duration {} not found", window.getUniqueKey(),
208209
entry.getKey());
209-
doRemoteInit();
210+
doRemoteInit(true);
210211
return;
211212
}
212213
quotaSum.setCounterKey(counterKey);

0 commit comments

Comments
 (0)