Skip to content

Commit 85f6564

Browse files
authored
metricreader: fix that sometimes the metric history is not purged (#639)
1 parent 3a49e3c commit 85f6564

File tree

2 files changed

+70
-50
lines changed

2 files changed

+70
-50
lines changed

pkg/balance/metricsreader/backend_reader.go

Lines changed: 46 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const (
4949
)
5050

5151
var (
52-
errReadFromOwner = errors.New("read metrics from owner failed")
52+
errReadMetrics = errors.New("read backend metrics failed")
5353
)
5454

5555
type backendHistory struct {
@@ -85,17 +85,18 @@ type BackendReader struct {
8585
func NewBackendReader(lg *zap.Logger, cfgGetter config.ConfigGetter, httpCli *http.Client, etcdCli *clientv3.Client,
8686
backendFetcher TopologyFetcher, cfg *config.HealthCheck) *BackendReader {
8787
return &BackendReader{
88-
queryRules: make(map[string]QueryRule),
89-
queryResults: make(map[string]QueryResult),
90-
history: make(map[string]map[string]backendHistory),
91-
lg: lg,
92-
cfgGetter: cfgGetter,
93-
backendFetcher: backendFetcher,
94-
cfg: cfg,
95-
wgp: waitgroup.NewWaitGroupPool(goPoolSize, goMaxIdle),
96-
electionCfg: elect.DefaultElectionConfig(sessionTTL),
97-
etcdCli: etcdCli,
98-
httpCli: httpCli,
88+
queryRules: make(map[string]QueryRule),
89+
queryResults: make(map[string]QueryResult),
90+
history: make(map[string]map[string]backendHistory),
91+
lg: lg,
92+
cfgGetter: cfgGetter,
93+
backendFetcher: backendFetcher,
94+
cfg: cfg,
95+
wgp: waitgroup.NewWaitGroupPool(goPoolSize, goMaxIdle),
96+
electionCfg: elect.DefaultElectionConfig(sessionTTL),
97+
etcdCli: etcdCli,
98+
httpCli: httpCli,
99+
marshalledHistory: []byte{},
99100
}
100101
}
101102

@@ -167,30 +168,39 @@ func (br *BackendReader) ReadMetrics(ctx context.Context) error {
167168
if err != nil {
168169
return err
169170
}
171+
172+
// If self is a owner, read the backends that are not read by any other owners.
170173
var errs []error
171-
for _, owner := range owners {
172-
if owner == br.election.ID() {
173-
continue
174+
var backendLabels []string
175+
if br.isOwner.Load() {
176+
if idx := slices.Index(zones, zone); idx >= 0 {
177+
zones = slices.Delete(zones, idx, idx+1)
174178
}
175-
if err = br.readFromOwner(ctx, owner); err != nil {
179+
backendLabels, err = br.readFromBackends(ctx, zones)
180+
if err != nil {
176181
errs = append(errs, err)
177182
}
178183
}
179184

180-
// If self is a owner, read the backends that are not read by any other owners.
181-
if br.isOwner.Load() {
182-
if idx := slices.Index(zones, zone); idx >= 0 {
183-
zones = slices.Delete(zones, idx, idx+1)
185+
for _, owner := range owners {
186+
if owner == br.election.ID() {
187+
continue
184188
}
185-
if err := br.readFromBackends(ctx, zones); err != nil {
186-
return err
189+
if err = br.readFromOwner(ctx, owner); err != nil {
190+
errs = append(errs, err)
187191
}
188192
}
189193

190194
// Purge expired history.
191195
br.purgeHistory()
196+
// Marshal backend history for other members to query.
197+
if err := br.marshalHistory(backendLabels); err != nil {
198+
br.lg.Error("marshal backend history failed", zap.Any("addrs", backendLabels), zap.Error(err))
199+
}
200+
// Generate query result for all backends.
201+
br.history2QueryResult()
192202
if len(errs) > 0 {
193-
return errors.Collect(errReadFromOwner, errs...)
203+
return errors.Collect(errReadMetrics, errs...)
194204
}
195205
return nil
196206
}
@@ -259,17 +269,17 @@ func (br *BackendReader) queryAllOwners(ctx context.Context) (zones, owners []st
259269
// 1. In k8s, the zone is not set at startup and then is set by HTTP API, so there may temporarily exist both global and zonal owners.
260270
// 2. Some backends may not be in the same zone with any owner. E.g. there are only 2 TiProxy in a 3-AZ cluster.
261271
// In any way, the owner queries the backends that are not queried by other owners.
262-
func (br *BackendReader) readFromBackends(ctx context.Context, excludeZones []string) error {
272+
func (br *BackendReader) readFromBackends(ctx context.Context, excludeZones []string) ([]string, error) {
263273
addrs, err := br.getBackendAddrs(ctx, excludeZones)
264274
if err != nil {
265-
return err
275+
return nil, err
266276
}
267277
if len(addrs) == 0 {
268-
return nil
278+
return nil, nil
269279
}
270280
allNames := br.collectAllNames()
271281
if len(allNames) == 0 {
272-
return nil
282+
return nil, nil
273283
}
274284

275285
backendLabels := make([]string, 0, len(addrs))
@@ -298,12 +308,7 @@ func (br *BackendReader) readFromBackends(ctx context.Context, excludeZones []st
298308
}(addrs[i], backendLabels[i])
299309
}
300310
br.wgp.Wait()
301-
302-
br.history2QueryResult()
303-
if err := br.marshalHistory(backendLabels); err != nil {
304-
br.lg.Error("marshal backend history failed", zap.Any("addrs", backendLabels), zap.Error(err))
305-
}
306-
return nil
311+
return backendLabels, nil
307312
}
308313

309314
func (br *BackendReader) collectAllNames() []string {
@@ -473,9 +478,6 @@ func (br *BackendReader) readFromOwner(ctx context.Context, ownerAddr string) er
473478

474479
// If this instance becomes the owner in the next round, it can reuse the history.
475480
br.mergeHistory(newHistory)
476-
477-
// Generate query result for all backends.
478-
br.history2QueryResult()
479481
return nil
480482
}
481483

@@ -506,8 +508,6 @@ func (br *BackendReader) mergeHistory(newHistory map[string]map[string]backendHi
506508
ruleHistory[backend] = backendHistory
507509
}
508510
}
509-
// avoid that the stale history is returned to other members when it just becomes the owner
510-
br.marshalledHistory = nil
511511
}
512512

513513
// marshalHistory marshals the backends that are read by this owner. The marshaled data will be returned to other members.
@@ -516,12 +516,14 @@ func (br *BackendReader) marshalHistory(backends []string) error {
516516
defer br.Unlock()
517517

518518
filteredHistory := make(map[string]map[string]backendHistory, len(br.queryRules))
519-
for ruleKey, ruleHistory := range br.history {
520-
filteredRuleHistory := make(map[string]backendHistory, len(backends))
521-
filteredHistory[ruleKey] = filteredRuleHistory
522-
for backend, backendHistory := range ruleHistory {
523-
if slices.Contains(backends, backend) {
524-
filteredRuleHistory[backend] = backendHistory
519+
if len(backends) > 0 {
520+
for ruleKey, ruleHistory := range br.history {
521+
filteredRuleHistory := make(map[string]backendHistory, len(backends))
522+
filteredHistory[ruleKey] = filteredRuleHistory
523+
for backend, backendHistory := range ruleHistory {
524+
if slices.Contains(backends, backend) {
525+
filteredRuleHistory[backend] = backendHistory
526+
}
525527
}
526528
}
527529
}

pkg/balance/metricsreader/backend_reader_test.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -886,16 +886,19 @@ func TestQueryBackendConcurrently(t *testing.T) {
886886
addRule(i)
887887
}
888888

889-
// start a goroutine to query metrics
889+
// start a goroutine to query metrics from backends
890890
var wg waitgroup.WaitGroup
891891
childCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
892892
// fill the initial query result to ensure the result is always non-empty
893-
err := br.readFromBackends(childCtx, nil)
893+
backends, err := br.readFromBackends(childCtx, nil)
894894
require.NoError(t, err)
895+
require.Len(t, backends, initialBackends)
896+
br.history2QueryResult()
895897
wg.Run(func() {
896898
for childCtx.Err() == nil {
897-
err := br.readFromBackends(childCtx, nil)
899+
backends, err := br.readFromBackends(childCtx, nil)
898900
require.NoError(t, err)
901+
require.Len(t, backends, initialBackends)
899902
br.purgeHistory()
900903
}
901904
})
@@ -951,7 +954,7 @@ func TestQueryBackendConcurrently(t *testing.T) {
951954
}
952955
})
953956

954-
// start a goroutine to marshal history
957+
// start a goroutine to query marshalled history
955958
wg.Run(func() {
956959
for childCtx.Err() == nil {
957960
select {
@@ -1223,7 +1226,7 @@ func TestElection(t *testing.T) {
12231226
},
12241227
},
12251228
}
1226-
hitoryText, err := json.Marshal(history)
1229+
historyText, err := json.Marshal(history)
12271230
require.NoError(t, err)
12281231

12291232
// setup rule
@@ -1244,7 +1247,7 @@ func TestElection(t *testing.T) {
12441247
ownerPort := ownerHttpHandler.Start()
12451248
addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(ownerPort))
12461249
ownerFunc := func(_ string) string {
1247-
return string(hitoryText)
1250+
return string(historyText)
12481251
}
12491252
ownerHttpHandler.getRespBody.Store(&ownerFunc)
12501253
t.Cleanup(ownerHttpHandler.Close)
@@ -1276,11 +1279,20 @@ func TestElection(t *testing.T) {
12761279
ts := time.Now()
12771280
err = br.ReadMetrics(context.Background())
12781281
require.NoError(t, err)
1282+
// check that the query result is updated
12791283
qr := br.GetQueryResult("rule_id1")
12801284
require.False(t, qr.Empty())
12811285
require.Equal(t, model.SampleValue(100.0), qr.Value.(model.Vector)[0].Value)
12821286
require.GreaterOrEqual(t, qr.UpdateTime, ts)
12831287
ts = qr.UpdateTime
1288+
// check that the marshalled history is empty
1289+
var unmarshalled map[string]map[string]backendHistory
1290+
marshalledHistory := br.GetBackendMetrics()
1291+
if len(marshalledHistory) > 0 {
1292+
err = json.Unmarshal(marshalledHistory, &unmarshalled)
1293+
require.NoError(t, err)
1294+
require.Len(t, unmarshalled, 0)
1295+
}
12841296

12851297
// test owner
12861298
suite.delKV(ownerKey)
@@ -1290,10 +1302,16 @@ func TestElection(t *testing.T) {
12901302
}, 3*time.Second, 10*time.Millisecond)
12911303
err = br.ReadMetrics(context.Background())
12921304
require.NoError(t, err)
1305+
// check that the query result is updated
12931306
qr = br.GetQueryResult("rule_id1")
12941307
require.False(t, qr.Empty())
12951308
require.Equal(t, model.SampleValue(80.0), qr.Value.(model.Vector)[0].Value)
12961309
require.GreaterOrEqual(t, qr.UpdateTime, ts)
1310+
// check that the marshalled history is updated
1311+
marshalledHistory = br.GetBackendMetrics()
1312+
err = json.Unmarshal(marshalledHistory, &unmarshalled)
1313+
require.NoError(t, err)
1314+
require.Equal(t, br.history, unmarshalled)
12971315
}
12981316

12991317
func setupTypicalBackendListener(t *testing.T, respBody string) (backendPort int, infos map[string]*infosync.TiDBTopologyInfo) {

0 commit comments

Comments
 (0)