Skip to content

Commit 3e160fc

Browse files
authored
metricsreader, api: read backend metrics from owner (#601)
1 parent 6ef55c7 commit 3e160fc

File tree

11 files changed

+506
-215
lines changed

11 files changed

+506
-215
lines changed

pkg/balance/metricsreader/backend_reader.go

Lines changed: 95 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ package metricsreader
55

66
import (
77
"context"
8-
"fmt"
9-
"io"
8+
"encoding/json"
109
"math"
1110
"net"
1211
"net/http"
@@ -20,10 +19,9 @@ import (
2019

2120
"github.com/cenkalti/backoff/v4"
2221
"github.com/pingcap/tiproxy/lib/config"
23-
"github.com/pingcap/tiproxy/lib/util/errors"
2422
"github.com/pingcap/tiproxy/lib/util/waitgroup"
2523
"github.com/pingcap/tiproxy/pkg/manager/elect"
26-
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
24+
"github.com/pingcap/tiproxy/pkg/util/httputil"
2725
dto "github.com/prometheus/client_model/go"
2826
"github.com/prometheus/common/expfmt"
2927
"github.com/prometheus/common/model"
@@ -39,37 +37,43 @@ const (
3937
sessionTTL = 30
4038
// backendMetricPath is the path of backend HTTP API to read metrics.
4139
backendMetricPath = "/metrics"
42-
goPoolSize = 100
43-
goMaxIdle = time.Minute
40+
// ownerMetricPath is the path of reading backend metrics from the backend reader owner.
41+
ownerMetricPath = "/api/backend/metrics"
42+
goPoolSize = 100
43+
goMaxIdle = time.Minute
4444
)
4545

4646
type backendHistory struct {
47-
step1History []model.SamplePair
48-
step2History []model.SamplePair
47+
Step1History []model.SamplePair
48+
Step2History []model.SamplePair
4949
}
5050

5151
type BackendReader struct {
5252
sync.Mutex
53-
queryRules map[uint64]QueryRule
54-
queryResults map[uint64]QueryResult
55-
// rule id: {backend name: backendHistory}
56-
history map[uint64]map[string]backendHistory
57-
election elect.Election
58-
cfgGetter config.ConfigGetter
59-
backendFetcher TopologyFetcher
60-
httpCli *http.Client
61-
lg *zap.Logger
62-
cfg *config.HealthCheck
63-
wgp *waitgroup.WaitGroupPool
64-
isOwner atomic.Bool
53+
queryRules map[string]QueryRule
54+
queryResults map[string]QueryResult
55+
// the owner generates the history from querying backends and other members query the history from the owner
56+
// rule key: {backend name: backendHistory}
57+
history map[string]map[string]backendHistory
58+
// the owner marshalles history to share it to other members
59+
// cache the marshalled history to avoid duplicated marshalling
60+
marshalledHistory []byte
61+
election elect.Election
62+
cfgGetter config.ConfigGetter
63+
backendFetcher TopologyFetcher
64+
httpCli *http.Client
65+
lg *zap.Logger
66+
cfg *config.HealthCheck
67+
wgp *waitgroup.WaitGroupPool
68+
isOwner atomic.Bool
6569
}
6670

6771
func NewBackendReader(lg *zap.Logger, cfgGetter config.ConfigGetter, httpCli *http.Client, backendFetcher TopologyFetcher,
6872
cfg *config.HealthCheck) *BackendReader {
6973
return &BackendReader{
70-
queryRules: make(map[uint64]QueryRule),
71-
queryResults: make(map[uint64]QueryResult),
72-
history: make(map[uint64]map[string]backendHistory),
74+
queryRules: make(map[string]QueryRule),
75+
queryResults: make(map[string]QueryResult),
76+
history: make(map[string]map[string]backendHistory),
7377
lg: lg,
7478
cfgGetter: cfgGetter,
7579
backendFetcher: backendFetcher,
@@ -103,26 +107,26 @@ func (br *BackendReader) OnRetired() {
103107
br.isOwner.Store(false)
104108
}
105109

106-
func (br *BackendReader) AddQueryRule(id uint64, rule QueryRule) {
110+
func (br *BackendReader) AddQueryRule(key string, rule QueryRule) {
107111
br.Lock()
108112
defer br.Unlock()
109-
br.queryRules[id] = rule
113+
br.queryRules[key] = rule
110114
}
111115

112-
func (br *BackendReader) RemoveQueryRule(id uint64) {
116+
func (br *BackendReader) RemoveQueryRule(key string) {
113117
br.Lock()
114118
defer br.Unlock()
115-
delete(br.queryRules, id)
119+
delete(br.queryRules, key)
116120
}
117121

118-
func (br *BackendReader) GetQueryResult(id uint64) QueryResult {
122+
func (br *BackendReader) GetQueryResult(key string) QueryResult {
119123
br.Lock()
120124
defer br.Unlock()
121125
// Return an empty QueryResult if it's not found.
122-
return br.queryResults[id]
126+
return br.queryResults[key]
123127
}
124128

125-
func (br *BackendReader) ReadMetrics(ctx context.Context) (map[uint64]QueryResult, error) {
129+
func (br *BackendReader) ReadMetrics(ctx context.Context) (map[string]QueryResult, error) {
126130
if br.isOwner.Load() {
127131
if err := br.readFromBackends(ctx); err != nil {
128132
return nil, err
@@ -137,7 +141,6 @@ func (br *BackendReader) ReadMetrics(ctx context.Context) (map[uint64]QueryResul
137141
return nil, err
138142
}
139143
}
140-
br.purgeHistory()
141144
return br.queryResults, nil
142145
}
143146

@@ -177,6 +180,11 @@ func (br *BackendReader) readFromBackends(ctx context.Context) error {
177180
}(addr)
178181
}
179182
br.wgp.Wait()
183+
184+
br.purgeHistory()
185+
br.Lock()
186+
br.marshalledHistory = nil
187+
br.Unlock()
180188
return nil
181189
}
182190

@@ -195,55 +203,20 @@ func (br *BackendReader) collectAllNames() []string {
195203
}
196204

197205
func (br *BackendReader) readBackendMetric(ctx context.Context, addr string) ([]byte, error) {
198-
schema := "http"
199-
if v, ok := br.httpCli.Transport.(*http.Transport); ok && v != nil && v.TLSClientConfig != nil {
200-
schema = "https"
201-
}
202206
httpCli := *br.httpCli
203207
httpCli.Timeout = br.cfg.DialTimeout
204-
url := fmt.Sprintf("%s://%s%s", schema, addr, backendMetricPath)
205-
var body []byte
206-
err := br.connectWithRetry(ctx, func() error {
207-
resp, err := httpCli.Get(url)
208-
if err != nil {
209-
return err
210-
}
211-
defer func() {
212-
if ignoredErr := resp.Body.Close(); ignoredErr != nil {
213-
br.lg.Warn("close http response failed", zap.String("url", url), zap.Error(ignoredErr))
214-
}
215-
}()
216-
217-
if resp.StatusCode != http.StatusOK {
218-
return backoff.Permanent(errors.Errorf("http status %d", resp.StatusCode))
219-
}
220-
body, err = io.ReadAll(resp.Body)
221-
if err != nil {
222-
br.lg.Error("read response body failed", zap.String("url", url), zap.Error(err))
223-
}
224-
return err
225-
})
226-
return body, err
227-
}
228-
229-
func (br *BackendReader) connectWithRetry(ctx context.Context, connect func() error) error {
230-
err := backoff.Retry(func() error {
231-
err := connect()
232-
if !pnet.IsRetryableError(err) {
233-
return backoff.Permanent(err)
234-
}
235-
return err
236-
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(br.cfg.RetryInterval), uint64(br.cfg.MaxRetries)), ctx))
237-
return err
208+
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(br.cfg.RetryInterval), uint64(br.cfg.MaxRetries)), ctx)
209+
return httputil.Get(httpCli, addr, backendMetricPath, b)
238210
}
239211

240212
// groupMetricsByRule gets the result for each rule of one backend.
241-
func (br *BackendReader) groupMetricsByRule(mfs map[string]*dto.MetricFamily, backend string) map[uint64]model.Value {
213+
func (br *BackendReader) groupMetricsByRule(mfs map[string]*dto.MetricFamily, backend string) map[string]model.Value {
242214
now := model.TimeFromUnixNano(time.Now().UnixNano())
243215
br.Lock()
244216
defer br.Unlock()
245-
results := make(map[uint64]model.Value, len(br.queryRules))
246-
for id, rule := range br.queryRules {
217+
// rule key: backend value
218+
results := make(map[string]model.Value, len(br.queryRules))
219+
for ruleKey, rule := range br.queryRules {
247220
// If the metric doesn't exist, skip it.
248221
metricExists := true
249222
for _, name := range rule.Names {
@@ -263,21 +236,21 @@ func (br *BackendReader) groupMetricsByRule(mfs map[string]*dto.MetricFamily, ba
263236
continue
264237
}
265238
pair := model.SamplePair{Timestamp: now, Value: sampleValue}
266-
ruleHistory, ok := br.history[id]
239+
ruleHistory, ok := br.history[ruleKey]
267240
if !ok {
268241
ruleHistory = make(map[string]backendHistory)
269-
br.history[id] = ruleHistory
242+
br.history[ruleKey] = ruleHistory
270243
}
271244
beHistory := ruleHistory[backend]
272-
beHistory.step1History = append(beHistory.step1History, pair)
245+
beHistory.Step1History = append(beHistory.Step1History, pair)
273246

274247
// step 2: get the latest pair by the history and add it to step2History
275248
// E.g. calculate irate(process_cpu_seconds_total/tidb_server_maxprocs[30s])
276-
sampleValue = rule.Range2Value(beHistory.step1History)
249+
sampleValue = rule.Range2Value(beHistory.Step1History)
277250
if math.IsNaN(float64(sampleValue)) {
278251
continue
279252
}
280-
beHistory.step2History = append(beHistory.step2History, model.SamplePair{Timestamp: now, Value: sampleValue})
253+
beHistory.Step2History = append(beHistory.Step2History, model.SamplePair{Timestamp: now, Value: sampleValue})
281254
ruleHistory[backend] = beHistory
282255

283256
// step 3: return the result
@@ -286,13 +259,13 @@ func (br *BackendReader) groupMetricsByRule(mfs map[string]*dto.MetricFamily, ba
286259
switch rule.ResultType {
287260
case model.ValVector:
288261
// vector indicates returning the latest pair
289-
results[id] = model.Vector{{Value: sampleValue, Timestamp: now, Metric: labels}}
262+
results[ruleKey] = model.Vector{{Value: sampleValue, Timestamp: now, Metric: labels}}
290263
case model.ValMatrix:
291264
// matrix indicates returning the history
292265
// copy a slice to avoid data race
293-
pairs := make([]model.SamplePair, len(beHistory.step2History))
294-
copy(pairs, beHistory.step2History)
295-
results[id] = model.Matrix{{Values: pairs, Metric: labels}}
266+
pairs := make([]model.SamplePair, len(beHistory.Step2History))
267+
copy(pairs, beHistory.Step2History)
268+
results[ruleKey] = model.Matrix{{Values: pairs, Metric: labels}}
296269
default:
297270
br.lg.Error("unsupported value type", zap.String("value type", rule.ResultType.String()))
298271
}
@@ -301,14 +274,14 @@ func (br *BackendReader) groupMetricsByRule(mfs map[string]*dto.MetricFamily, ba
301274
}
302275

303276
// mergeQueryResult merges the result of one backend into the final result.
304-
func (br *BackendReader) mergeQueryResult(backendValues map[uint64]model.Value, backend string) {
277+
func (br *BackendReader) mergeQueryResult(backendValues map[string]model.Value, backend string) {
305278
br.Lock()
306279
defer br.Unlock()
307-
for id, value := range backendValues {
308-
result := br.queryResults[id]
280+
for ruleKey, value := range backendValues {
281+
result := br.queryResults[ruleKey]
309282
if result.Value == nil || reflect.ValueOf(result.Value).IsNil() {
310283
result.Value = value
311-
br.queryResults[id] = result
284+
br.queryResults[ruleKey] = result
312285
continue
313286
}
314287
switch result.Value.Type() {
@@ -341,7 +314,7 @@ func (br *BackendReader) mergeQueryResult(backendValues map[uint64]model.Value,
341314
default:
342315
br.lg.Error("unsupported value type", zap.Stringer("value type", result.Value.Type()))
343316
}
344-
br.queryResults[id] = result
317+
br.queryResults[ruleKey] = result
345318
}
346319
}
347320

@@ -358,10 +331,10 @@ func (br *BackendReader) purgeHistory() {
358331
continue
359332
}
360333
for backend, backendHistory := range ruleHistory {
361-
backendHistory.step1History = purgeHistory(backendHistory.step1History, rule.Retention, now)
362-
backendHistory.step2History = purgeHistory(backendHistory.step2History, rule.Retention, now)
334+
backendHistory.Step1History = purgeHistory(backendHistory.Step1History, rule.Retention, now)
335+
backendHistory.Step2History = purgeHistory(backendHistory.Step2History, rule.Retention, now)
363336
// the history is expired, maybe the backend is down
364-
if len(backendHistory.step1History) == 0 && len(backendHistory.step2History) == 0 {
337+
if len(backendHistory.Step1History) == 0 && len(backendHistory.Step2History) == 0 {
365338
delete(ruleHistory, backend)
366339
} else {
367340
ruleHistory[backend] = backendHistory
@@ -370,7 +343,40 @@ func (br *BackendReader) purgeHistory() {
370343
}
371344
}
372345

346+
func (br *BackendReader) GetBackendMetrics() ([]byte, error) {
347+
br.Lock()
348+
defer br.Unlock()
349+
if br.marshalledHistory != nil {
350+
return br.marshalledHistory, nil
351+
}
352+
marshalled, err := json.Marshal(br.history)
353+
if err != nil {
354+
return nil, err
355+
}
356+
br.marshalledHistory = marshalled
357+
return marshalled, nil
358+
}
359+
360+
// readFromOwner queries metric history from the owner.
361+
// If every member queries directly from backends, the backends may suffer from too much pressure.
373362
func (br *BackendReader) readFromOwner(ctx context.Context, addr string) error {
363+
httpCli := *br.httpCli
364+
httpCli.Timeout = br.cfg.DialTimeout
365+
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(br.cfg.RetryInterval), uint64(br.cfg.MaxRetries)), ctx)
366+
resp, err := httputil.Get(*br.httpCli, addr, ownerMetricPath, b)
367+
if err != nil {
368+
return err
369+
}
370+
371+
var newHistory map[string]map[string]backendHistory
372+
if err := json.Unmarshal(resp, &newHistory); err != nil {
373+
return err
374+
}
375+
// If this instance becomes the owner in the next round, it can reuse the history.
376+
br.Lock()
377+
br.history = newHistory
378+
br.marshalledHistory = nil
379+
br.Unlock()
374380
return nil
375381
}
376382

0 commit comments

Comments
 (0)