
Commit d18577f

balance, infosync: support reading metrics from both prometheus and backend (#606)
1 parent e6b76e5 commit d18577f

22 files changed: +898 additions, -538 deletions

lib/config/health.go

Lines changed: 58 additions & 0 deletions
@@ -3,6 +3,64 @@
 
 package config
 
+import "time"
+
 type HealthInfo struct {
 	ConfigChecksum uint32 `json:"config_checksum"`
 }
+
+const (
+	healthCheckInterval      = 3 * time.Second
+	healthCheckMaxRetries    = 3
+	healthCheckRetryInterval = 1 * time.Second
+	healthCheckTimeout       = 2 * time.Second
+	readMetricsInterval      = 5 * time.Second
+	readMetricsTimeout       = 3 * time.Second
+)
+
+// HealthCheck contains some configurations for health check.
+// Some general configurations of them may be exposed to users in the future.
+// We can use shorter durations to speed up unit tests.
+type HealthCheck struct {
+	Enable          bool          `yaml:"enable" json:"enable" toml:"enable"`
+	Interval        time.Duration `yaml:"interval" json:"interval" toml:"interval"`
+	MaxRetries      int           `yaml:"max-retries" json:"max-retries" toml:"max-retries"`
+	RetryInterval   time.Duration `yaml:"retry-interval" json:"retry-interval" toml:"retry-interval"`
+	DialTimeout     time.Duration `yaml:"dial-timeout" json:"dial-timeout" toml:"dial-timeout"`
+	MetricsInterval time.Duration `yaml:"metrics-interval" json:"metrics-interval" toml:"metrics-interval"`
+	MetricsTimeout  time.Duration `yaml:"metrics-timeout" json:"metrics-timeout" toml:"metrics-timeout"`
+}
+
+// NewDefaultHealthCheckConfig creates a default HealthCheck.
+func NewDefaultHealthCheckConfig() *HealthCheck {
+	return &HealthCheck{
+		Enable:          true,
+		Interval:        healthCheckInterval,
+		MaxRetries:      healthCheckMaxRetries,
+		RetryInterval:   healthCheckRetryInterval,
+		DialTimeout:     healthCheckTimeout,
+		MetricsInterval: readMetricsInterval,
+		MetricsTimeout:  readMetricsTimeout,
+	}
+}
+
+func (hc *HealthCheck) Check() {
+	if hc.Interval == 0 {
+		hc.Interval = healthCheckInterval
+	}
+	if hc.MaxRetries == 0 {
+		hc.MaxRetries = healthCheckMaxRetries
+	}
+	if hc.RetryInterval == 0 {
+		hc.RetryInterval = healthCheckRetryInterval
+	}
+	if hc.DialTimeout == 0 {
+		hc.DialTimeout = healthCheckTimeout
+	}
+	if hc.MetricsInterval == 0 {
+		hc.MetricsInterval = readMetricsInterval
+	}
+	if hc.MetricsTimeout == 0 {
+		hc.MetricsTimeout = readMetricsTimeout
+	}
+}
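
Usage note (not part of the commit): Check() back-fills any zero value with the package defaults, so a partially filled health-check section still ends up with usable settings. A minimal sketch of that behavior; the module import path and the test values are assumptions, not taken from the repository:

package config_test

import (
	"testing"
	"time"

	"github.com/pingcap/TiProxy/lib/config" // module path assumed; use the repo's real module name
)

func TestHealthCheckDefaults(t *testing.T) {
	// Only Interval is set explicitly; everything else is left at its zero value.
	hc := config.HealthCheck{Enable: true, Interval: 10 * time.Second}
	hc.Check()
	if hc.Interval != 10*time.Second {
		t.Fatalf("explicit values should be kept, got %v", hc.Interval)
	}
	// Zero values fall back to the constants above: 2s dial timeout, 5s metrics interval.
	if hc.DialTimeout != 2*time.Second || hc.MetricsInterval != 5*time.Second {
		t.Fatalf("defaults not applied: %v, %v", hc.DialTimeout, hc.MetricsInterval)
	}
}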

lib/config/namespace.go

Lines changed: 0 additions & 58 deletions
@@ -5,7 +5,6 @@ package config
 
 import (
 	"bytes"
-	"time"
 
 	"github.com/BurntSushi/toml"
 )
@@ -24,63 +23,6 @@ type FrontendNamespace struct {
 type BackendNamespace struct {
 	Instances []string  `yaml:"instances" json:"instances" toml:"instances"`
 	Security  TLSConfig `yaml:"security" json:"security" toml:"security"`
-	//HealthCheck HealthCheck `yaml:"health-check" json:"health-check" toml:"health-check"`
-}
-
-const (
-	healthCheckInterval      = 3 * time.Second
-	healthCheckMaxRetries    = 3
-	healthCheckRetryInterval = 1 * time.Second
-	healthCheckTimeout       = 2 * time.Second
-	readMetricsInterval      = 5 * time.Second
-	readMetricsTimeout       = 3 * time.Second
-)
-
-// HealthCheck contains some configurations for health check.
-// Some general configurations of them may be exposed to users in the future.
-// We can use shorter durations to speed up unit tests.
-type HealthCheck struct {
-	Enable          bool          `yaml:"enable" json:"enable" toml:"enable"`
-	Interval        time.Duration `yaml:"interval" json:"interval" toml:"interval"`
-	MaxRetries      int           `yaml:"max-retries" json:"max-retries" toml:"max-retries"`
-	RetryInterval   time.Duration `yaml:"retry-interval" json:"retry-interval" toml:"retry-interval"`
-	DialTimeout     time.Duration `yaml:"dial-timeout" json:"dial-timeout" toml:"dial-timeout"`
-	MetricsInterval time.Duration `yaml:"metrics-interval" json:"metrics-interval" toml:"metrics-interval"`
-	MetricsTimeout  time.Duration `yaml:"metrics-timeout" json:"metrics-timeout" toml:"metrics-timeout"`
-}
-
-// NewDefaultHealthCheckConfig creates a default HealthCheck.
-func NewDefaultHealthCheckConfig() *HealthCheck {
-	return &HealthCheck{
-		Enable:          true,
-		Interval:        healthCheckInterval,
-		MaxRetries:      healthCheckMaxRetries,
-		RetryInterval:   healthCheckRetryInterval,
-		DialTimeout:     healthCheckTimeout,
-		MetricsInterval: readMetricsInterval,
-		MetricsTimeout:  readMetricsTimeout,
-	}
-}
-
-func (hc *HealthCheck) Check() {
-	if hc.Interval == 0 {
-		hc.Interval = healthCheckInterval
-	}
-	if hc.MaxRetries == 0 {
-		hc.MaxRetries = healthCheckMaxRetries
-	}
-	if hc.RetryInterval == 0 {
-		hc.RetryInterval = healthCheckRetryInterval
-	}
-	if hc.DialTimeout == 0 {
-		hc.DialTimeout = healthCheckTimeout
-	}
-	if hc.MetricsInterval == 0 {
-		hc.MetricsInterval = readMetricsInterval
-	}
-	if hc.MetricsTimeout == 0 {
-		hc.MetricsTimeout = readMetricsTimeout
-	}
 }
 
 func NewNamespace(data []byte) (*Namespace, error) {

pkg/balance/factor/factor_cpu.go

Lines changed: 10 additions & 6 deletions
@@ -56,6 +56,10 @@ var (
 			if timeDiff < 1e-4 {
 				return model.SampleValue(math.NaN())
 			}
+			// Maybe the backend just rebooted.
+			if pair1.Value > pair2.Value {
+				return model.SampleValue(math.NaN())
+			}
 			return (pair2.Value - pair1.Value) / model.SampleValue(timeDiff)
 		},
 		ResultType: model.ValMatrix,
@@ -80,17 +84,17 @@ type FactorCPU struct {
 	// The estimated average CPU usage used by one connection.
 	usagePerConn float64
 	mr           metricsreader.MetricsReader
-	queryID      uint64
 	bitNum       int
 }
 
 func NewFactorCPU(mr metricsreader.MetricsReader) *FactorCPU {
-	return &FactorCPU{
+	fc := &FactorCPU{
 		mr:       mr,
-		queryID:  mr.AddQueryExpr(cpuQueryExpr),
 		bitNum:   5,
 		snapshot: make(map[string]cpuBackendSnapshot),
 	}
+	mr.AddQueryExpr(fc.Name(), cpuQueryExpr, cpuQueryRule)
+	return fc
 }
 
 func (fc *FactorCPU) Name() string {
@@ -101,8 +105,8 @@ func (fc *FactorCPU) UpdateScore(backends []scoredBackend) {
 	if len(backends) <= 1 {
 		return
 	}
-	qr := fc.mr.GetQueryResult(fc.queryID)
-	if qr.Err != nil || qr.Empty() {
+	qr := fc.mr.GetQueryResult(fc.Name())
+	if qr.Empty() {
 		return
 	}
 
@@ -250,5 +254,5 @@ func (fc *FactorCPU) SetConfig(cfg *config.Config) {
 }
 
 func (fc *FactorCPU) Close() {
-	fc.mr.RemoveQueryExpr(fc.queryID)
+	fc.mr.RemoveQueryExpr(fc.Name())
 }
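
For orientation, the call sites above imply a string-keyed registration lifecycle: a factor registers its query under its own name with AddQueryExpr(name, expr, rule), polls the result by the same name, and removes it on Close. A self-contained sketch of that contract with simplified stand-in types (these are not the real metricsreader API; only the shape of the calls is taken from the diff):

package main

import "fmt"

// Trimmed stand-ins for the metricsreader types; only the shape matters here.
type QueryExpr struct{ PromQL string }
type QueryRule struct{ Names []string }
type QueryResult struct{ Value string }

func (qr QueryResult) Empty() bool { return qr.Value == "" }

// reader sketches the string-keyed lifecycle used by the factors after this
// commit: register under a key, poll by key, remove by key on Close.
type reader struct{ results map[string]QueryResult }

func newReader() *reader { return &reader{results: map[string]QueryResult{}} }

func (r *reader) AddQueryExpr(key string, _ QueryExpr, _ QueryRule) { r.results[key] = QueryResult{} }
func (r *reader) GetQueryResult(key string) QueryResult             { return r.results[key] }
func (r *reader) RemoveQueryExpr(key string)                        { delete(r.results, key) }

func main() {
	r := newReader()
	r.AddQueryExpr("cpu", QueryExpr{PromQL: "process_cpu_seconds_total"}, QueryRule{})
	r.results["cpu"] = QueryResult{Value: "cpu samples"} // pretend a scrape filled it in
	if qr := r.GetQueryResult("cpu"); !qr.Empty() {
		fmt.Println(qr.Value)
	}
	r.RemoveQueryExpr("cpu")
}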

pkg/balance/factor/factor_cpu_test.go

Lines changed: 13 additions & 5 deletions
@@ -83,8 +83,8 @@ func TestCPUBalanceOnce(t *testing.T) {
 				values = append(values, createSampleStream(test.cpus[j], j, model.Now()))
 			}
 			mmr := &mockMetricsReader{
-				qrs: map[uint64]metricsreader.QueryResult{
-					1: {
+				qrs: map[string]metricsreader.QueryResult{
+					"cpu": {
 						UpdateTime: monotime.Now(),
 						Value:      model.Matrix(values),
 					},
@@ -197,7 +197,7 @@ func TestCPUBalanceContinuously(t *testing.T) {
 			values = append(values, createSampleStream(test.cpus[j], j, curTime))
 		}
 		curTime = curTime.Add(time.Millisecond)
-		mmr.qrs[1] = metricsreader.QueryResult{
+		mmr.qrs["cpu"] = metricsreader.QueryResult{
 			UpdateTime: monotime.Now(),
 			Value:      model.Matrix(values),
 		}
@@ -251,7 +251,7 @@ func TestNoCPUMetric(t *testing.T) {
 			ss := createSampleStream(test.cpus[j], j, model.Time(test.updateTime/monotime.Time(time.Millisecond)))
 			values = append(values, ss)
 		}
-		mmr.qrs[1] = metricsreader.QueryResult{
+		mmr.qrs["cpu"] = metricsreader.QueryResult{
 			UpdateTime: test.updateTime,
 			Value:      model.Matrix(values),
 		}
@@ -290,7 +290,7 @@ func TestCPUResultNotUpdated(t *testing.T) {
 	for i, test := range tests {
 		array := []float64{test.cpu}
 		values := []*model.SampleStream{createSampleStream(array, 0, test.updateTime), createSampleStream(array, 1, test.updateTime)}
-		mmr.qrs[1] = metricsreader.QueryResult{
+		mmr.qrs["cpu"] = metricsreader.QueryResult{
 			UpdateTime: monotime.Now(),
 			Value:      model.Matrix(values),
 		}
@@ -330,6 +330,14 @@ tidb_server_maxprocs 2
 			curValue:   6,
 			finalValue: 1,
 		},
+		{
+			text: `process_cpu_seconds_total 2
+tidb_server_maxprocs 2
+`,
+			timestamp:  model.Time(3000),
+			curValue:   1,
+			finalValue: model.SampleValue(math.NaN()),
+		},
 	}
 
 	historyPair := make([]model.SamplePair, 0)
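
The new test case exercises a counter reset: the previous per-core CPU value was 6 and the next sample is 1, so the computed rate must come out as NaN rather than negative. A standalone sketch of that guard, reusing prometheus' model types; the timestamps below are illustrative, not taken from the test:

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/common/model"
)

// rate sketches the guarded per-second rate used by the CPU query: when the
// current counter sample is below the previous one (the backend probably
// rebooted), it reports NaN instead of a negative usage.
func rate(prev, cur model.SamplePair) model.SampleValue {
	timeDiff := float64(cur.Timestamp-prev.Timestamp) / 1000 // seconds
	if timeDiff < 1e-4 {
		return model.SampleValue(math.NaN())
	}
	if prev.Value > cur.Value { // counter went backwards
		return model.SampleValue(math.NaN())
	}
	return (cur.Value - prev.Value) / model.SampleValue(timeDiff)
}

func main() {
	prev := model.SamplePair{Timestamp: model.Time(2000), Value: 6}
	cur := model.SamplePair{Timestamp: model.Time(3000), Value: 1}
	fmt.Println(rate(prev, cur)) // NaN
}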

pkg/balance/factor/factor_health.go

Lines changed: 23 additions & 7 deletions
@@ -34,6 +34,7 @@ const (
 type errDefinition struct {
 	queryRule        metricsreader.QueryRule
 	promQL           string
+	key              string
 	failThreshold    int
 	recoverThreshold int
 }
@@ -65,6 +66,7 @@ var (
 		promQL:           `sum(increase(tidb_tikvclient_backoff_seconds_count{type="pdRPC"}[2m])) by (instance)`,
 		failThreshold:    50,
 		recoverThreshold: 10,
+		key:              "health_pd",
 		queryRule: metricsreader.QueryRule{
 			Names:     []string{"tidb_tikvclient_backoff_seconds_count"},
 			Retention: 2 * time.Minute,
@@ -87,7 +89,12 @@ var (
 			if len(pairs) < 2 {
 				return model.SampleValue(math.NaN())
 			}
-			return pairs[len(pairs)-1].Value - pairs[0].Value
+			diff := pairs[len(pairs)-1].Value - pairs[0].Value
+			// Maybe the backend just rebooted.
+			if diff < 0 {
+				return model.SampleValue(math.NaN())
+			}
+			return diff
 		},
 		ResultType: model.ValVector,
 	},
@@ -99,6 +106,7 @@ var (
 		promQL:           `sum(increase(tidb_tikvclient_backoff_seconds_count{type=~"regionMiss|tikvRPC"}[2m])) by (instance)`,
 		failThreshold:    1000,
 		recoverThreshold: 100,
+		key:              "health_tikv",
 		queryRule: metricsreader.QueryRule{
 			Names:     []string{"tidb_tikvclient_backoff_seconds_count"},
 			Retention: 2 * time.Minute,
@@ -121,7 +129,12 @@ var (
 			if len(pairs) < 2 {
 				return model.SampleValue(math.NaN())
 			}
-			return pairs[len(pairs)-1].Value - pairs[0].Value
+			diff := pairs[len(pairs)-1].Value - pairs[0].Value
+			// Maybe the backend just rebooted.
+			if diff < 0 {
+				return model.SampleValue(math.NaN())
+			}
+			return diff
 		},
 		ResultType: model.ValVector,
 	},
@@ -141,8 +154,9 @@ type healthBackendSnapshot struct {
 
 type errIndicator struct {
 	queryExpr        metricsreader.QueryExpr
+	queryRule        metricsreader.QueryRule
 	queryResult      metricsreader.QueryResult
-	queryID          uint64
+	key              string
 	failThreshold    int
 	recoverThreshold int
 }
@@ -170,10 +184,12 @@ func initErrIndicator(mr metricsreader.MetricsReader) []errIndicator {
 			queryExpr: metricsreader.QueryExpr{
 				PromQL: def.promQL,
 			},
+			queryRule:        def.queryRule,
+			key:              def.key,
 			failThreshold:    def.failThreshold,
 			recoverThreshold: def.recoverThreshold,
 		}
-		indicator.queryID = mr.AddQueryExpr(indicator.queryExpr)
+		mr.AddQueryExpr(indicator.key, indicator.queryExpr, indicator.queryRule)
 		indicators = append(indicators, indicator)
 	}
 	return indicators
@@ -189,8 +205,8 @@ func (fh *FactorHealth) UpdateScore(backends []scoredBackend) {
 	}
 	needUpdateSnapshot, latestTime := false, monotime.Time(0)
 	for i := 0; i < len(fh.indicators); i++ {
-		qr := fh.mr.GetQueryResult(fh.indicators[i].queryID)
-		if qr.Err != nil || qr.Empty() {
+		qr := fh.mr.GetQueryResult(fh.indicators[i].key)
+		if qr.Empty() {
 			continue
 		}
 		if fh.indicators[i].queryResult.UpdateTime != qr.UpdateTime {
@@ -313,6 +329,6 @@ func (fh *FactorHealth) SetConfig(cfg *config.Config) {
 }
 
 func (fh *FactorHealth) Close() {
 	for _, indicator := range fh.indicators {
-		fh.mr.RemoveQueryExpr(indicator.queryID)
+		fh.mr.RemoveQueryExpr(indicator.key)
 	}
 }
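
The same reboot guard appears in both health indicators: the increase over the retained window is discarded when the counter went backwards. A minimal standalone version of that range function; the sample values below are made up for illustration:

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/common/model"
)

// increase returns the growth of a counter over the window, or NaN when there
// are too few samples or the counter went backwards (e.g. the backend rebooted
// and its counters reset).
func increase(pairs []model.SamplePair) model.SampleValue {
	if len(pairs) < 2 {
		return model.SampleValue(math.NaN())
	}
	diff := pairs[len(pairs)-1].Value - pairs[0].Value
	if diff < 0 {
		return model.SampleValue(math.NaN())
	}
	return diff
}

func main() {
	healthy := []model.SamplePair{{Timestamp: 0, Value: 10}, {Timestamp: 60000, Value: 15}}
	rebooted := []model.SamplePair{{Timestamp: 0, Value: 10}, {Timestamp: 60000, Value: 2}}
	fmt.Println(increase(healthy))  // 5
	fmt.Println(increase(rebooted)) // NaN
}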
