Skip to content

Commit 0de024e

Browse files
balance: fix connection is repeatedly migrated by CPU and location (#823) (#824)
Co-authored-by: djshow832 <[email protected]>
1 parent 0c4c57f commit 0de024e

16 files changed

+195
-57
lines changed

pkg/balance/factor/factor.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@ import (
88
"go.uber.org/zap"
99
)
1010

11+
type BalanceAdvice int
12+
13+
const (
14+
// AdviceNeutral indicates skipping this factor and continuing to the next factor.
15+
AdviceNeutral BalanceAdvice = iota
16+
// AdviceNegtive indicates that these 2 backends must not be balanced, not even by the remaining factors.
17+
AdviceNegtive
18+
// AdvicePositive indicates balancing these 2 backends now.
19+
AdvicePositive
20+
)
21+
1122
type Factor interface {
1223
// Name returns the name of the factor.
1324
Name() string
@@ -17,7 +28,7 @@ type Factor interface {
1728
ScoreBitNum() int
1829
// BalanceCount returns the count of connections to balance per second.
1930
// 0 indicates the factor is already balanced.
20-
BalanceCount(from, to scoredBackend) (float64, []zap.Field)
31+
BalanceCount(from, to scoredBackend) (BalanceAdvice, float64, []zap.Field)
2132
SetConfig(cfg *config.Config)
2233
// CanBeRouted returns whether a connection can be routed or migrated to the backend with the score.
2334
CanBeRouted(score uint64) bool

pkg/balance/factor/factor_balance.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli
213213
score2 := scoredBackends[0].scoreBits << (maxBitNum - leftBitNum) >> (maxBitNum - bitNum)
214214
if score1 > score2 {
215215
var balanceFields []zap.Field
216-
if balanceCount, balanceFields = factor.BalanceCount(scoredBackends[i], scoredBackends[0]); balanceCount > 0.0001 {
216+
var advice BalanceAdvice
217+
advice, balanceCount, balanceFields = factor.BalanceCount(scoredBackends[i], scoredBackends[0])
218+
if advice == AdvicePositive && balanceCount > 0.0001 {
217219
// This backend is too busy. If it's routed, migration may happen.
218220
fields = append(fields, zap.String(scoredBackends[i].Addr(), factor.Name()))
219221
fields = append(fields, balanceFields...)
@@ -274,20 +276,28 @@ func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (
274276
score1 := scoredBackends[i].scoreBits << (maxBitNum - leftBitNum) >> (maxBitNum - bitNum)
275277
score2 := scoredBackends[0].scoreBits << (maxBitNum - leftBitNum) >> (maxBitNum - bitNum)
276278
if score1 > score2 {
277-
// The previous factors are ordered, so this factor won't violate them.
278-
// E.g. the factor scores of 2 backends are [1, 1], [0, 0]
279-
// Balancing the second factor won't make the first factor unbalanced.
279+
// The factors with higher priorities are ordered, so this factor shouldn't violate them.
280+
// E.g. if the CPU usage of A is higher than B, don't migrate from B to A even if A is preferred in location.
280281
var fields []zap.Field
281-
if balanceCount, fields = factor.BalanceCount(scoredBackends[i], scoredBackends[0]); balanceCount > 0.0001 {
282-
from, to = scoredBackends[i].BackendCtx, scoredBackends[0].BackendCtx
283-
reason = factor.Name()
284-
logFields = append(fields, zap.String("factor", reason),
285-
zap.String("from_total_score", strconv.FormatUint(scoredBackends[i].scoreBits, 16)),
286-
zap.String("to_total_score", strconv.FormatUint(scoredBackends[0].scoreBits, 16)),
287-
zap.Uint64("from_factor_score", score1),
288-
zap.Uint64("to_factor_score", score2),
289-
zap.Float64("balance_count", balanceCount))
290-
return
282+
var advice BalanceAdvice
283+
advice, balanceCount, fields = factor.BalanceCount(scoredBackends[i], scoredBackends[0])
284+
if advice == AdviceNegtive {
285+
// If the factor will be unbalanced after migration, skip the remaining factors.
286+
// E.g. if the CPU usage of A will be much higher than B after migration,
287+
// don't migrate from B to A even if A is preferred in location.
288+
break
289+
} else if advice == AdvicePositive {
290+
if balanceCount > 0.0001 {
291+
from, to = scoredBackends[i].BackendCtx, scoredBackends[0].BackendCtx
292+
reason = factor.Name()
293+
logFields = append(fields, zap.String("factor", reason),
294+
zap.String("from_total_score", strconv.FormatUint(scoredBackends[i].scoreBits, 16)),
295+
zap.String("to_total_score", strconv.FormatUint(scoredBackends[0].scoreBits, 16)),
296+
zap.Uint64("from_factor_score", score1),
297+
zap.Uint64("to_factor_score", score2),
298+
zap.Float64("balance_count", balanceCount))
299+
return
300+
}
291301
}
292302
} else if score1 < score2 {
293303
// Stop it once a factor is in the opposite order, otherwise a subsequent factor may violate this one.

pkg/balance/factor/factor_balance_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,51 @@ func TestCanBeRouted(t *testing.T) {
475475
}
476476
}
477477

478+
func TestCanBalance(t *testing.T) {
479+
fm := NewFactorBasedBalance(zap.NewNop(), newMockMetricsReader())
480+
factor1 := &mockFactor{bitNum: 2, balanceCount: 1, advice: AdviceNegtive}
481+
factor2 := &mockFactor{bitNum: 2, balanceCount: 1}
482+
fm.factors = []Factor{factor1, factor2}
483+
require.NoError(t, fm.updateBitNum())
484+
485+
tests := []struct {
486+
scores1 []int
487+
scores2 []int
488+
balanced bool
489+
}{
490+
{
491+
scores1: []int{1, 0},
492+
scores2: []int{0, 0},
493+
balanced: false,
494+
},
495+
{
496+
scores1: []int{1, 1},
497+
scores2: []int{1, 0},
498+
balanced: false,
499+
},
500+
{
501+
scores1: []int{1, 1},
502+
scores2: []int{1, 1},
503+
balanced: false,
504+
},
505+
}
506+
for tIdx, test := range tests {
507+
factor1.updateScore = func(backends []scoredBackend) {
508+
for i := 0; i < len(backends); i++ {
509+
backends[i].addScore(test.scores1[i], factor1.bitNum)
510+
}
511+
}
512+
factor2.updateScore = func(backends []scoredBackend) {
513+
for i := 0; i < len(backends); i++ {
514+
backends[i].addScore(test.scores2[i], factor2.bitNum)
515+
}
516+
}
517+
backends := createBackends(len(test.scores1))
518+
_, _, count, _, _ := fm.BackendsToBalance(backends)
519+
require.Equal(t, test.balanced, count > 0, "test index %d", tIdx)
520+
}
521+
}
522+
478523
func TestSetFactorConcurrently(t *testing.T) {
479524
fbb := NewFactorBasedBalance(zap.NewNop(), newMockMetricsReader())
480525
var wg waitgroup.WaitGroup

pkg/balance/factor/factor_conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,16 @@ func (fcc *FactorConnCount) ScoreBitNum() int {
5454
return fcc.bitNum
5555
}
5656

57-
func (fcc *FactorConnCount) BalanceCount(from, to scoredBackend) (float64, []zap.Field) {
57+
func (fcc *FactorConnCount) BalanceCount(from, to scoredBackend) (BalanceAdvice, float64, []zap.Field) {
5858
if float64(from.ConnScore()) <= float64(to.ConnScore()+1)*connBalancedRatio {
59-
return 0, nil
59+
return AdviceNeutral, 0, nil
6060
}
6161
targetTo := float64(from.ConnScore()+to.ConnScore()+1) / (1 + connBalancedRatio)
6262
count := (targetTo - float64(to.ConnScore()+1)) / balanceSeconds4Conn
6363
if count < 0 {
6464
count = 0
6565
}
66-
return count, nil
66+
return AdvicePositive, count, nil
6767
}
6868

6969
func (fcc *FactorConnCount) SetConfig(cfg *config.Config) {

pkg/balance/factor/factor_conn_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ func TestFactorConnSpeed(t *testing.T) {
7373
lastRedirectTime := 0
7474
// Simulate rebalance for 5 minutes.
7575
for j := 0; j < 30000; j++ {
76-
balanceCount, _ := factor.BalanceCount(scoredBackend1, scoredBackend2)
77-
if balanceCount < 0.0001 {
76+
advice, balanceCount, _ := factor.BalanceCount(scoredBackend1, scoredBackend2)
77+
if advice != AdvicePositive || balanceCount < 0.0001 {
7878
break
7979
}
8080
migrationInterval := 100 / balanceCount

pkg/balance/factor/factor_cpu.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ const (
2121
cpuMetricExpDuration = 2 * time.Minute
2222
cpuScoreStep = 5
2323
// 0.001 represents 0.1%
24-
minCpuPerConn = 0.001
25-
cpuBalancedRatio = 1.2
24+
minCpuPerConn = 0.001
25+
cpuBalancedRatio = 1.2
26+
cpuUnbalancedRatio = 1.1
2627
// If the CPU difference of 2 backends is 30% and we're narrowing it to 20% in 30 seconds,
2728
// then in each round, we migrate ((30% - 20%) / 2) / usagePerConn / 30 = 1 / usagePerConn / 600 connections.
2829
balanceRatio4Cpu = 600
@@ -246,7 +247,7 @@ func (fc *FactorCPU) ScoreBitNum() int {
246247
return fc.bitNum
247248
}
248249

249-
func (fc *FactorCPU) BalanceCount(from, to scoredBackend) (float64, []zap.Field) {
250+
func (fc *FactorCPU) BalanceCount(from, to scoredBackend) (BalanceAdvice, float64, []zap.Field) {
250251
fromAvgUsage, fromLatestUsage := fc.getUsage(from)
251252
toAvgUsage, toLatestUsage := fc.getUsage(to)
252253
fields := []zap.Field{
@@ -260,13 +261,18 @@ func (fc *FactorCPU) BalanceCount(from, to scoredBackend) (float64, []zap.Field)
260261
zap.Int("to_conn", to.ConnScore()),
261262
zap.Float64("usage_per_conn", fc.usagePerConn),
262263
}
264+
// Reject migration if it will make the target backend much busier than the source.
265+
if (1.3-(toAvgUsage+fc.usagePerConn))*cpuUnbalancedRatio < 1.3-(fromAvgUsage-fc.usagePerConn) ||
266+
(1.3-(toLatestUsage+fc.usagePerConn))*cpuUnbalancedRatio < 1.3-(fromLatestUsage-fc.usagePerConn) {
267+
return AdviceNegtive, 0, fields
268+
}
263269
// The higher the CPU usage, the more sensitive the load balance should be.
264270
// E.g. 10% vs 25% don't need rebalance, but 80% vs 95% need rebalance.
265271
// Use the average usage to avoid thrashing when the CPU jitters too much, and use the latest usage to avoid migrating too many connections.
266272
if 1.3-toAvgUsage < (1.3-fromAvgUsage)*cpuBalancedRatio || 1.3-toLatestUsage < (1.3-fromLatestUsage)*cpuBalancedRatio {
267-
return 0, nil
273+
return AdviceNeutral, 0, nil
268274
}
269-
return 1 / fc.usagePerConn / balanceRatio4Cpu, fields
275+
return AdvicePositive, 1 / fc.usagePerConn / balanceRatio4Cpu, fields
270276
}
271277

272278
func (fc *FactorCPU) SetConfig(cfg *config.Config) {

pkg/balance/factor/factor_cpu_test.go

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestCPUBalanceOnce(t *testing.T) {
7979
backends := make([]scoredBackend, 0, len(test.cpus))
8080
values := make([]*model.SampleStream, 0, len(test.cpus))
8181
for j := 0; j < len(test.cpus); j++ {
82-
backends = append(backends, createBackend(j, 0, 0))
82+
backends = append(backends, createBackend(j, 100, 100))
8383
values = append(values, createSampleStream(test.cpus[j], j, model.Now()))
8484
}
8585
mmr := &mockMetricsReader{
@@ -100,7 +100,8 @@ func TestCPUBalanceOnce(t *testing.T) {
100100
}
101101
require.Equal(t, test.scoreOrder, sortedIdx, "test index %d", i)
102102
from, to := backends[len(backends)-1], backends[0]
103-
balanceCount, _ := fc.BalanceCount(from, to)
103+
advice, balanceCount, _ := fc.BalanceCount(from, to)
104+
require.Equal(t, test.balanced, advice == AdviceNeutral, "test index %d", i)
104105
require.Equal(t, test.balanced, balanceCount < 0.0001, "test index %d", i)
105106
}
106107
}
@@ -207,8 +208,8 @@ func TestCPUBalanceContinuously(t *testing.T) {
207208
t.Fatal("balance doesn't stop")
208209
}
209210
updateScore(fc, backends)
210-
balanceCount, _ := fc.BalanceCount(backends[len(backends)-1], backends[0])
211-
if balanceCount == 0 {
211+
advice, balanceCount, _ := fc.BalanceCount(backends[len(backends)-1], backends[0])
212+
if advice != AdvicePositive || balanceCount == 0 {
212213
break
213214
}
214215
count := int(balanceCount + 0.9999)
@@ -414,3 +415,53 @@ func TestCPUScore(t *testing.T) {
414415
require.Equal(t, test.scores, scores, "test index %d", i)
415416
}
416417
}
418+
419+
func TestCPURejectBalance(t *testing.T) {
420+
tests := []struct {
421+
cpus [][]float64
422+
conns []int
423+
advice BalanceAdvice
424+
}{
425+
{
426+
cpus: [][]float64{{0.7}, {0.7}},
427+
conns: []int{10, 10},
428+
advice: AdviceNegtive,
429+
},
430+
{
431+
cpus: [][]float64{{0.75}, {0.7}},
432+
conns: []int{10, 10},
433+
advice: AdviceNegtive,
434+
},
435+
{
436+
cpus: [][]float64{{0.78}, {0.7}},
437+
conns: []int{20, 20},
438+
advice: AdviceNeutral,
439+
},
440+
{
441+
cpus: [][]float64{{0.8}, {0.6}},
442+
conns: []int{10, 10},
443+
advice: AdvicePositive,
444+
},
445+
}
446+
447+
for i, test := range tests {
448+
backends := make([]scoredBackend, 0, len(test.cpus))
449+
values := make([]*model.SampleStream, 0, len(test.cpus))
450+
for j := 0; j < len(test.cpus); j++ {
451+
backends = append(backends, createBackend(j, test.conns[j], test.conns[j]))
452+
values = append(values, createSampleStream(test.cpus[j], j, model.Now()))
453+
}
454+
mmr := &mockMetricsReader{
455+
qrs: map[string]metricsreader.QueryResult{
456+
"cpu": {
457+
UpdateTime: time.Now(),
458+
Value: model.Matrix(values),
459+
},
460+
},
461+
}
462+
fc := NewFactorCPU(mmr, zap.NewNop())
463+
updateScore(fc, backends)
464+
advice, _, _ := fc.BalanceCount(backends[1], backends[0])
465+
require.Equal(t, test.advice, advice, "test index %d", i)
466+
}
467+
}

pkg/balance/factor/factor_health.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,18 +349,18 @@ func (fh *FactorHealth) ScoreBitNum() int {
349349
return fh.bitNum
350350
}
351351

352-
func (fh *FactorHealth) BalanceCount(from, to scoredBackend) (float64, []zap.Field) {
352+
func (fh *FactorHealth) BalanceCount(from, to scoredBackend) (BalanceAdvice, float64, []zap.Field) {
353353
// Only migrate connections when one is valueRangeNormal and the other is valueRangeAbnormal.
354354
fromScore := fh.caclErrScore(from.Addr())
355355
toScore := fh.caclErrScore(to.Addr())
356356
if fromScore-toScore <= 1 {
357-
return 0, nil
357+
return AdviceNeutral, 0, nil
358358
}
359359
snapshot := fh.snapshot[from.Addr()]
360360
fields := []zap.Field{
361361
zap.String("indicator", snapshot.indicator),
362362
zap.Int("value", snapshot.value)}
363-
return snapshot.balanceCount, fields
363+
return AdvicePositive, snapshot.balanceCount, fields
364364
}
365365

366366
func (fh *FactorHealth) SetConfig(cfg *config.Config) {

pkg/balance/factor/factor_health_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ func TestHealthBalance(t *testing.T) {
163163
return backends[i].score() < backends[j].score()
164164
})
165165
from, to := backends[len(backends)-1], backends[0]
166-
balanceCount, _ := fh.BalanceCount(from, to)
166+
advice, balanceCount, _ := fh.BalanceCount(from, to)
167+
require.Equal(t, test.balanced, advice == AdviceNeutral, "test index %d", i)
167168
require.Equal(t, test.balanced, balanceCount < 0.0001, "test index %d", i)
168169
}
169170
}
@@ -286,7 +287,8 @@ func TestHealthBalanceCount(t *testing.T) {
286287
if test.count == 0 {
287288
continue
288289
}
289-
count, _ := fh.BalanceCount(backends[0], backends[1])
290+
advice, count, _ := fh.BalanceCount(backends[0], backends[1])
291+
require.Equal(t, AdvicePositive, advice)
290292
require.Equal(t, test.count, count, "test idx: %d", i)
291293
}
292294
}
@@ -373,14 +375,16 @@ func TestMissBackendInHealth(t *testing.T) {
373375
Value: model.Vector(values),
374376
}
375377
fh.UpdateScore(backends)
376-
count, _ := fh.BalanceCount(backends[0], backends[1])
378+
advice, count, _ := fh.BalanceCount(backends[0], backends[1])
379+
require.Equal(t, AdvicePositive, advice)
377380
require.Equal(t, 100/balanceSeconds4Health, count)
378381

379382
// Miss the first backend but the snapshot should be preserved.
380383
fh.UpdateScore(backends[1:])
381384
unhealthyBackend := backends[0].BackendCtx.(*mockBackend)
382385
unhealthyBackend.connScore = 50
383386
fh.UpdateScore(backends)
384-
count, _ = fh.BalanceCount(backends[0], backends[1])
387+
advice, count, _ = fh.BalanceCount(backends[0], backends[1])
388+
require.Equal(t, AdvicePositive, advice)
385389
require.Equal(t, 100/balanceSeconds4Health, count)
386390
}

pkg/balance/factor/factor_label.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@ func (fl *FactorLabel) ScoreBitNum() int {
5353
return fl.bitNum
5454
}
5555

56-
func (fl *FactorLabel) BalanceCount(from, to scoredBackend) (float64, []zap.Field) {
56+
func (fl *FactorLabel) BalanceCount(from, to scoredBackend) (BalanceAdvice, float64, []zap.Field) {
5757
fields := []zap.Field{
5858
zap.String("label_key", fl.labelName),
5959
zap.Any("from_labels", from.GetBackendInfo().Labels),
6060
zap.String("self_label_value", fl.selfLabelVal),
6161
}
62-
return balanceCount4Label, fields
62+
return AdvicePositive, balanceCount4Label, fields
6363
}
6464

6565
func (fl *FactorLabel) SetConfig(cfg *config.Config) {

0 commit comments

Comments
 (0)