Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions collector/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

# the cluster that this collector is bound to
cluster_name : "onebox"
cluster_name: "onebox"

# the meta server addresses of the cluster.
meta_servers:
Expand All @@ -25,33 +25,34 @@ meta_servers:
- 127.0.0.1:34603

# local server port
port : 34101
port: 34101

metrics:
# use falcon as the monitoring system.
sink : falcon
report_interval : 10s
sink: falcon
report_interval: 10s

prometheus:
# the exposed port for prometheus exposer
exposer_port : 1111
exposer_port: 1111

falcon_agent:
# the host IP of falcon agent
host : "127.0.0.1"
port : 1988
http_path : "/v1/push"
host: "127.0.0.1"
port: 1988
http_path: "/v1/push"

availability_detect:
table_name : test
partition_count : 16
max_replica_count : 3
table_name: test
partition_count: 16
max_replica_count: 3

hotspot:
rpc_timeout : 5s
partition_detect_interval : 30s
pull_metrics_timeout : 5s
sample_metrics_interval : 10s
max_sample_size : 128
retention_period: 24h
rpc_timeout: 5s
partition_detect_interval: 30s
pull_metrics_timeout: 5s
sample_metrics_interval: 10s
max_sample_size: 128
hotspot_partition_min_score: 3
hotspot_partition_min_qps: 100
2 changes: 1 addition & 1 deletion collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module github.com/apache/incubator-pegasus/collector
go 1.18

require (
github.com/apache/incubator-pegasus/go-client v0.0.0-20260121121155-96868ed93b2a
github.com/apache/incubator-pegasus/go-client v0.0.0-20260211095029-022854b0259f
github.com/gammazero/deque v1.0.0
github.com/kataras/iris/v12 v12.2.0
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions collector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/apache/incubator-pegasus/go-client v0.0.0-20260121121155-96868ed93b2a h1:Vqws5uoQ/ibw4QcnDHdXIleiGunC1QmZaMCrJN0znEk=
github.com/apache/incubator-pegasus/go-client v0.0.0-20260121121155-96868ed93b2a/go.mod h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
github.com/apache/incubator-pegasus/go-client v0.0.0-20260211095029-022854b0259f h1:Q9jSLZZCsD8tdU8h+qFe6PN5DPqWfiezkfK/8l16i7Y=
github.com/apache/incubator-pegasus/go-client v0.0.0-20260211095029-022854b0259f/go.mod h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand Down
75 changes: 70 additions & 5 deletions collector/hotspot/partition_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type PartitionDetector interface {

type PartitionDetectorConfig struct {
MetaServers []string
RetentionPeriod time.Duration
RpcTimeout time.Duration
DetectInterval time.Duration
PullMetricsTimeout time.Duration
Expand All @@ -54,6 +55,7 @@ type PartitionDetectorConfig struct {
func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
return &PartitionDetectorConfig{
MetaServers: viper.GetStringSlice("meta_servers"),
RetentionPeriod: viper.GetDuration("hotspot.retention_period"),
RpcTimeout: viper.GetDuration("hotspot.rpc_timeout"),
DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
PullMetricsTimeout: viper.GetDuration("hotspot.pull_metrics_timeout"),
Expand All @@ -69,6 +71,10 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro
return nil, fmt.Errorf("MetaServers should not be empty")
}

if cfg.RetentionPeriod <= 0 {
return nil, fmt.Errorf("RetentionPeriod(%d) must be > 0", cfg.RetentionPeriod)
}

if cfg.DetectInterval <= 0 {
return nil, fmt.Errorf("DetectInterval(%d) must be > 0", cfg.DetectInterval)
}
Expand Down Expand Up @@ -111,6 +117,12 @@ type partitionDetectorImpl struct {
}

func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go d.checkExpiration(ctx, &wg)

ticker := time.NewTicker(d.cfg.DetectInterval)
defer ticker.Stop()

Expand All @@ -119,12 +131,50 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
case <-ticker.C:
d.detect()
case <-tom.Dying():
cancel()
wg.Wait()

log.Info("Hotspot partition detector exited.")
return nil
}
}
}

// checkExpiration is the background loop that periodically retires expired
// analyzers. It fires once per d.cfg.RetentionPeriod and calls
// retireExpiredTables on each tick. The loop ends when ctx is cancelled, and
// wg is marked done on return so that the caller can wait for this goroutine.
func (d *partitionDetectorImpl) checkExpiration(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	expirationTicker := time.NewTicker(d.cfg.RetentionPeriod)
	defer expirationTicker.Stop()

	cancelled := ctx.Done()
	for {
		select {
		case <-cancelled:
			// The owner asked us to stop; report it and leave the loop.
			log.Info("Expiration checker for hotspot exited.")
			return

		case <-expirationTicker.C:
			d.retireExpiredTables()
		}
	}
}

// retireExpiredTables drops every analyzer from d.analyzers whose isExpired
// reports true for the current Unix time (in seconds). The timestamp is taken
// before d.mtx is acquired, and the whole scan runs while holding d.mtx.
func (d *partitionDetectorImpl) retireExpiredTables() {
	nowSeconds := time.Now().Unix()

	d.mtx.Lock()
	defer d.mtx.Unlock()

	log.Info("check expired tables")

	// Removing entries from a map while ranging over it is well-defined in Go.
	for analyzerKey, candidate := range d.analyzers {
		if candidate.isExpired(nowSeconds) {
			delete(d.analyzers, analyzerKey)
		}
	}
}

func (d *partitionDetectorImpl) detect() {
appMap, err := d.aggregate()
if err != nil {
Expand Down Expand Up @@ -369,10 +419,8 @@ func calculateStats(
}

// Only primary replica of a partition will be counted.
// TODO(wangdan): support Equal() for base.HostPort.
primary := stats.partitionConfigs[partitionID].HpPrimary
if primary.GetHost() != node.HpNode.GetHost() ||
primary.GetPort() != node.HpNode.GetPort() {
if !node.HpNode.Equal(primary) {
continue
}

Expand Down Expand Up @@ -439,6 +487,10 @@ func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspo
func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
hotspotMap := calculateHotspotStats(appMap)

nowTime := time.Now()
expireTime := nowTime.Add(d.cfg.RetentionPeriod)
expireTimestampSeconds := expireTime.Unix()

d.mtx.Lock()
defer d.mtx.Unlock()

Expand All @@ -455,7 +507,7 @@ func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
d.analyzers[key] = analyzer
}

analyzer.add(value)
analyzer.add(value, expireTimestampSeconds)

// Perform the analysis asynchronously.
go analyzer.analyse()
Expand Down Expand Up @@ -489,13 +541,26 @@ type partitionAnalyzer struct {
appID int32
partitionCount int32
mtx sync.RWMutex
expireTimestampSeconds int64
samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
}

func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) {
// isExpired reports whether currentTimestampSeconds has reached or passed the
// analyzer's expireTimestampSeconds. The field is read under the read lock of
// a.mtx.
func (a *partitionAnalyzer) isExpired(currentTimestampSeconds int64) bool {
	a.mtx.RLock()
	deadlineSeconds := a.expireTimestampSeconds
	a.mtx.RUnlock()

	return deadlineSeconds <= currentTimestampSeconds
}

func (a *partitionAnalyzer) add(
sample []hotspotPartitionStats,
expireTimestampSeconds int64,
) {
a.mtx.Lock()
defer a.mtx.Unlock()

a.expireTimestampSeconds = expireTimestampSeconds

for a.samples.Len() >= a.maxSampleSize {
a.samples.PopFront()
}
Expand Down
Loading