diff --git a/collector/config.yml b/collector/config.yml index d489ab0b7b..8847629dd3 100644 --- a/collector/config.yml +++ b/collector/config.yml @@ -16,7 +16,7 @@ # under the License. # the cluster that this collector is binding -cluster_name : "onebox" +cluster_name: "onebox" # the meta server addresses of the cluster. meta_servers: @@ -25,33 +25,34 @@ meta_servers: - 127.0.0.1:34603 # local server port -port : 34101 +port: 34101 metrics: # use falcon as monitoring system. - sink : falcon - report_interval : 10s + sink: falcon + report_interval: 10s prometheus: # the exposed port for prometheus exposer - exposer_port : 1111 + exposer_port: 1111 falcon_agent: # the host IP of falcon agent - host : "127.0.0.1" - port : 1988 - http_path : "/v1/push" + host: "127.0.0.1" + port: 1988 + http_path: "/v1/push" availability_detect: - table_name : test - partition_count : 16 - max_replica_count : 3 + table_name: test + partition_count: 16 + max_replica_count: 3 hotspot: - rpc_timeout : 5s - partition_detect_interval : 30s - pull_metrics_timeout : 5s - sample_metrics_interval : 10s - max_sample_size : 128 + retention_period: 24h + rpc_timeout: 5s + partition_detect_interval: 30s + pull_metrics_timeout: 5s + sample_metrics_interval: 10s + max_sample_size: 128 hotspot_partition_min_score: 3 hotspot_partition_min_qps: 100 diff --git a/collector/go.mod b/collector/go.mod index 5aabb0b16e..58aa51cf83 100644 --- a/collector/go.mod +++ b/collector/go.mod @@ -20,7 +20,7 @@ module github.com/apache/incubator-pegasus/collector go 1.18 require ( - github.com/apache/incubator-pegasus/go-client v0.0.0-20260121121155-96868ed93b2a + github.com/apache/incubator-pegasus/go-client v0.0.0-20260211095029-022854b0259f github.com/gammazero/deque v1.0.0 github.com/kataras/iris/v12 v12.2.0 github.com/prometheus/client_golang v1.18.0 diff --git a/collector/go.sum b/collector/go.sum index 95ebed26dd..59e6276e46 100644 --- a/collector/go.sum +++ b/collector/go.sum @@ -35,8 +35,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= -github.com/apache/incubator-pegasus/go-client v0.0.0-20260121121155-96868ed93b2a h1:Vqws5uoQ/ibw4QcnDHdXIleiGunC1QmZaMCrJN0znEk= -github.com/apache/incubator-pegasus/go-client v0.0.0-20260121121155-96868ed93b2a/go.mod h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw= +github.com/apache/incubator-pegasus/go-client v0.0.0-20260211095029-022854b0259f h1:Q9jSLZZCsD8tdU8h+qFe6PN5DPqWfiezkfK/8l16i7Y= +github.com/apache/incubator-pegasus/go-client v0.0.0-20260211095029-022854b0259f/go.mod h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw= github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= diff --git a/collector/hotspot/partition_detector.go b/collector/hotspot/partition_detector.go index 8e0e0a8be6..1a8c6277fd 100644 --- a/collector/hotspot/partition_detector.go +++ b/collector/hotspot/partition_detector.go @@ -42,6 +42,7 @@ type PartitionDetector interface { type PartitionDetectorConfig struct { MetaServers []string + RetentionPeriod time.Duration RpcTimeout time.Duration DetectInterval time.Duration PullMetricsTimeout time.Duration @@ -54,6 +55,7 @@ type PartitionDetectorConfig struct { func LoadPartitionDetectorConfig() *PartitionDetectorConfig { return &PartitionDetectorConfig{ MetaServers: viper.GetStringSlice("meta_servers"), + RetentionPeriod: viper.GetDuration("hotspot.retention_period"), RpcTimeout: viper.GetDuration("hotspot.rpc_timeout"), DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"), PullMetricsTimeout: viper.GetDuration("hotspot.pull_metrics_timeout"), @@ -69,6 +71,10 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro return nil, fmt.Errorf("MetaServers should not be empty") } + if cfg.RetentionPeriod <= 0 { + return nil, fmt.Errorf("RetentionPeriod(%d) must be > 0", cfg.RetentionPeriod) + } + if cfg.DetectInterval <= 0 { return nil, fmt.Errorf("DetectInterval(%d) must be > 0", cfg.DetectInterval) } @@ -111,6 +117,12 @@ type partitionDetectorImpl struct { } func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error { + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + wg.Add(1) + go d.checkExpiration(ctx, &wg) + ticker := time.NewTicker(d.cfg.DetectInterval) defer ticker.Stop() @@ -119,12 +131,50 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error { case <-ticker.C: d.detect() case <-tom.Dying(): + cancel() + wg.Wait() + log.Info("Hotspot partition detector exited.") return nil } } } +func (d *partitionDetectorImpl) checkExpiration(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + ticker := time.NewTicker(d.cfg.RetentionPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + d.retireExpiredTables() + + case <-ctx.Done(): + log.Info("Expiration checker for hotspot exited.") + return + } + } +} + +func (d *partitionDetectorImpl) retireExpiredTables() { + currentTimestampSeconds := time.Now().Unix() + + d.mtx.Lock() + defer d.mtx.Unlock() + + log.Info("check expired tables") + + for key, analyzer := range d.analyzers { + if !analyzer.isExpired(currentTimestampSeconds) { + continue + } + + delete(d.analyzers, key) + } +} + func (d *partitionDetectorImpl) detect() { appMap, err := d.aggregate() if err != nil { @@ -369,10 +419,8 @@ func calculateStats( } // Only primary replica of a partition will be counted. - // TODO(wangdan): support Equal() for base.HostPort. primary := stats.partitionConfigs[partitionID].HpPrimary - if primary.GetHost() != node.HpNode.GetHost() || - primary.GetPort() != node.HpNode.GetPort() { + if !node.HpNode.Equal(primary) { continue } @@ -439,6 +487,10 @@ func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspo func (d *partitionDetectorImpl) analyse(appMap appStatsMap) { hotspotMap := calculateHotspotStats(appMap) + nowTime := time.Now() + expireTime := nowTime.Add(d.cfg.RetentionPeriod) + expireTimestampSeconds := expireTime.Unix() + d.mtx.Lock() defer d.mtx.Unlock() @@ -455,7 +507,7 @@ func (d *partitionDetectorImpl) analyse(appMap appStatsMap) { d.analyzers[key] = analyzer } - analyzer.add(value) + analyzer.add(value, expireTimestampSeconds) // Perform the analysis asynchronously. go analyzer.analyse() @@ -489,13 +541,26 @@ type partitionAnalyzer struct { appID int32 partitionCount int32 mtx sync.RWMutex + expireTimestampSeconds int64 samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table } -func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) { +func (a *partitionAnalyzer) isExpired(currentTimestampSeconds int64) bool { + a.mtx.RLock() + defer a.mtx.RUnlock() + + return currentTimestampSeconds >= a.expireTimestampSeconds +} + +func (a *partitionAnalyzer) add( + sample []hotspotPartitionStats, + expireTimestampSeconds int64, +) { a.mtx.Lock() defer a.mtx.Unlock() + a.expireTimestampSeconds = expireTimestampSeconds + for a.samples.Len() >= a.maxSampleSize { a.samples.PopFront() }