Skip to content

Commit 0c4c57f

Browse files
factor, router: routing fails when there's only one backend with a different label (#820) (#821)
Signed-off-by: ti-chi-bot <[email protected]> Co-authored-by: djshow832 <[email protected]>
1 parent d69a263 commit 0c4c57f

File tree

8 files changed

+180
-36
lines changed

8 files changed

+180
-36
lines changed

pkg/balance/factor/factor_balance.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,25 +177,29 @@ func (fbb *FactorBasedBalance) updateScore(backends []policy.BackendCtx) []score
177177

178178
// BackendToRoute returns one backend to route a new connection to.
179179
func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) policy.BackendCtx {
180+
fields := []zap.Field{zap.Int("backend_num", len(backends))}
181+
defer func() {
182+
fbb.lg.Debug("route", fields...)
183+
}()
184+
180185
if len(backends) == 0 {
181186
return nil
182187
}
183188

184189
fbb.Lock()
185190
defer fbb.Unlock()
186191
scoredBackends := fbb.updateScore(backends)
192+
for _, backend := range scoredBackends {
193+
fields = append(fields, zap.String(backend.Addr(), strconv.FormatUint(backend.scoreBits, 16)))
194+
}
187195
if !fbb.canBeRouted(scoredBackends[0].scoreBits) {
188196
return nil
189197
}
190198
if len(backends) == 1 {
199+
fields = append(fields, zap.String("target", backends[0].Addr()))
191200
return backends[0]
192201
}
193202

194-
var fields []zap.Field
195-
for _, backend := range scoredBackends {
196-
fields = append(fields, zap.String(backend.Addr(), strconv.FormatUint(backend.scoreBits, 16)))
197-
}
198-
199203
// Evict the backends that are so busy that it should migrate connections to another, and then randomly choose one.
200204
// Always choosing the idlest one works badly for short connections because even a little jitter may cause all the connections
201205
// in the next second route to the same backend.
@@ -234,7 +238,6 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli
234238
idx = idxes[int(time.Now().UnixMicro()%int64(len(idxes)))]
235239
}
236240
fields = append(fields, zap.String("target", scoredBackends[idx].Addr()), zap.Ints("rand", idxes))
237-
fbb.lg.Debug("route", fields...)
238241
return scoredBackends[idx].BackendCtx
239242
}
240243

pkg/balance/factor/factor_label.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ func (fl *FactorLabel) Name() string {
3535
}
3636

3737
func (fl *FactorLabel) UpdateScore(backends []scoredBackend) {
38-
if len(fl.labelName) == 0 || len(fl.selfLabelVal) == 0 || len(backends) <= 1 {
38+
// The score will be used for CanBeRouted so don't skip updating even when only one backend.
39+
if len(fl.labelName) == 0 || len(fl.selfLabelVal) == 0 {
3940
return
4041
}
4142
for i := 0; i < len(backends); i++ {
@@ -53,7 +54,12 @@ func (fl *FactorLabel) ScoreBitNum() int {
5354
}
5455

5556
func (fl *FactorLabel) BalanceCount(from, to scoredBackend) (float64, []zap.Field) {
56-
return balanceCount4Label, nil
57+
fields := []zap.Field{
58+
zap.String("label_key", fl.labelName),
59+
zap.Any("from_labels", from.GetBackendInfo().Labels),
60+
zap.String("self_label_value", fl.selfLabelVal),
61+
}
62+
return balanceCount4Label, fields
5763
}
5864

5965
func (fl *FactorLabel) SetConfig(cfg *config.Config) {

pkg/balance/factor/factor_label_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,10 @@ func TestFactorLabelOneBackend(t *testing.T) {
5353
backendCtx := &mockBackend{
5454
BackendInfo: observer.BackendInfo{Labels: backendLabels},
5555
}
56-
// Create 2 backends so that UpdateScore won't skip calculating scores.
5756
backends := []scoredBackend{
5857
{
5958
BackendCtx: backendCtx,
6059
},
61-
{
62-
BackendCtx: backendCtx,
63-
},
6460
}
6561
selfLabels := make(map[string]string)
6662
if test.labelName != "" && test.selfLabelVal != "" {
@@ -75,6 +71,7 @@ func TestFactorLabelOneBackend(t *testing.T) {
7571
factor.UpdateScore(backends)
7672
for _, backend := range backends {
7773
require.Equal(t, test.expectedScore, backend.score(), "test idx: %d", i)
74+
require.Equal(t, test.expectedScore == 0, factor.CanBeRouted(backend.score()), "test idx: %d", i)
7875
}
7976
}
8077
}
@@ -133,5 +130,6 @@ func TestFactorLabelMultiBackends(t *testing.T) {
133130
factor.UpdateScore(backends)
134131
for i, test := range tests {
135132
require.Equal(t, test.expectedScore, backends[i].score(), "test idx: %d", i)
133+
require.Equal(t, test.expectedScore == 0, factor.CanBeRouted(backends[i].score()), "test idx: %d", i)
136134
}
137135
}

pkg/balance/observer/backend_health.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package observer
55

66
import (
77
"fmt"
8+
"maps"
9+
"strings"
810

911
"github.com/pingcap/tiproxy/lib/config"
1012
)
@@ -37,19 +39,30 @@ func (bh *BackendHealth) setLocal(cfg *config.Config) {
3739
bh.Local = false
3840
}
3941

40-
func (bh *BackendHealth) Equals(health BackendHealth) bool {
41-
return bh.Healthy == health.Healthy && bh.ServerVersion == health.ServerVersion && bh.Local == health.Local
42+
func (bh *BackendHealth) Equals(other BackendHealth) bool {
43+
return bh.BackendInfo.Equals(other.BackendInfo) &&
44+
bh.Healthy == other.Healthy &&
45+
bh.ServerVersion == other.ServerVersion
4246
}
4347

4448
func (bh *BackendHealth) String() string {
45-
str := "down"
49+
var sb strings.Builder
4650
if bh.Healthy {
47-
str = "healthy"
51+
_, _ = sb.WriteString("healthy")
52+
} else {
53+
_, _ = sb.WriteString("down")
4854
}
4955
if bh.PingErr != nil {
50-
str += fmt.Sprintf(", err: %s", bh.PingErr.Error())
56+
_, _ = sb.WriteString(fmt.Sprintf(", err: %s", bh.PingErr.Error()))
5157
}
52-
return str
58+
if len(bh.ServerVersion) > 0 {
59+
_, _ = sb.WriteString(", version: ")
60+
_, _ = sb.WriteString(bh.ServerVersion)
61+
}
62+
if bh.Labels != nil {
63+
_, _ = sb.WriteString(fmt.Sprintf(", labels: %v", bh.Labels))
64+
}
65+
return sb.String()
5366
}
5467

5568
// BackendInfo stores the status info of each backend.
@@ -59,6 +72,12 @@ type BackendInfo struct {
5972
StatusPort uint
6073
}
6174

75+
func (bi BackendInfo) Equals(other BackendInfo) bool {
76+
return bi.IP == other.IP &&
77+
bi.StatusPort == other.StatusPort &&
78+
maps.Equal(bi.Labels, other.Labels)
79+
}
80+
6281
// HealthResult contains the health check results and is used to notify the routers.
6382
// It's read-only for subscribers.
6483
type HealthResult struct {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package observer
5+
6+
import (
7+
"errors"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestBackendHealthToString(t *testing.T) {
14+
tests := []BackendHealth{
15+
{},
16+
{
17+
BackendInfo: BackendInfo{
18+
IP: "127.0.0.1",
19+
StatusPort: 1,
20+
Labels: map[string]string{"k1": "v1", "k2": "v2"},
21+
},
22+
Healthy: true,
23+
PingErr: errors.New("mock error"),
24+
ServerVersion: "v1.0.0",
25+
Local: true,
26+
},
27+
}
28+
// Just test no error happens
29+
for _, test := range tests {
30+
_ = test.String()
31+
}
32+
}
33+
34+
func TestBackendHealthEquals(t *testing.T) {
35+
tests := []struct {
36+
a, b BackendHealth
37+
equal bool
38+
}{
39+
{
40+
a: BackendHealth{},
41+
b: BackendHealth{},
42+
equal: true,
43+
},
44+
{
45+
a: BackendHealth{
46+
BackendInfo: BackendInfo{
47+
IP: "127.0.0.1",
48+
StatusPort: 1,
49+
Labels: map[string]string{"k1": "v1", "k2": "v2"},
50+
},
51+
},
52+
b: BackendHealth{
53+
BackendInfo: BackendInfo{
54+
IP: "127.0.0.1",
55+
StatusPort: 1,
56+
},
57+
},
58+
equal: false,
59+
},
60+
{
61+
a: BackendHealth{
62+
BackendInfo: BackendInfo{
63+
IP: "127.0.0.1",
64+
StatusPort: 1,
65+
Labels: map[string]string{"k1": "v1", "k2": "v2"},
66+
},
67+
},
68+
b: BackendHealth{
69+
BackendInfo: BackendInfo{
70+
IP: "127.0.0.1",
71+
StatusPort: 1,
72+
Labels: map[string]string{"k1": "v1", "k2": "v2"},
73+
},
74+
},
75+
equal: true,
76+
},
77+
{
78+
a: BackendHealth{
79+
BackendInfo: BackendInfo{
80+
IP: "127.0.0.1",
81+
StatusPort: 1,
82+
Labels: map[string]string{"k1": "v1", "k2": "v2"},
83+
},
84+
Healthy: true,
85+
PingErr: errors.New("mock error"),
86+
ServerVersion: "v1.0.0",
87+
Local: true,
88+
},
89+
b: BackendHealth{},
90+
equal: false,
91+
},
92+
{
93+
a: BackendHealth{
94+
ServerVersion: "v1.0.0",
95+
},
96+
b: BackendHealth{
97+
ServerVersion: "v1.1.0",
98+
},
99+
equal: false,
100+
},
101+
}
102+
// Just test no error happens
103+
for i, test := range tests {
104+
require.True(t, test.a.Equals(test.a), "test %d", i)
105+
require.True(t, test.b.Equals(test.b), "test %d", i)
106+
require.Equal(t, test.equal, test.a.Equals(test.b), "test %d", i)
107+
require.Equal(t, test.equal, test.b.Equals(test.a), "test %d", i)
108+
}
109+
}

pkg/balance/observer/backend_observer.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package observer
55

66
import (
77
"context"
8+
"maps"
89
"sync"
910
"time"
1011

@@ -193,27 +194,22 @@ func (bo *DefaultBackendObserver) updateHealthResult(result HealthResult) {
193194
continue
194195
}
195196
if oldHealth, ok := bo.curBackends[addr]; !ok || !oldHealth.Healthy {
196-
prev := "none"
197-
if oldHealth != nil {
198-
prev = oldHealth.String()
199-
}
200-
bo.logger.Info("update backend", zap.String("backend_addr", addr),
201-
zap.String("prev", prev), zap.String("cur", newHealth.String()))
197+
bo.logger.Info("update backend", zap.String("addr", addr), zap.Stringer("prev", oldHealth), zap.Stringer("cur", newHealth),
198+
zap.Int("total", len(bo.curBackends)))
202199
updateBackendStatusMetrics(addr, true)
203200
delete(bo.downBackends, addr)
201+
} else if ok && !maps.Equal(oldHealth.Labels, newHealth.Labels) {
202+
bo.logger.Info("update backend labels", zap.String("addr", addr),
203+
zap.Any("prev", oldHealth.Labels), zap.Any("cur", newHealth.Labels))
204204
}
205205
}
206206
for addr, oldHealth := range bo.curBackends {
207207
if !oldHealth.Healthy {
208208
continue
209209
}
210210
if newHealth, ok := result.backends[addr]; !ok || !newHealth.Healthy {
211-
cur := "not in list"
212-
if newHealth != nil {
213-
cur = newHealth.String()
214-
}
215-
bo.logger.Info("update backend", zap.String("backend_addr", addr),
216-
zap.String("prev", oldHealth.String()), zap.String("cur", cur))
211+
bo.logger.Info("update backend", zap.String("addr", addr), zap.Stringer("prev", oldHealth), zap.Stringer("cur", newHealth),
212+
zap.Int("total", len(bo.curBackends)))
217213
updateBackendStatusMetrics(addr, false)
218214
bo.downBackends[addr] = time.Now()
219215
}

pkg/balance/router/router.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ func (b *backendWrapper) setHealth(health observer.BackendHealth) {
110110
b.mu.Unlock()
111111
}
112112

113+
func (b *backendWrapper) getHealth() observer.BackendHealth {
114+
b.mu.RLock()
115+
health := b.mu.BackendHealth
116+
b.mu.RUnlock()
117+
return health
118+
}
119+
113120
func (b *backendWrapper) ConnScore() int {
114121
return b.connScore
115122
}

pkg/balance/router/router_score.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,8 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt
282282
// If some backends are removed from the list, add them to `backends`.
283283
for addr, backend := range router.backends {
284284
if _, ok := backends[addr]; !ok {
285+
health := backend.getHealth()
286+
router.logger.Debug("backend is removed from the list, add it back to router", zap.String("addr", addr), zap.Stringer("health", &health))
285287
backends[addr] = &observer.BackendHealth{
286288
BackendInfo: backend.GetBackendInfo(),
287289
Healthy: false,
@@ -293,16 +295,20 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt
293295
for addr, health := range backends {
294296
backend, ok := router.backends[addr]
295297
if !ok && health.Healthy {
298+
router.logger.Debug("add new backend to router", zap.String("addr", addr), zap.Stringer("health", health))
296299
router.backends[addr] = newBackendWrapper(addr, *health)
297300
serverVersion = health.ServerVersion
298301
} else if ok {
299-
if !backend.Equals(*health) {
300-
backend.setHealth(*health)
301-
router.removeBackendIfEmpty(backend)
302-
if health.Healthy {
303-
serverVersion = health.ServerVersion
304-
}
302+
if !health.Equals(backend.getHealth()) {
303+
router.logger.Debug("update backend in router", zap.String("addr", addr), zap.Stringer("health", health))
304+
}
305+
backend.setHealth(*health)
306+
router.removeBackendIfEmpty(backend)
307+
if health.Healthy {
308+
serverVersion = health.ServerVersion
305309
}
310+
} else {
311+
router.logger.Debug("unhealthy backend is not in router", zap.String("addr", addr), zap.Stringer("health", health))
306312
}
307313
}
308314
if len(serverVersion) > 0 {

0 commit comments

Comments
 (0)