Skip to content

Commit 7c6ad51

Browse files
authored
factor: query Prometheus more frequently to update metrics timely (#563)
1 parent 8ea04a7 commit 7c6ad51

File tree

6 files changed

+93
-41
lines changed

6 files changed

+93
-41
lines changed

lib/config/namespace.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const (
3232
healthCheckMaxRetries = 3
3333
healthCheckRetryInterval = 1 * time.Second
3434
healthCheckTimeout = 2 * time.Second
35-
readMetricsInterval = 15 * time.Second
35+
readMetricsInterval = 5 * time.Second
3636
readMetricsTimeout = 3 * time.Second
3737
)
3838

pkg/balance/factor/factor_cpu.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ var (
3737
)
3838

3939
type cpuBackendSnapshot struct {
40-
updatedTime monotime.Time
40+
// the latest time in the query result
41+
updatedTime time.Time
4142
// smoothed CPU usage, used to decide whether to migrate
4243
avgUsage float64
4344
// timely CPU usage, used to score and decide the balance count
@@ -105,21 +106,26 @@ func (fc *FactorCPU) updateSnapshot(qr metricsreader.QueryResult, backends []sco
105106
// The backend will be in the next round if it's healthy.
106107
pairs := qr.GetSamplePair4Backend(backend)
107108
if len(pairs) > 0 {
108-
avgUsage, latestUsage := calcAvgUsage(pairs)
109-
if avgUsage >= 0 {
110-
snapshots[addr] = cpuBackendSnapshot{
111-
avgUsage: avgUsage,
112-
latestUsage: latestUsage,
113-
connCount: backend.ConnCount(),
114-
updatedTime: qr.UpdateTime,
109+
updateTime := time.UnixMilli(int64(pairs[len(pairs)-1].Timestamp))
110+
// Each backend's metrics are updated at a different point in time, so only some of the backends have new data in each query.
111+
// If this backend is not updated, ignore it.
112+
if snapshot, ok := fc.snapshot[addr]; !ok || snapshot.updatedTime.Before(updateTime) {
113+
avgUsage, latestUsage := calcAvgUsage(pairs)
114+
if avgUsage >= 0 {
115+
snapshots[addr] = cpuBackendSnapshot{
116+
avgUsage: avgUsage,
117+
latestUsage: latestUsage,
118+
connCount: backend.ConnCount(),
119+
updatedTime: updateTime,
120+
}
121+
valid = true
115122
}
116-
valid = true
117123
}
118124
}
119125
// Merge the old snapshot in case some metrics have been missing for a short period.
120126
if !valid {
121127
if snapshot, ok := fc.snapshot[addr]; ok {
122-
if monotime.Since(snapshot.updatedTime) < cpuMetricExpDuration {
128+
if time.Since(snapshot.updatedTime) < cpuMetricExpDuration {
123129
snapshots[addr] = snapshot
124130
}
125131
}

pkg/balance/factor/factor_cpu_test.go

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"math"
88
"strconv"
99
"testing"
10+
"time"
1011

1112
"github.com/pingcap/tiproxy/pkg/balance/metricsreader"
1213
"github.com/pingcap/tiproxy/pkg/util/monotime"
@@ -77,7 +78,7 @@ func TestCPUBalanceOnce(t *testing.T) {
7778
values := make([]*model.SampleStream, 0, len(test.cpus))
7879
for j := 0; j < len(test.cpus); j++ {
7980
backends = append(backends, createBackend(j, 0, 0))
80-
values = append(values, createSampleStream(test.cpus[j], j))
81+
values = append(values, createSampleStream(test.cpus[j], j, model.Now()))
8182
}
8283
mmr := &mockMetricsReader{
8384
qrs: map[uint64]metricsreader.QueryResult{
@@ -185,13 +186,15 @@ func TestCPUBalanceContinuously(t *testing.T) {
185186

186187
mmr := newMockMetricsReader()
187188
fc := NewFactorCPU(mmr)
189+
curTime := model.Now().Add(-10 * time.Second)
188190
for i, test := range tests {
189191
backends := make([]scoredBackend, 0, len(test.cpus))
190192
values := make([]*model.SampleStream, 0, len(test.cpus))
191193
for j := 0; j < len(test.cpus); j++ {
192194
backends = append(backends, createBackend(j, test.connCounts[j], test.connScores[j]))
193-
values = append(values, createSampleStream(test.cpus[j], j))
195+
values = append(values, createSampleStream(test.cpus[j], j, curTime))
194196
}
197+
curTime = curTime.Add(time.Millisecond)
195198
mmr.qrs[1] = metricsreader.QueryResult{
196199
UpdateTime: monotime.Now(),
197200
Value: model.Matrix(values),
@@ -221,38 +224,74 @@ func TestCPUBalanceContinuously(t *testing.T) {
221224

222225
func TestNoCPUMetric(t *testing.T) {
223226
tests := []struct {
224-
cpus []float64
227+
cpus [][]float64
225228
updateTime monotime.Time
226229
}{
227230
{
228231
cpus: nil,
229232
},
230233
{
231-
cpus: []float64{1.0, 0.0},
232-
updateTime: monotime.Now().Sub(cpuMetricExpDuration * 2),
234+
cpus: [][]float64{{1.0}, {0.0}},
235+
updateTime: monotime.Now().Add(-cpuMetricExpDuration * 2),
233236
},
234237
{
235-
cpus: []float64{math.NaN(), math.NaN()},
238+
cpus: [][]float64{{math.NaN()}, {math.NaN()}},
236239
updateTime: monotime.Now(),
237240
},
238241
}
239-
240242
mmr := newMockMetricsReader()
241243
fc := NewFactorCPU(mmr)
242-
backends := make([]scoredBackend, 0, 2)
243-
for i := 0; i < 2; i++ {
244-
backends = append(backends, createBackend(i, i*100, i*100))
245-
}
244+
backends := []scoredBackend{createBackend(0, 0, 0), createBackend(1, 0, 0)}
246245
for i, test := range tests {
247-
values := make([]*model.Sample, 0, len(test.cpus))
246+
values := make([]*model.SampleStream, 0, len(test.cpus))
248247
for j := 0; j < len(test.cpus); j++ {
249-
values = append(values, createSample(test.cpus[j], j))
248+
ss := createSampleStream(test.cpus[j], j, model.Time(test.updateTime/monotime.Time(time.Millisecond)))
249+
values = append(values, ss)
250250
}
251251
mmr.qrs[1] = metricsreader.QueryResult{
252252
UpdateTime: test.updateTime,
253-
Value: model.Vector(values),
253+
Value: model.Matrix(values),
254254
}
255255
updateScore(fc, backends)
256256
require.Equal(t, backends[0].score(), backends[1].score(), "test index %d", i)
257257
}
258258
}
259+
260+
// TestCPUResultNotUpdated feeds the CPU factor three successive query results
// and checks the score of backend 0 after each round.
//
// Rounds 1 and 2 carry the same sample time, and the expected score stays the
// same. Round 3 is one second newer with a higher CPU value, and the expected
// score changes accordingly.
//
// NOTE(review): round 2 also repeats the CPU value of round 1, so a
// recomputation from the repeated result would presumably produce the same
// score. Consider a different CPU value in round 2 so the test can tell a
// skipped update from a recomputed one; confirm the expected score first.
func TestCPUResultNotUpdated(t *testing.T) {
261+
now := model.Now()
262+
tests := []struct {
263+
// CPU usage reported for both backends in this round.
cpu float64
264+
// Time passed to createSampleStream as the current time of the samples.
updateTime model.Time
265+
// Score expected for backend 0 after this round is applied.
expectedScore int
266+
}{
267+
{
268+
cpu: 0.1,
269+
updateTime: now,
270+
expectedScore: 2,
271+
},
272+
// Same sample time as the previous round: the score is expected to stay unchanged.
{
273+
cpu: 0.1,
274+
updateTime: now,
275+
expectedScore: 2,
276+
},
277+
// Newer sample time and a higher CPU value: the score is expected to change.
{
278+
cpu: 0.3,
279+
updateTime: now.Add(time.Second),
280+
expectedScore: 6,
281+
},
282+
}
283+
284+
mmr := newMockMetricsReader()
285+
fc := NewFactorCPU(mmr)
286+
backends := []scoredBackend{createBackend(0, 0, 0), createBackend(1, 0, 0)}
287+
for i, test := range tests {
288+
// Both backends receive the same single-sample series in each round.
array := []float64{test.cpu}
289+
values := []*model.SampleStream{createSampleStream(array, 0, test.updateTime), createSampleStream(array, 1, test.updateTime)}
290+
mmr.qrs[1] = metricsreader.QueryResult{
291+
// The query-level UpdateTime is refreshed every round; only the sample time varies between rounds.
UpdateTime: monotime.Now(),
292+
Value: model.Matrix(values),
293+
}
294+
updateScore(fc, backends)
295+
require.EqualValues(t, test.expectedScore, backends[0].score(), "test index %d", i)
296+
}
297+
}

pkg/balance/factor/factor_memory.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ var (
5151
)
5252

5353
type memBackendSnapshot struct {
54-
updatedTime monotime.Time
54+
updatedTime time.Time
5555
memUsage float64
5656
timeToOOM time.Duration
5757
}
@@ -121,20 +121,24 @@ func (fm *FactorMemory) updateSnapshot(qr metricsreader.QueryResult, backends []
121121
// The backend will be in the next round if it's healthy.
122122
pairs := qr.GetSamplePair4Backend(backend)
123123
if len(pairs) > 0 {
124-
latestUsage, timeToOOM := calcMemUsage(pairs)
125-
if latestUsage >= 0 {
126-
snapshots[addr] = memBackendSnapshot{
127-
updatedTime: qr.UpdateTime,
128-
memUsage: latestUsage,
129-
timeToOOM: timeToOOM,
124+
updateTime := time.UnixMilli(int64(pairs[len(pairs)-1].Timestamp))
125+
// If this backend is not updated, ignore it.
126+
if snapshot, ok := fm.snapshot[addr]; !ok || snapshot.updatedTime.Before(updateTime) {
127+
latestUsage, timeToOOM := calcMemUsage(pairs)
128+
if latestUsage >= 0 {
129+
snapshots[addr] = memBackendSnapshot{
130+
updatedTime: updateTime,
131+
memUsage: latestUsage,
132+
timeToOOM: timeToOOM,
133+
}
134+
valid = true
130135
}
131-
valid = true
132136
}
133137
}
134138
// Merge the old snapshot in case some metrics have been missing for a short period.
135139
if !valid {
136140
if snapshot, ok := fm.snapshot[addr]; ok {
137-
if monotime.Since(snapshot.updatedTime) < memMetricExpDuration {
141+
if time.Since(snapshot.updatedTime) < memMetricExpDuration {
138142
snapshots[addr] = snapshot
139143
}
140144
}

pkg/balance/factor/factor_memory_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"math"
88
"sort"
99
"testing"
10+
"time"
1011

1112
"github.com/pingcap/tiproxy/pkg/balance/metricsreader"
1213
"github.com/pingcap/tiproxy/pkg/util/monotime"
@@ -89,7 +90,7 @@ func TestMemoryScore(t *testing.T) {
8990
values := make([]*model.SampleStream, 0, len(tests))
9091
for i, test := range tests {
9192
backends = append(backends, createBackend(i, 0, 0))
92-
values = append(values, createSampleStream(test.memory, i))
93+
values = append(values, createSampleStream(test.memory, i, model.Now()))
9394
}
9495
mmr := &mockMetricsReader{
9596
qrs: map[uint64]metricsreader.QueryResult{
@@ -154,7 +155,7 @@ func TestMemoryBalance(t *testing.T) {
154155
values := make([]*model.SampleStream, 0, len(test.memory))
155156
for j := 0; j < len(test.memory); j++ {
156157
backends = append(backends, createBackend(j, 0, 0))
157-
values = append(values, createSampleStream(test.memory[j], j))
158+
values = append(values, createSampleStream(test.memory[j], j, model.Now()))
158159
}
159160
mmr := &mockMetricsReader{
160161
qrs: map[uint64]metricsreader.QueryResult{
@@ -207,7 +208,9 @@ func TestNoMemMetrics(t *testing.T) {
207208
for i, test := range tests {
208209
values := make([]*model.SampleStream, 0, len(test.mem))
209210
for j := 0; j < len(test.mem); j++ {
210-
values = append(values, createSampleStream(test.mem[j], j))
211+
ss := createSampleStream(test.mem[j], j, model.Now())
212+
ss.Values[0].Timestamp = model.Time(test.updateTime / monotime.Time(time.Millisecond))
213+
values = append(values, ss)
211214
}
212215
mmr.qrs[1] = metricsreader.QueryResult{
213216
UpdateTime: test.updateTime,
@@ -267,8 +270,8 @@ func TestMemoryBalanceCount(t *testing.T) {
267270
backend1 = []float64{0.9, 0.9}
268271
}
269272
values := []*model.SampleStream{
270-
createSampleStream(backend1, 0),
271-
createSampleStream([]float64{0, 0}, 1),
273+
createSampleStream(backend1, 0, model.Now()),
274+
createSampleStream([]float64{0, 0}, 1, model.Now()),
272275
}
273276
mmr := &mockMetricsReader{
274277
qrs: map[uint64]metricsreader.QueryResult{

pkg/balance/factor/mock_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,12 @@ func createBackend(backendIdx, connCount, connScore int) scoredBackend {
144144
}
145145
}
146146

147-
func createSampleStream(values []float64, backendIdx int) *model.SampleStream {
147+
func createSampleStream(values []float64, backendIdx int, curTime model.Time) *model.SampleStream {
148148
host := strconv.Itoa(backendIdx)
149149
labelSet := model.Metric{metricsreader.LabelNameInstance: model.LabelValue(host + ":10080")}
150150
pairs := make([]model.SamplePair, 0, len(values))
151151
for i, cpu := range values {
152-
ts := model.Time(time.Now().UnixMilli() - int64(15000*(len(values)-i)))
152+
ts := curTime.Add(15 * time.Second * time.Duration(i-len(values)))
153153
pairs = append(pairs, model.SamplePair{Timestamp: ts, Value: model.SampleValue(cpu)})
154154
}
155155
return &model.SampleStream{Metric: labelSet, Values: pairs}

0 commit comments

Comments
 (0)