Skip to content

Commit 3e7fdce

Browse files
authored
factor: update migration speed for factors (#573)
1 parent 00d7eac commit 3e7fdce

20 files changed

+364
-325
lines changed

pkg/balance/factor/factor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type Factor interface {
1414
ScoreBitNum() int
1515
// BalanceCount returns the count of connections to balance per second.
1616
// 0 indicates the factor is already balanced.
17-
BalanceCount(from, to scoredBackend) int
17+
BalanceCount(from, to scoredBackend) float64
1818
SetConfig(cfg *config.Config)
1919
Close()
2020
}

pkg/balance/factor/factor_balance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli
176176
// BackendsToBalance returns the busiest/unhealthy backend and the idlest backend.
177177
// balanceCount: the count of connections to migrate in this round. 0 indicates no need to balance.
178178
// reason: the debug information to be logged.
179-
func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount int, reason string, logFields []zap.Field) {
179+
func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount float64, reason string, logFields []zap.Field) {
180180
if len(backends) <= 1 {
181181
return
182182
}
@@ -217,7 +217,7 @@ func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (
217217
// backend2 factor scores: 0, 0
218218
// Balancing the second factor won't make the first factor unbalanced.
219219
balanceCount = factor.BalanceCount(*busiestBackend, *idlestBackend)
220-
if balanceCount > 0 {
220+
if balanceCount > 0.0001 {
221221
break
222222
}
223223
} else if score1 < score2 {

pkg/balance/factor/factor_balance_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func TestBalanceWithOneFactor(t *testing.T) {
137137
}
138138
backends := createBackends(len(test.scores))
139139
from, to, count, reason, _ := fm.BackendsToBalance(backends)
140-
require.Equal(t, test.count, count, "test index %d", tIdx)
140+
require.EqualValues(t, test.count, count, "test index %d", tIdx)
141141
if test.count > 0 {
142142
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
143143
require.Equal(t, backends[test.toIdx], to, "test index %d", tIdx)
@@ -217,7 +217,7 @@ func TestBalanceWith2Factors(t *testing.T) {
217217
}
218218
backends := createBackends(len(test.scores1))
219219
from, to, count, _, _ := fm.BackendsToBalance(backends)
220-
require.Equal(t, test.count, count, "test index %d", tIdx)
220+
require.EqualValues(t, test.count, count, "test index %d", tIdx)
221221
if test.count > 0 {
222222
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
223223
require.Equal(t, backends[test.toIdx], to, "test index %d", tIdx)
@@ -258,7 +258,7 @@ func TestBalanceWith3Factors(t *testing.T) {
258258
for tIdx, test := range tests {
259259
for factorIdx, factor := range factors {
260260
func(factorIdx int, factor *mockFactor) {
261-
factor.balanceCount = test.balanceCounts[factorIdx]
261+
factor.balanceCount = float64(test.balanceCounts[factorIdx])
262262
factor.updateScore = func(backends []scoredBackend) {
263263
for i := 0; i < len(backends); i++ {
264264
backends[i].addScore(test.scores[i][factorIdx], factor.bitNum)
@@ -268,7 +268,7 @@ func TestBalanceWith3Factors(t *testing.T) {
268268
}
269269
backends := createBackends(len(test.scores))
270270
from, to, count, _, _ := fm.BackendsToBalance(backends)
271-
require.Equal(t, test.count, count, "test index %d", tIdx)
271+
require.EqualValues(t, test.count, count, "test index %d", tIdx)
272272
if test.count > 0 {
273273
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
274274
require.Equal(t, backends[test.toIdx], to, "test index %d", tIdx)

pkg/balance/factor/factor_conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (fcc *FactorConnCount) ScoreBitNum() int {
4040
return fcc.bitNum
4141
}
4242

43-
func (fcc *FactorConnCount) BalanceCount(from, to scoredBackend) int {
43+
func (fcc *FactorConnCount) BalanceCount(from, to scoredBackend) float64 {
4444
if float64(from.ConnScore()) > float64(to.ConnScore()+1)*connBalancedRatio {
4545
return balanceCount4Conn
4646
}

pkg/balance/factor/factor_cpu.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,17 +208,14 @@ func (fc *FactorCPU) ScoreBitNum() int {
208208
return fc.bitNum
209209
}
210210

211-
func (fc *FactorCPU) BalanceCount(from, to scoredBackend) int {
211+
func (fc *FactorCPU) BalanceCount(from, to scoredBackend) float64 {
212212
fromAvgUsage, fromLatestUsage := fc.getUsage(from)
213213
toAvgUsage, toLatestUsage := fc.getUsage(to)
214214
// The higher the CPU usage, the more sensitive the load balance should be.
215215
// E.g. 10% vs 25% don't need rebalance, but 80% vs 95% need rebalance.
216216
// Use the average usage to avoid thrash when CPU jitters too much and use the latest usage to avoid migrate too many connections.
217217
if 1.3-toAvgUsage > (1.3-fromAvgUsage)*cpuBalancedRatio && 1.3-toLatestUsage > (1.3-fromLatestUsage)*cpuBalancedRatio {
218-
if balanceCount := int(1 / fc.usagePerConn / balanceRatio4Cpu); balanceCount > 1 {
219-
return balanceCount
220-
}
221-
return 1
218+
return 1 / fc.usagePerConn / balanceRatio4Cpu
222219
}
223220
return 0
224221
}

pkg/balance/factor/factor_cpu_test.go

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,59 +17,59 @@ import (
1717

1818
func TestCPUBalanceOnce(t *testing.T) {
1919
tests := []struct {
20-
cpus [][]float64
21-
scoreOrder []int
22-
balanceCount int
20+
cpus [][]float64
21+
scoreOrder []int
22+
balanced bool
2323
}{
2424
{
25-
cpus: [][]float64{{0}, {0.2}},
26-
scoreOrder: []int{0, 1},
27-
balanceCount: 0,
25+
cpus: [][]float64{{0}, {0.2}},
26+
scoreOrder: []int{0, 1},
27+
balanced: true,
2828
},
2929
{
30-
cpus: [][]float64{{1, 0.5, 0.25}, {0, 0.25, 0.5}, {0, 0, 0}, {1, 1, 1}},
31-
scoreOrder: []int{2, 0, 1, 3},
32-
balanceCount: 1,
30+
cpus: [][]float64{{1, 0.5, 0.25}, {0, 0.25, 0.5}, {0, 0, 0}, {1, 1, 1}},
31+
scoreOrder: []int{2, 0, 1, 3},
32+
balanced: false,
3333
},
3434
{
35-
cpus: [][]float64{{0.25}, {}, {0.5}},
36-
scoreOrder: []int{0, 2, 1},
37-
balanceCount: 1,
35+
cpus: [][]float64{{0.25}, {}, {0.5}},
36+
scoreOrder: []int{0, 2, 1},
37+
balanced: false,
3838
},
3939
{
40-
cpus: [][]float64{{0.95, 0.92, 0.93, 0.94, 0.92, 0.94}, {0.81, 0.79, 0.82, 0.83, 0.76, 0.78}},
41-
scoreOrder: []int{1, 0},
42-
balanceCount: 1,
40+
cpus: [][]float64{{0.95, 0.92, 0.93, 0.94, 0.92, 0.94}, {0.81, 0.79, 0.82, 0.83, 0.76, 0.78}},
41+
scoreOrder: []int{1, 0},
42+
balanced: false,
4343
},
4444
{
45-
cpus: [][]float64{{0.35, 0.42, 0.37, 0.45, 0.42, 0.44}, {0.56, 0.62, 0.58, 0.57, 0.59, 0.63}},
46-
scoreOrder: []int{0, 1},
47-
balanceCount: 1,
45+
cpus: [][]float64{{0.35, 0.42, 0.37, 0.45, 0.42, 0.44}, {0.56, 0.62, 0.58, 0.57, 0.59, 0.63}},
46+
scoreOrder: []int{0, 1},
47+
balanced: false,
4848
},
4949
{
50-
cpus: [][]float64{{0, 0.1, 0, 0.1}, {0.15, 0.1, 0.15, 0.15}, {0.1, 0, 0.1, 0}},
51-
scoreOrder: []int{2, 0, 1},
52-
balanceCount: 0,
50+
cpus: [][]float64{{0, 0.1, 0, 0.1}, {0.15, 0.1, 0.15, 0.15}, {0.1, 0, 0.1, 0}},
51+
scoreOrder: []int{2, 0, 1},
52+
balanced: true,
5353
},
5454
{
55-
cpus: [][]float64{{0.5}, {}, {0.1, 0.3, 0.4}},
56-
scoreOrder: []int{2, 0, 1},
57-
balanceCount: 1,
55+
cpus: [][]float64{{0.5}, {}, {0.1, 0.3, 0.4}},
56+
scoreOrder: []int{2, 0, 1},
57+
balanced: false,
5858
},
5959
{
60-
cpus: [][]float64{{1.0}, {0.97}},
61-
scoreOrder: []int{1, 0},
62-
balanceCount: 0,
60+
cpus: [][]float64{{1.0}, {0.97}},
61+
scoreOrder: []int{1, 0},
62+
balanced: true,
6363
},
6464
{
65-
cpus: [][]float64{{0.8, 0.2, 0.8, 0.2}, {0.3, 0.5, 0.3, 0.5}},
66-
scoreOrder: []int{0, 1},
67-
balanceCount: 0,
65+
cpus: [][]float64{{0.8, 0.2, 0.8, 0.2}, {0.3, 0.5, 0.3, 0.5}},
66+
scoreOrder: []int{0, 1},
67+
balanced: true,
6868
},
6969
{
70-
cpus: [][]float64{{1.0}, {0.9}, {0.8}, {0.7}, {0.6}, {0.5}, {0.4}, {0.3}, {0.1}},
71-
scoreOrder: []int{8, 7, 6, 5, 4, 3, 2, 1, 0},
72-
balanceCount: 1,
70+
cpus: [][]float64{{1.0}, {0.9}, {0.8}, {0.7}, {0.6}, {0.5}, {0.4}, {0.3}, {0.1}},
71+
scoreOrder: []int{8, 7, 6, 5, 4, 3, 2, 1, 0},
72+
balanced: false,
7373
},
7474
}
7575

@@ -99,7 +99,7 @@ func TestCPUBalanceOnce(t *testing.T) {
9999
require.Equal(t, test.scoreOrder, sortedIdx, "test index %d", i)
100100
from, to := backends[len(backends)-1], backends[0]
101101
balanceCount := fc.BalanceCount(from, to)
102-
require.Equal(t, test.balanceCount, balanceCount, "test index %d", i)
102+
require.Equal(t, test.balanced, balanceCount < 0.0001, "test index %d", i)
103103
}
104104
}
105105

@@ -209,8 +209,9 @@ func TestCPUBalanceContinuously(t *testing.T) {
209209
if balanceCount == 0 {
210210
break
211211
}
212-
backends[len(backends)-1].BackendCtx.(*mockBackend).connScore -= balanceCount
213-
backends[0].BackendCtx.(*mockBackend).connScore += balanceCount
212+
count := int(balanceCount + 0.9999)
213+
backends[len(backends)-1].BackendCtx.(*mockBackend).connScore -= count
214+
backends[0].BackendCtx.(*mockBackend).connScore += count
214215
}
215216
connScores := make([]int, len(test.connScores))
216217
for _, backend := range backends {

pkg/balance/factor/factor_health.go

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
const (
1717
errMetricExpDuration = 1 * time.Minute
1818
// balanceSeconds4Health indicates the time (in seconds) to migrate all the connections.
19-
balanceSeconds4Health = 10
19+
balanceSeconds4Health = 5.0
2020
)
2121

2222
type valueRange int
@@ -79,6 +79,8 @@ var _ Factor = (*FactorHealth)(nil)
7979
type healthBackendSnapshot struct {
8080
updatedTime monotime.Time
8181
valueRange valueRange
82+
// Record the balance count when the backend becomes unhealthy so that it won't be smaller in the next rounds.
83+
balanceCount float64
8284
}
8385

8486
type errIndicator struct {
@@ -185,9 +187,20 @@ func (fh *FactorHealth) updateSnapshot(backends []scoredBackend) {
185187
}
186188
continue
187189
}
190+
// Set balance count if the backend is unhealthy, otherwise reset it to 0.
191+
var balanceCount float64
192+
if valueRange >= valueRangeAbnormal {
193+
if existSnapshot && snapshot.balanceCount > 0.0001 {
194+
balanceCount = snapshot.balanceCount
195+
} else {
196+
balanceCount = float64(backend.ConnScore()) / balanceSeconds4Health
197+
}
198+
}
199+
188200
snapshots[addr] = healthBackendSnapshot{
189-
updatedTime: updatedTime,
190-
valueRange: valueRange,
201+
updatedTime: updatedTime,
202+
valueRange: valueRange,
203+
balanceCount: balanceCount,
191204
}
192205
}
193206
fh.snapshot = snapshots
@@ -221,39 +234,22 @@ func calcValueRange(sample *model.Sample, indicator errIndicator) valueRange {
221234
}
222235

223236
func (fh *FactorHealth) caclErrScore(addr string) int {
224-
snapshot, ok := fh.snapshot[addr]
225-
if !ok {
226-
return 2
227-
}
228-
switch snapshot.valueRange {
229-
case valueRangeNormal:
230-
return 0
231-
case valueRangeMid:
232-
return 1
233-
default:
234-
return 2
235-
}
237+
// If the backend has no metrics (not in snapshot), take it as healthy.
238+
return int(fh.snapshot[addr].valueRange)
236239
}
237240

238241
func (fh *FactorHealth) ScoreBitNum() int {
239242
return fh.bitNum
240243
}
241244

242-
func (fh *FactorHealth) BalanceCount(from, to scoredBackend) int {
245+
func (fh *FactorHealth) BalanceCount(from, to scoredBackend) float64 {
243246
// Only migrate connections when one is valueRangeNormal and the other is valueRangeAbnormal.
244247
fromScore := fh.caclErrScore(from.Addr())
245248
toScore := fh.caclErrScore(to.Addr())
246-
if fromScore-toScore > 1 {
247-
// Assuming that the source and target backends have similar connections at first.
248-
// We wish the connections to be migrated in 10 seconds but only a few are migrated in each round.
249-
// If we use from.ConnScore() / 10, the migration will be slower and slower.
250-
conns := (from.ConnScore() + to.ConnScore()) / (balanceSeconds4Health * 2)
251-
if conns > 0 {
252-
return conns
253-
}
254-
return 1
249+
if fromScore-toScore <= 1 {
250+
return 0
255251
}
256-
return 0
252+
return fh.snapshot[from.Addr()].balanceCount
257253
}
258254

259255
func (fh *FactorHealth) SetConfig(cfg *config.Config) {

0 commit comments

Comments
 (0)