Skip to content

Commit 022854b

Browse files
authored
feat(collector): analyze statistics to determine whether partitions are hotspots (#2367)
#2358 Based on the [Z-score](https://en.wikipedia.org/wiki/Standard_score) method, hotspot scores are calculated for each partition using historical samples of total read/write QPS. A partition is considered a hotspot once both its score and QPS exceed the configured thresholds.
1 parent fcab87d commit 022854b

File tree

2 files changed

+181
-34
lines changed

2 files changed

+181
-34
lines changed

collector/config.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
cluster_name : "onebox"
2020

2121
# the meta server addresses of the cluster.
22-
meta_servers:
22+
meta_servers:
2323
- 127.0.0.1:34601
2424
- 127.0.0.1:34602
2525
- 127.0.0.1:34603
@@ -29,12 +29,12 @@ port : 34101
2929

3030
metrics:
3131
# use falcon as monitoring system.
32-
sink : falcon
32+
sink : falcon
3333
report_interval : 10s
3434

3535
prometheus:
3636
# the exposed port for prometheus exposer
37-
exposer_port : 1111
37+
exposer_port : 1111
3838

3939
falcon_agent:
4040
# the host IP of falcon agent
@@ -53,3 +53,5 @@ hotspot:
5353
pull_metrics_timeout : 5s
5454
sample_metrics_interval : 10s
5555
max_sample_size : 128
56+
hotspot_partition_min_score: 3
57+
hotspot_partition_min_qps: 100

collector/hotspot/partition_detector.go

Lines changed: 176 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package hotspot
2020
import (
2121
"context"
2222
"fmt"
23+
"math"
2324
"strconv"
2425
"sync"
2526
"time"
@@ -40,22 +41,26 @@ type PartitionDetector interface {
4041
}
4142

4243
type PartitionDetectorConfig struct {
43-
MetaServers []string
44-
RpcTimeout time.Duration
45-
DetectInterval time.Duration
46-
PullMetricsTimeout time.Duration
47-
SampleMetricsInterval time.Duration
48-
MaxSampleSize int
44+
MetaServers []string
45+
RpcTimeout time.Duration
46+
DetectInterval time.Duration
47+
PullMetricsTimeout time.Duration
48+
SampleMetricsInterval time.Duration
49+
MaxSampleSize int
50+
HotspotPartitionMinScore float64
51+
HotspotPartitionMinQPS float64
4952
}
5053

5154
func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
5255
return &PartitionDetectorConfig{
53-
MetaServers: viper.GetStringSlice("meta_servers"),
54-
RpcTimeout: viper.GetDuration("hotspot.rpc_timeout"),
55-
DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
56-
PullMetricsTimeout: viper.GetDuration("hotspot.pull_metrics_timeout"),
57-
SampleMetricsInterval: viper.GetDuration("hotspot.sample_metrics_interval"),
58-
MaxSampleSize: viper.GetInt("hotspot.max_sample_size"),
56+
MetaServers: viper.GetStringSlice("meta_servers"),
57+
RpcTimeout: viper.GetDuration("hotspot.rpc_timeout"),
58+
DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
59+
PullMetricsTimeout: viper.GetDuration("hotspot.pull_metrics_timeout"),
60+
SampleMetricsInterval: viper.GetDuration("hotspot.sample_metrics_interval"),
61+
MaxSampleSize: viper.GetInt("hotspot.max_sample_size"),
62+
HotspotPartitionMinScore: viper.GetFloat64("hotspot.hotspot_partition_min_score"),
63+
HotspotPartitionMinQPS: viper.GetFloat64("hotspot.hotspot_partition_min_qps"),
5964
}
6065
}
6166

@@ -81,6 +86,18 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro
8186
cfg.DetectInterval, cfg.SampleMetricsInterval)
8287
}
8388

89+
if cfg.MaxSampleSize <= 0 {
90+
return nil, fmt.Errorf("MaxSampleSize(%d) must be > 0", cfg.MaxSampleSize)
91+
}
92+
93+
if cfg.HotspotPartitionMinScore <= 0 {
94+
return nil, fmt.Errorf("HotspotPartitionMinScore(%f) must be > 0", cfg.HotspotPartitionMinScore)
95+
}
96+
97+
if cfg.HotspotPartitionMinQPS <= 0 {
98+
return nil, fmt.Errorf("HotspotPartitionMinQPS (%f) must be > 0", cfg.HotspotPartitionMinQPS)
99+
}
100+
84101
return &partitionDetectorImpl{
85102
cfg: cfg,
86103
analyzers: make(map[partitionAnalyzerKey]*partitionAnalyzer),
@@ -109,10 +126,14 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
109126
}
110127

111128
func (d *partitionDetectorImpl) detect() {
112-
err := d.aggregate()
129+
appMap, err := d.aggregate()
113130
if err != nil {
114131
log.Error("failed to aggregate metrics for hotspot: ", err)
115132
}
133+
134+
log.Debugf("stats=%v", appMap)
135+
136+
d.analyse(appMap)
116137
}
117138

118139
// {appID -> appStats}.
@@ -125,21 +146,19 @@ type appStats struct {
125146
partitionStats []map[string]float64 // {metricName -> metricValue} for each partition.
126147
}
127148

128-
func (d *partitionDetectorImpl) aggregate() error {
149+
func (d *partitionDetectorImpl) aggregate() (appStatsMap, error) {
129150
// appMap includes the structures that hold all the final statistical values.
130151
appMap, nodes, err := d.fetchMetadata()
131152
if err != nil {
132-
return err
153+
return nil, err
133154
}
134155

135156
err = d.aggregateMetrics(appMap, nodes)
136157
if err != nil {
137-
return err
158+
return nil, err
138159
}
139160

140-
d.addHotspotSamples(appMap)
141-
142-
return nil
161+
return appMap, nil
143162
}
144163

145164
// Fetch necessary metadata from meta server for the aggregation of metrics, including:
@@ -217,7 +236,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes []*ad
217236
snapshot.TimestampNS, startSnapshots[i].TimestampNS)
218237
}
219238

220-
d.calculateStats(snapshot, nodes[i],
239+
calculateStats(snapshot, nodes[i],
221240
func(stats map[string]float64, key string, operand float64) {
222241
// Just set the number of requests with ending snapshot.
223242
stats[key] = operand
@@ -226,7 +245,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes []*ad
226245
}
227246

228247
for i, snapshot := range startSnapshots {
229-
d.calculateStats(snapshot, nodes[i],
248+
calculateStats(snapshot, nodes[i],
230249
func(duration time.Duration) aggregator {
231250
return func(stats map[string]float64, key string, operand float64) {
232251
value, ok := stats[key]
@@ -315,7 +334,7 @@ func (d *partitionDetectorImpl) pullMetrics(nodes []*admin.NodeInfo) ([]*metrics
315334
return results, nil
316335
}
317336

318-
func (d *partitionDetectorImpl) calculateStats(
337+
func calculateStats(
319338
snapshot *metrics.MetricQueryBriefValueSnapshot,
320339
node *admin.NodeInfo,
321340
adder aggregator,
@@ -415,8 +434,9 @@ func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspo
415434
}
416435

417436
// Calculate statistical values over multiple tables with all partitions of each table as
418-
// a sample used for future analysis of hotspot partitions.
419-
func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
437+
// a sample, and analyse all samples of each table asynchronously to decide which partitions
438+
// of it are hotspots.
439+
func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
420440
hotspotMap := calculateHotspotStats(appMap)
421441

422442
d.mtx.Lock()
@@ -425,31 +445,156 @@ func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
425445
for key, value := range hotspotMap {
426446
analyzer, ok := d.analyzers[key]
427447
if !ok {
428-
analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
448+
analyzer = newPartitionAnalyzer(
449+
d.cfg.MaxSampleSize,
450+
d.cfg.HotspotPartitionMinScore,
451+
d.cfg.HotspotPartitionMinQPS,
452+
key.appID,
453+
key.partitionCount,
454+
)
429455
d.analyzers[key] = analyzer
430456
}
431457

432-
analyzer.addSample(value)
458+
analyzer.add(value)
459+
460+
// Perform the analysis asynchronously.
461+
go analyzer.analyse()
433462
}
434463
}
435464

436-
func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
437-
return &partitionAnalyzer{maxSampleSize: maxSampleSize}
465+
func newPartitionAnalyzer(
466+
maxSampleSize int,
467+
hotspotPartitionMinScore float64,
468+
hotspotPartitionMinQPS float64,
469+
appID int32,
470+
partitionCount int32,
471+
) *partitionAnalyzer {
472+
return &partitionAnalyzer{
473+
maxSampleSize: maxSampleSize,
474+
hotspotPartitionMinScore: hotspotPartitionMinScore,
475+
hotspotPartitionMinQPS: hotspotPartitionMinQPS,
476+
appID: appID,
477+
partitionCount: partitionCount,
478+
}
438479
}
439480

440481
// partitionAnalyzer holds the samples for all partitions of a table and analyses hotspot
441482
// partitions based on them.
442483
type partitionAnalyzer struct {
443484
// TODO(wangdan): bump gammazero/deque to the latest version after upgrading Go to 1.23+,
444485
// since older Go versions do not support the `Deque.Iter()` iterator interface.
445-
maxSampleSize int
446-
samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
486+
maxSampleSize int
487+
hotspotPartitionMinScore float64
488+
hotspotPartitionMinQPS float64
489+
appID int32
490+
partitionCount int32
491+
mtx sync.RWMutex
492+
samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
447493
}
448494

449-
func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
495+
func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) {
496+
a.mtx.Lock()
497+
defer a.mtx.Unlock()
498+
450499
for a.samples.Len() >= a.maxSampleSize {
451500
a.samples.PopFront()
452501
}
453502

454503
a.samples.PushBack(sample)
504+
log.Debugf("appID=%d, partitionCount=%d, samples=%v", a.appID, a.partitionCount, a.samples)
505+
}
506+
507+
func (a *partitionAnalyzer) analyse() {
508+
a.mtx.RLock()
509+
defer a.mtx.RUnlock()
510+
511+
a.analyseHotspots(readHotspotData)
512+
a.analyseHotspots(writeHotspotData)
513+
}
514+
515+
func (a *partitionAnalyzer) analyseHotspots(operationType int) {
516+
sample, scores := a.calculateScores(operationType)
517+
if len(scores) == 0 {
518+
return
519+
}
520+
521+
hotspotCount := a.countHotspots(operationType, sample, scores)
522+
523+
// TODO(wangdan): export the hotspot-related metrics for collection by monitoring
524+
// systems such as Prometheus.
525+
log.Infof("appID=%d, partitionCount=%d, operationType=%d, hotspotPartitions=%d, scores=%v",
526+
a.appID, a.partitionCount, operationType, hotspotCount, scores)
527+
}
528+
529+
// Calculates [Z-score](https://en.wikipedia.org/wiki/Standard_score) for each partition by
530+
// comparing historical data vertically and concurrent data horizontally to describe the
531+
// hotspots.
532+
func (a *partitionAnalyzer) calculateScores(
533+
operationType int,
534+
) (
535+
[]hotspotPartitionStats,
536+
[]float64,
537+
) {
538+
var count int
539+
var partitionQPSSum float64
540+
// TODO(wangdan): use `range a.samples.Iter()` instead for Go 1.23+.
541+
for i, n := 0, a.samples.Len(); i < n; i++ {
542+
sample := a.samples.At(i)
543+
count += len(sample)
544+
for _, stats := range sample {
545+
partitionQPSSum += stats.totalQPS[operationType]
546+
}
547+
}
548+
549+
if count <= 1 {
550+
log.Infof("sample size(%d) <= 1, not enough data for calculation", count)
551+
return nil, nil
552+
}
553+
554+
partitionQPSAvg := partitionQPSSum / float64(count)
555+
556+
var standardDeviation float64
557+
// TODO(wangdan): use `range a.samples.Iter()` instead for Go 1.23+.
558+
for i, n := 0, a.samples.Len(); i < n; i++ {
559+
for _, stats := range a.samples.At(i) {
560+
deviation := stats.totalQPS[operationType] - partitionQPSAvg
561+
standardDeviation += deviation * deviation
562+
}
563+
}
564+
565+
standardDeviation = math.Sqrt(standardDeviation / float64(count-1))
566+
567+
sample := a.samples.Back()
568+
scores := make([]float64, 0, len(sample))
569+
for i := 0; i < len(sample); i++ {
570+
if standardDeviation == 0 {
571+
scores = append(scores, 0)
572+
continue
573+
}
574+
575+
score := (sample[i].totalQPS[operationType] - partitionQPSAvg) / standardDeviation
576+
scores = append(scores, score)
577+
}
578+
579+
return sample, scores
580+
}
581+
582+
func (a *partitionAnalyzer) countHotspots(
583+
operationType int,
584+
sample []hotspotPartitionStats,
585+
scores []float64,
586+
) (hotspotCount int) {
587+
for i, score := range scores {
588+
if score < a.hotspotPartitionMinScore {
589+
continue
590+
}
591+
592+
if sample[i].totalQPS[operationType] < a.hotspotPartitionMinQPS {
593+
continue
594+
}
595+
596+
hotspotCount++
597+
}
598+
599+
return
455600
}

0 commit comments

Comments
 (0)