Skip to content

Commit c22b4db

Browse files
balance: never route to backends with a different label (#805) (#807)
Co-authored-by: djshow832 <[email protected]>
1 parent a65bfbf commit c22b4db

File tree

11 files changed

+128
-19
lines changed

11 files changed

+128
-19
lines changed

pkg/balance/factor/factor.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,7 @@ type Factor interface {
1919
// 0 indicates the factor is already balanced.
2020
BalanceCount(from, to scoredBackend) (float64, []zap.Field)
2121
SetConfig(cfg *config.Config)
22+
// CanBeRouted returns whether a connection can be routed or migrated to the backend with the score.
23+
CanBeRouted(score uint64) bool
2224
Close()
2325
}

pkg/balance/factor/factor_balance.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,6 @@ func (fbb *FactorBasedBalance) Init(cfg *config.Config) {
6060
func (fbb *FactorBasedBalance) setFactors(cfg *config.Config) {
6161
fbb.factors = fbb.factors[:0]
6262

63-
if fbb.factorStatus == nil {
64-
fbb.factorStatus = NewFactorStatus(fbb.lg.Named("status"))
65-
}
66-
fbb.factors = append(fbb.factors, fbb.factorStatus)
67-
6863
if cfg.Balance.LabelName != "" {
6964
if fbb.factorLabel == nil {
7065
fbb.factorLabel = NewFactorLabel()
@@ -75,6 +70,11 @@ func (fbb *FactorBasedBalance) setFactors(cfg *config.Config) {
7570
fbb.factorLabel = nil
7671
}
7772

73+
if fbb.factorStatus == nil {
74+
fbb.factorStatus = NewFactorStatus(fbb.lg.Named("status"))
75+
}
76+
fbb.factors = append(fbb.factors, fbb.factorStatus)
77+
7878
switch cfg.Balance.Policy {
7979
case config.BalancePolicyResource, config.BalancePolicyLocation:
8080
if fbb.factorLocation == nil {
@@ -177,12 +177,16 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli
177177
if len(backends) == 0 {
178178
return nil
179179
}
180+
181+
scoredBackends := fbb.updateScore(backends)
182+
if !fbb.canBeRouted(scoredBackends[0].scoreBits) {
183+
return nil
184+
}
180185
if len(backends) == 1 {
181186
return backends[0]
182187
}
183188

184189
var fields []zap.Field
185-
scoredBackends := fbb.updateScore(backends)
186190
for _, backend := range scoredBackends {
187191
fields = append(fields, zap.String(backend.Addr(), strconv.FormatUint(backend.scoreBits, 16)))
188192
}
@@ -237,8 +241,7 @@ func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (
237241
return
238242
}
239243
scoredBackends := fbb.updateScore(backends)
240-
// No need to migrate to unhealthy backends.
241-
if !scoredBackends[0].Healthy() {
244+
if !fbb.canBeRouted(scoredBackends[0].scoreBits) {
242245
return
243246
}
244247
if scoredBackends[0].scoreBits == scoredBackends[len(scoredBackends)-1].scoreBits {
@@ -287,6 +290,19 @@ func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (
287290
return
288291
}
289292

293+
func (fbb *FactorBasedBalance) canBeRouted(score uint64) bool {
294+
leftBitNum := fbb.totalBitNum
295+
for _, factor := range fbb.factors {
296+
bitNum := factor.ScoreBitNum()
297+
score := score << (maxBitNum - leftBitNum) >> (maxBitNum - bitNum)
298+
if !factor.CanBeRouted(score) {
299+
return false
300+
}
301+
leftBitNum -= bitNum
302+
}
303+
return true
304+
}
305+
290306
func (fbb *FactorBasedBalance) SetConfig(cfg *config.Config) {
291307
fbb.setFactors(cfg)
292308
}

pkg/balance/factor/factor_balance_test.go

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ import (
1212
"github.com/pingcap/tiproxy/lib/util/logger"
1313
"github.com/pingcap/tiproxy/pkg/balance/policy"
1414
"github.com/stretchr/testify/require"
15+
"go.uber.org/zap"
1516
)
1617

1718
func TestRouteWithOneFactor(t *testing.T) {
1819
lg, _ := logger.CreateLoggerForTest(t)
1920
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
20-
factor := &mockFactor{bitNum: 8, balanceCount: 1, threshold: 1}
21+
factor := &mockFactor{bitNum: 8, balanceCount: 1, threshold: 1, canBeRouted: true}
2122
fm.factors = []Factor{factor}
2223
require.NoError(t, fm.updateBitNum())
2324

@@ -71,8 +72,8 @@ func TestRouteWithOneFactor(t *testing.T) {
7172
func TestRouteWith2Factors(t *testing.T) {
7273
lg, _ := logger.CreateLoggerForTest(t)
7374
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
74-
factor1 := &mockFactor{bitNum: 4, balanceCount: 1, threshold: 1}
75-
factor2 := &mockFactor{bitNum: 12, balanceCount: 1, threshold: 1}
75+
factor1 := &mockFactor{bitNum: 4, balanceCount: 1, threshold: 1, canBeRouted: true}
76+
factor2 := &mockFactor{bitNum: 12, balanceCount: 1, threshold: 1, canBeRouted: true}
7677
fm.factors = []Factor{factor1, factor2}
7778
require.NoError(t, fm.updateBitNum())
7879

@@ -143,7 +144,7 @@ func TestRouteWith2Factors(t *testing.T) {
143144
func TestBalanceWithOneFactor(t *testing.T) {
144145
lg, _ := logger.CreateLoggerForTest(t)
145146
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
146-
factor := &mockFactor{bitNum: 2, balanceCount: 1}
147+
factor := &mockFactor{bitNum: 2, balanceCount: 1, canBeRouted: true}
147148
fm.factors = []Factor{factor}
148149
require.NoError(t, fm.updateBitNum())
149150

@@ -193,8 +194,8 @@ func TestBalanceWithOneFactor(t *testing.T) {
193194
func TestBalanceWith2Factors(t *testing.T) {
194195
lg, _ := logger.CreateLoggerForTest(t)
195196
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
196-
factor1 := &mockFactor{bitNum: 2, balanceCount: 2, threshold: 1}
197-
factor2 := &mockFactor{bitNum: 12, balanceCount: 1}
197+
factor1 := &mockFactor{bitNum: 2, balanceCount: 2, threshold: 1, canBeRouted: true}
198+
factor2 := &mockFactor{bitNum: 12, balanceCount: 1, canBeRouted: true}
198199
fm.factors = []Factor{factor1, factor2}
199200
require.NoError(t, fm.updateBitNum())
200201

@@ -280,7 +281,7 @@ func TestBalanceWith2Factors(t *testing.T) {
280281
func TestBalanceWith3Factors(t *testing.T) {
281282
lg, _ := logger.CreateLoggerForTest(t)
282283
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
283-
factors := []*mockFactor{{bitNum: 1}, {bitNum: 2}, {bitNum: 2}}
284+
factors := []*mockFactor{{bitNum: 1, canBeRouted: true}, {bitNum: 2, canBeRouted: true}, {bitNum: 2, canBeRouted: true}}
284285
fm.factors = []Factor{factors[0], factors[1], factors[2]}
285286
require.NoError(t, fm.updateBitNum())
286287

@@ -333,7 +334,7 @@ func TestBalanceWith3Factors(t *testing.T) {
333334
func TestScoreAlwaysUpdated(t *testing.T) {
334335
lg, _ := logger.CreateLoggerForTest(t)
335336
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
336-
factors := []*mockFactor{{bitNum: 1}, {bitNum: 2}}
337+
factors := []*mockFactor{{bitNum: 1, canBeRouted: true}, {bitNum: 2, canBeRouted: true}}
337338
fm.factors = []Factor{factors[0], factors[1]}
338339
factors[0].updateScore = func(backends []scoredBackend) {
339340
for i := 0; i < len(backends); i++ {
@@ -375,14 +376,14 @@ func TestSetFactors(t *testing.T) {
375376
setFunc: func(balance *config.Balance) {
376377
balance.LabelName = "group"
377378
},
378-
expectedNames: []string{"status", "label", "health", "memory", "cpu", "location", "conn"},
379+
expectedNames: []string{"label", "status", "health", "memory", "cpu", "location", "conn"},
379380
},
380381
{
381382
setFunc: func(balance *config.Balance) {
382383
balance.Policy = config.BalancePolicyLocation
383384
balance.LabelName = "group"
384385
},
385-
expectedNames: []string{"status", "label", "location", "health", "memory", "cpu", "conn"},
386+
expectedNames: []string{"label", "status", "location", "health", "memory", "cpu", "conn"},
386387
},
387388
{
388389
setFunc: func(balance *config.Balance) {
@@ -395,7 +396,7 @@ func TestSetFactors(t *testing.T) {
395396
balance.Policy = config.BalancePolicyConnection
396397
balance.LabelName = "group"
397398
},
398-
expectedNames: []string{"status", "label", "conn"},
399+
expectedNames: []string{"label", "status", "conn"},
399400
},
400401
}
401402

@@ -417,3 +418,56 @@ func TestSetFactors(t *testing.T) {
417418
}
418419
fm.Close()
419420
}
421+
422+
func TestCanBeRouted(t *testing.T) {
423+
fm := NewFactorBasedBalance(zap.NewNop(), newMockMetricsReader())
424+
factor1 := &mockFactor{bitNum: 2, balanceCount: 1, canBeRouted: false}
425+
factor2 := &mockFactor{bitNum: 2, balanceCount: 1, canBeRouted: true}
426+
fm.factors = []Factor{factor1, factor2}
427+
require.NoError(t, fm.updateBitNum())
428+
429+
tests := []struct {
430+
scores1 []int
431+
scores2 []int
432+
routed bool
433+
}{
434+
{
435+
scores1: []int{1, 0},
436+
scores2: []int{0, 0},
437+
routed: true,
438+
},
439+
{
440+
scores1: []int{1, 1},
441+
scores2: []int{1, 0},
442+
routed: false,
443+
},
444+
{
445+
scores1: []int{0, 0},
446+
scores2: []int{2, 1},
447+
routed: true,
448+
},
449+
{
450+
scores1: []int{2, 1},
451+
scores2: []int{0, 0},
452+
routed: false,
453+
},
454+
}
455+
for tIdx, test := range tests {
456+
factor1.updateScore = func(backends []scoredBackend) {
457+
for i := 0; i < len(backends); i++ {
458+
backends[i].addScore(test.scores1[i], factor1.bitNum)
459+
}
460+
}
461+
factor2.updateScore = func(backends []scoredBackend) {
462+
for i := 0; i < len(backends); i++ {
463+
backends[i].addScore(test.scores2[i], factor2.bitNum)
464+
}
465+
}
466+
backends := createBackends(len(test.scores1))
467+
_, _, count, _, _ := fm.BackendsToBalance(backends)
468+
require.Equal(t, test.routed, count > 0, "test index %d", tIdx)
469+
backends = createBackends(len(test.scores1))
470+
b := fm.BackendToRoute(backends)
471+
require.Equal(t, test.routed, b != nil, "test index %d", tIdx)
472+
}
473+
}

pkg/balance/factor/factor_conn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,9 @@ func (fcc *FactorConnCount) BalanceCount(from, to scoredBackend) (float64, []zap
6969
func (fcc *FactorConnCount) SetConfig(cfg *config.Config) {
7070
}
7171

72+
func (fcc *FactorConnCount) CanBeRouted(_ uint64) bool {
73+
return true
74+
}
75+
7276
func (fcc *FactorConnCount) Close() {
7377
}

pkg/balance/factor/factor_cpu.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ func (fc *FactorCPU) BalanceCount(from, to scoredBackend) (float64, []zap.Field)
272272
func (fc *FactorCPU) SetConfig(cfg *config.Config) {
273273
}
274274

275+
func (fc *FactorCPU) CanBeRouted(_ uint64) bool {
276+
return true
277+
}
278+
275279
func (fc *FactorCPU) Close() {
276280
fc.mr.RemoveQueryExpr(fc.Name())
277281
}

pkg/balance/factor/factor_health.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,10 @@ func (fh *FactorHealth) BalanceCount(from, to scoredBackend) (float64, []zap.Fie
366366
func (fh *FactorHealth) SetConfig(cfg *config.Config) {
367367
}
368368

369+
func (fh *FactorHealth) CanBeRouted(_ uint64) bool {
370+
return true
371+
}
372+
369373
func (fh *FactorHealth) Close() {
370374
for _, indicator := range fh.indicators {
371375
fh.mr.RemoveQueryExpr(indicator.key)

pkg/balance/factor/factor_label.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,10 @@ func (fl *FactorLabel) SetConfig(cfg *config.Config) {
6363
}
6464
}
6565

66+
func (fl *FactorLabel) CanBeRouted(score uint64) bool {
67+
// Label is used to isolate business resources. Never route to the backends of another business.
68+
return score == 0
69+
}
70+
6671
func (fl *FactorLabel) Close() {
6772
}

pkg/balance/factor/factor_location.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,9 @@ func (fl *FactorLocation) BalanceCount(from, to scoredBackend) (float64, []zap.F
5353
func (fl *FactorLocation) SetConfig(cfg *config.Config) {
5454
}
5555

56+
func (fl *FactorLocation) CanBeRouted(_ uint64) bool {
57+
return true
58+
}
59+
5660
func (fl *FactorLocation) Close() {
5761
}

pkg/balance/factor/factor_memory.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,10 @@ func (fm *FactorMemory) BalanceCount(from, to scoredBackend) (float64, []zap.Fie
288288
func (fm *FactorMemory) SetConfig(cfg *config.Config) {
289289
}
290290

291+
func (fm *FactorMemory) CanBeRouted(_ uint64) bool {
292+
return true
293+
}
294+
291295
func (fm *FactorMemory) Close() {
292296
fm.mr.RemoveQueryExpr(fm.Name())
293297
}

pkg/balance/factor/factor_status.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,5 +107,9 @@ func (fs *FactorStatus) BalanceCount(from, to scoredBackend) (float64, []zap.Fie
107107
func (fs *FactorStatus) SetConfig(cfg *config.Config) {
108108
}
109109

110+
func (fl *FactorStatus) CanBeRouted(score uint64) bool {
111+
return score == 0
112+
}
113+
110114
func (fs *FactorStatus) Close() {
111115
}

0 commit comments

Comments
 (0)