Skip to content

Commit 31604f1

Browse files
authored
Merge pull request #233 from captainroy-hy/cherrypick/fix/ratelimit-grpc-leaking
修复分布式限流场景下rpc连接泄漏的问题
2 parents e9177ea + b7e72c0 commit 31604f1

File tree

1 file changed

+47
-55
lines changed

1 file changed

+47
-55
lines changed

pkg/flow/quota/remote.go

Lines changed: 47 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,6 @@ type StreamCounterSet struct {
130130
lastConnectFailTimeMilli int64
131131
// 创建时间
132132
createTimeMilli int64
133-
// 是否已经过期
134-
expired int32
135133
// 时间差
136134
timeDiff int64
137135
}
@@ -156,13 +154,6 @@ func (s *StreamCounterSet) CompareTo(value interface{}) int {
156154
return 1
157155
}
158156

159-
// EnsureDeleted 删除前进行检查,返回true才删除,该检查是同步操作
160-
func (s *StreamCounterSet) EnsureDeleted(value interface{}) bool {
161-
counterSet := value.(*StreamCounterSet)
162-
result := atomic.LoadInt32(&counterSet.expired) > 0
163-
return result
164-
}
165-
166157
// HasInitialized 是否已经初始化
167158
func (s *StreamCounterSet) HasInitialized(svcKey model.ServiceKey, labels string) bool {
168159
s.mutex.RLock()
@@ -589,8 +580,6 @@ type asyncRateLimitConnector struct {
589580
destroyed bool
590581
// 全局上下文信息
591582
valueCtx model.ValueContext
592-
// 单次加载
593-
once *sync.Once
594583
// 获取自身IP的互斥锁
595584
clientHostMutex *sync.Mutex
596585
// 自身IP信息
@@ -599,10 +588,10 @@ type asyncRateLimitConnector struct {
599588
connTimeout time.Duration
600589
// 消息超时时间
601590
msgTimeout time.Duration
602-
// 淘汰清理任务列表
603-
taskValues model.TaskValues
604591
// 定时淘汰间隔
605592
purgeInterval time.Duration
593+
// 关闭定时清理任务的信号
594+
stopChan chan struct{}
606595
// 连接释放的空闲时长
607596
connIdleTimeout time.Duration
608597
// 重连间隔时间
@@ -619,7 +608,7 @@ func NewAsyncRateLimitConnector(valueCtx model.ValueContext, cfg config.Configur
619608
purgeInterval := cfg.GetProvider().GetRateLimit().GetPurgeInterval()
620609
connIdleTimeout := cfg.GetGlobal().GetServerConnector().GetConnectionIdleTimeout()
621610
reconnectInterval := cfg.GetGlobal().GetServerConnector().GetReconnectInterval()
622-
return &asyncRateLimitConnector{
611+
c := &asyncRateLimitConnector{
623612
mutex: &sync.RWMutex{},
624613
streams: make(map[HostIdentifier]*StreamCounterSet),
625614
valueCtx: valueCtx,
@@ -628,10 +617,12 @@ func NewAsyncRateLimitConnector(valueCtx model.ValueContext, cfg config.Configur
628617
purgeInterval: purgeInterval,
629618
connIdleTimeout: connIdleTimeout,
630619
reconnectInterval: reconnectInterval,
631-
once: &sync.Once{},
632620
clientHostMutex: &sync.Mutex{},
633621
protocol: protocol,
622+
stopChan: make(chan struct{}),
634623
}
624+
go c.startClearTask()
625+
return c
635626
}
636627

637628
// dropStreamCounterSet 淘汰流管理器
@@ -656,36 +647,6 @@ func (a *asyncRateLimitConnector) getStreamCounterSet(hostIdentifier HostIdentif
656647
return a.streams[hostIdentifier], nil
657648
}
658649

659-
// Process 定时处理过期任务
660-
func (a *asyncRateLimitConnector) Process(
661-
taskKey interface{}, taskValue interface{}, lastProcessTime time.Time) model.TaskResult {
662-
counterSet := taskValue.(*StreamCounterSet)
663-
nowMilli := model.CurrentMillisecond()
664-
lastProcessMilli := lastProcessTime.UnixNano() / 1e6
665-
if nowMilli-lastProcessMilli < model.ToMilliSeconds(a.purgeInterval) {
666-
return model.SKIP
667-
}
668-
669-
if !counterSet.Expired(nowMilli, true) {
670-
return model.CONTINUE
671-
}
672-
a.mutex.Lock()
673-
defer a.mutex.Unlock()
674-
if counterSet.Expired(nowMilli, false) {
675-
atomic.StoreInt32(&counterSet.expired, 1)
676-
delete(a.streams, *counterSet.HostIdentifier)
677-
counterSet.closeConnection()
678-
log.GetBaseLogger().Infof("[RateLimit]stream %s expired", *counterSet.HostIdentifier)
679-
return model.TERMINATE
680-
}
681-
return model.CONTINUE
682-
}
683-
684-
// OnTaskEvent 任务事件回调
685-
func (a *asyncRateLimitConnector) OnTaskEvent(event model.TaskEvent) {
686-
687-
}
688-
689650
// StreamCount 获取stream的数量,用于测试
690651
func (a *asyncRateLimitConnector) StreamCount() int {
691652
a.mutex.RLock()
@@ -703,15 +664,6 @@ func (a *asyncRateLimitConnector) GetMessageSender(
703664
req.HashValue = hashValue
704665
req.Metadata = map[string]string{"protocol": a.protocol}
705666
engine := a.valueCtx.GetEngine()
706-
a.once.Do(func() {
707-
_, taskValues := engine.ScheduleTask(&model.PeriodicTask{
708-
Name: "rateLimit-connector-clean",
709-
CallBack: a,
710-
Period: a.purgeInterval,
711-
DelayStart: false,
712-
})
713-
a.taskValues = taskValues
714-
})
715667
instanceResp, err := engine.SyncGetOneInstance(req)
716668
if err != nil {
717669
return nil, err
@@ -735,7 +687,6 @@ func (a *asyncRateLimitConnector) GetMessageSender(
735687
}
736688
counterSet = NewStreamCounterSet(a, hostIdentifier)
737689
a.streams[*hostIdentifier] = counterSet
738-
a.taskValues.AddValue(*hostIdentifier, counterSet)
739690
return counterSet, nil
740691
}
741692

@@ -759,6 +710,9 @@ func (a *asyncRateLimitConnector) getIPString(remoteHost string, remotePort uint
759710
// Destroy 清理
760711
func (a *asyncRateLimitConnector) Destroy() {
761712
a.mutex.Lock()
713+
if !a.destroyed {
714+
close(a.stopChan)
715+
}
762716
a.destroyed = true
763717
streams := a.streams
764718
a.streams = nil
@@ -770,3 +724,41 @@ func (a *asyncRateLimitConnector) Destroy() {
770724
stream.closeConnection()
771725
}
772726
}
727+
728+
// startClearTask 启动定时清理闲置超时的 StreamCounterSet
729+
func (a *asyncRateLimitConnector) startClearTask() {
730+
ticker := time.NewTicker(a.purgeInterval)
731+
defer ticker.Stop()
732+
for {
733+
select {
734+
case <-ticker.C:
735+
a.clearCounterSet()
736+
case <-a.stopChan:
737+
log.GetBaseLogger().Infof("[RateLimit] stop clear task")
738+
return
739+
}
740+
}
741+
}
742+
743+
// clearCounterSet 清理闲置超时的 StreamCounterSet
744+
func (a *asyncRateLimitConnector) clearCounterSet() {
745+
if a.destroyed {
746+
return
747+
}
748+
a.mutex.Lock()
749+
counterSets := make([]*StreamCounterSet, 0, len(a.streams))
750+
for _, counterSet := range a.streams {
751+
counterSets = append(counterSets, counterSet)
752+
}
753+
a.mutex.Unlock()
754+
nowMilli := model.CurrentMillisecond()
755+
for _, counterSet := range counterSets {
756+
if counterSet.Expired(nowMilli, true) {
757+
a.mutex.Lock()
758+
delete(a.streams, *counterSet.HostIdentifier)
759+
a.mutex.Unlock()
760+
counterSet.closeConnection()
761+
log.GetBaseLogger().Infof("[RateLimit]stream %s expired", *counterSet.HostIdentifier)
762+
}
763+
}
764+
}

0 commit comments

Comments
 (0)