Skip to content

Commit 648dc18

Browse files
authored
refactor: optimize loading historical metrics (#394)
* refactor: make it more object-oriented * refactor: optimize loading historical metrics * fix: linter issue * fix: linter issue * fix: handle close properly
1 parent 414517e commit 648dc18

File tree

5 files changed

+112
-70
lines changed

5 files changed

+112
-70
lines changed

internal/autoscaler/autoscaler.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,9 @@ func (s *Autoscaler) loadWorkloads(ctx context.Context) {
134134
}
135135

136136
func (s *Autoscaler) loadHistoryMetrics(ctx context.Context) error {
137-
workersMetrics, err := s.metricsProvider.GetHistoryMetrics(ctx)
138-
if err != nil {
139-
return fmt.Errorf("failed to get history metrics: %v", err)
140-
}
141-
for _, sample := range workersMetrics {
137+
return s.metricsProvider.LoadHistoryMetrics(ctx, func(sample *metrics.WorkerUsage) {
142138
s.findOrCreateWorkloadState(sample.Namespace, sample.WorkloadName).AddSample(sample)
143-
}
144-
145-
if metricsCount := len(workersMetrics); metricsCount > 0 {
146-
log.FromContext(ctx).Info("historical metrics loaded", "from",
147-
workersMetrics[0].Timestamp, "to", workersMetrics[metricsCount-1].Timestamp, "metricsCount", metricsCount)
148-
}
149-
return nil
139+
})
150140
}
151141

152142
func (s *Autoscaler) loadRealTimeMetrics(ctx context.Context) {

internal/autoscaler/autoscaler_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,28 @@ func (f *FakeMetricsProvider) GetWorkersMetrics(ctx context.Context) ([]*metrics
487487
return f.Metrics, nil
488488
}
489489

490+
func (f *FakeMetricsProvider) LoadHistoryMetrics(ctx context.Context, processMetricsFunc func(*metrics.WorkerUsage)) error {
491+
startTime := time.Now().Add(-7 * 24 * time.Hour)
492+
for day := 0; day < 7; day++ {
493+
for hour := 0; hour < 1; hour++ {
494+
for minute := 0; minute < 60; minute++ {
495+
// idx := day*24 + hour
496+
sample := &metrics.WorkerUsage{
497+
Namespace: "default",
498+
WorkloadName: "workload-0",
499+
WorkerName: fmt.Sprintf("worker-%d", 1),
500+
TflopsUsage: 100.0,
501+
VramUsage: 1 * 1000 * 1000 * 1000,
502+
Timestamp: startTime.Add(time.Duration(day*24+hour)*time.Hour + time.Duration(minute)*time.Minute),
503+
}
504+
processMetricsFunc(sample)
505+
}
506+
}
507+
}
508+
509+
return nil
510+
}
511+
490512
func (f *FakeMetricsProvider) GetHistoryMetrics(ctx context.Context) ([]*metrics.WorkerUsage, error) {
491513
sample := []*metrics.WorkerUsage{}
492514
startTime := time.Now().Add(-8 * 24 * time.Hour)

internal/autoscaler/metrics/metrics_provider.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type WorkerUsage struct {
2727
type Provider interface {
2828
GetWorkersMetrics(context.Context) ([]*WorkerUsage, error)
2929
GetHistoryMetrics(context.Context) ([]*WorkerUsage, error)
30+
LoadHistoryMetrics(context.Context, func(*WorkerUsage)) error
3031
}
3132

3233
type greptimeDBProvider struct {
@@ -47,12 +48,6 @@ func NewProvider() (Provider, error) {
4748
func (g *greptimeDBProvider) GetWorkersMetrics(ctx context.Context) ([]*WorkerUsage, error) {
4849
now := time.Now()
4950

50-
log := log.FromContext(ctx)
51-
log.V(6).Info("Started querying workers metrics", "startTime", now)
52-
defer func() {
53-
log.V(6).Info("Finished querying workers metrics", "duration", time.Since(now))
54-
}()
55-
5651
timeoutCtx, cancel := context.WithTimeout(ctx, defaultQueryTimeout)
5752
defer cancel()
5853

@@ -99,17 +94,9 @@ type hypervisorWorkerUsageMetrics struct {
9994
func (g *greptimeDBProvider) GetHistoryMetrics(ctx context.Context) ([]*WorkerUsage, error) {
10095
now := time.Now()
10196

102-
log := log.FromContext(ctx)
103-
log.V(6).Info("Started querying history metrics", "startTime", now)
104-
defer func() {
105-
log.V(6).Info("Finished querying history metrics", "duration", time.Since(now))
106-
}()
107-
10897
timeoutCtx, cancel := context.WithTimeout(ctx, defaultHistoryQueryTimeout)
10998
defer cancel()
11099

111-
// TODO: replace using iteration for handling large datasets efficiently
112-
// TODO: supply history resolution to config time window
113100
data := []*hypervisorWorkerUsageMetrics{}
114101
err := g.db.WithContext(timeoutCtx).
115102
Select("namespace, workload, worker, max(compute_tflops) as compute_tflops, max(memory_bytes) as memory_bytes, date_bin('1 minute'::INTERVAL, ts) as time_window").
@@ -140,6 +127,47 @@ func (g *greptimeDBProvider) GetHistoryMetrics(ctx context.Context) ([]*WorkerUs
140127
return workersMetrics, nil
141128
}
142129

130+
func (g *greptimeDBProvider) LoadHistoryMetrics(ctx context.Context, processMetricsFunc func(*WorkerUsage)) error {
131+
now := time.Now()
132+
133+
timeoutCtx, cancel := context.WithTimeout(ctx, defaultHistoryQueryTimeout)
134+
defer cancel()
135+
136+
rows, err := g.db.WithContext(timeoutCtx).
137+
Model(&hypervisorWorkerUsageMetrics{}).
138+
Select("namespace, workload, worker, max(compute_tflops) as compute_tflops, max(memory_bytes) as memory_bytes, date_bin('1 minute'::INTERVAL, ts) as time_window").
139+
Where("ts > ? and ts <= ?", now.Add(-time.Hour*24*7).UnixNano(), now.UnixNano()).
140+
Group("namespace, workload, worker, time_window").
141+
Order("time_window asc").
142+
Rows()
143+
if err != nil {
144+
return err
145+
}
146+
defer func() {
147+
if err := rows.Close(); err != nil {
148+
log.FromContext(ctx).Error(err, "failed to close rows")
149+
}
150+
}()
151+
152+
for rows.Next() {
153+
var usage hypervisorWorkerUsageMetrics
154+
if err := g.db.ScanRows(rows, &usage); err != nil {
155+
return err
156+
}
157+
processMetricsFunc(&WorkerUsage{
158+
Namespace: usage.Namespace,
159+
WorkloadName: usage.WorkloadName,
160+
WorkerName: usage.WorkerName,
161+
TflopsUsage: usage.ComputeTflops,
162+
VramUsage: usage.VRAMBytes,
163+
Timestamp: usage.TimeWindow,
164+
})
165+
}
166+
167+
g.lastQueryTime = now
168+
return nil
169+
}
170+
143171
// Setup GreptimeDB connection
144172
func setupTimeSeriesDB() (*metrics.TimeSeriesDB, error) {
145173
timeSeriesDB := &metrics.TimeSeriesDB{}

internal/autoscaler/recommender/recommendation_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var _ = Describe("Recommender", func() {
4343
},
4444
}
4545

46-
final := getResourcesFromRecResults(recs)
46+
final := recommenderToRecResult(recs).generateRecommendation()
4747
Expect(final.Equal(&tfv1.Resources{
4848
Requests: tfv1.Resource{
4949
Tflops: resource.MustParse("10"),
@@ -88,7 +88,7 @@ var _ = Describe("Recommender", func() {
8888
},
8989
}
9090

91-
Expect(getResourcesFromRecResults(recs)).To(BeNil())
91+
Expect(recommenderToRecResult(recs).generateRecommendation()).To(BeNil())
9292
})
9393

9494
It("should return recommendation that replaced with the current maximum allowable GPU resource", func() {

internal/autoscaler/recommender/recommender.go

Lines changed: 44 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,59 +20,31 @@ type RecResult struct {
2020
ScaleDownLocking bool
2121
}
2222

23-
func GetRecommendation(ctx context.Context, workload *workload.State, recommenders []Interface) (*tfv1.Resources, error) {
24-
recResults := map[string]*RecResult{}
25-
for _, recommender := range recommenders {
26-
result, err := recommender.Recommend(ctx, workload)
27-
if err != nil {
28-
return nil, fmt.Errorf("failed to get recommendation from %s: %v", recommender.Name(), err)
29-
}
30-
if result != nil {
31-
recResults[recommender.Name()] = result
32-
}
33-
}
23+
type recommenderToRecResult map[string]*RecResult
3424

35-
if len(recResults) <= 0 {
36-
return nil, nil
37-
}
38-
39-
resources := getResourcesFromRecResults(recResults)
40-
if resources != nil {
41-
curRes := workload.GetCurrentResourcesSpec()
42-
// If a resource value is zero, replace it with current value
43-
if resources.Requests.Tflops.IsZero() || resources.Limits.Tflops.IsZero() {
44-
resources.Requests.Tflops = curRes.Requests.Tflops
45-
resources.Limits.Tflops = curRes.Limits.Tflops
46-
}
47-
48-
if resources.Requests.Vram.IsZero() || resources.Limits.Vram.IsZero() {
49-
resources.Requests.Vram = curRes.Requests.Vram
50-
resources.Limits.Vram = curRes.Limits.Vram
51-
}
25+
func (r recommenderToRecResult) generateRecommendation() *tfv1.Resources {
26+
if len(r) == 0 {
27+
return nil
5228
}
5329

54-
return resources, nil
55-
}
56-
57-
func getResourcesFromRecResults(recResults map[string]*RecResult) *tfv1.Resources {
58-
targetRes := &tfv1.Resources{}
30+
recommendation := &tfv1.Resources{}
5931
minRes := &tfv1.Resources{}
60-
for _, rec := range recResults {
61-
if !rec.HasApplied {
62-
mergeResourcesByLargerRequests(targetRes, &rec.Resources)
32+
for _, result := range r {
33+
if !result.HasApplied {
34+
mergeResourcesByLargerRequests(recommendation, &result.Resources)
6335
}
64-
if rec.ScaleDownLocking {
65-
mergeResourcesByLargerRequests(minRes, &rec.Resources)
36+
if result.ScaleDownLocking {
37+
mergeResourcesByLargerRequests(minRes, &result.Resources)
6638
}
6739
}
6840

69-
if targetRes.IsZero() ||
70-
(targetRes.Requests.Tflops.Cmp(minRes.Requests.Tflops) < 0 &&
71-
targetRes.Requests.Vram.Cmp(minRes.Requests.Vram) < 0) {
41+
if recommendation.IsZero() ||
42+
(recommendation.Requests.Tflops.Cmp(minRes.Requests.Tflops) < 0 &&
43+
recommendation.Requests.Vram.Cmp(minRes.Requests.Vram) < 0) {
7244
return nil
7345
}
7446

75-
return targetRes
47+
return recommendation
7648
}
7749

7850
func mergeResourcesByLargerRequests(src *tfv1.Resources, target *tfv1.Resources) {
@@ -85,3 +57,33 @@ func mergeResourcesByLargerRequests(src *tfv1.Resources, target *tfv1.Resources)
8557
src.Limits.Vram = target.Limits.Vram
8658
}
8759
}
60+
61+
func GetRecommendation(ctx context.Context, workload *workload.State, recommenders []Interface) (*tfv1.Resources, error) {
62+
recResults := make(recommenderToRecResult)
63+
for _, recommender := range recommenders {
64+
result, err := recommender.Recommend(ctx, workload)
65+
if err != nil {
66+
return nil, fmt.Errorf("failed to get recommendation from %s: %v", recommender.Name(), err)
67+
}
68+
if result != nil {
69+
recResults[recommender.Name()] = result
70+
}
71+
}
72+
73+
recommendation := recResults.generateRecommendation()
74+
if recommendation != nil {
75+
curRes := workload.GetCurrentResourcesSpec()
76+
// If a resource value is zero, replace it with current value
77+
if recommendation.Requests.Tflops.IsZero() || recommendation.Limits.Tflops.IsZero() {
78+
recommendation.Requests.Tflops = curRes.Requests.Tflops
79+
recommendation.Limits.Tflops = curRes.Limits.Tflops
80+
}
81+
82+
if recommendation.Requests.Vram.IsZero() || recommendation.Limits.Vram.IsZero() {
83+
recommendation.Requests.Vram = curRes.Requests.Vram
84+
recommendation.Limits.Vram = curRes.Limits.Vram
85+
}
86+
}
87+
88+
return recommendation, nil
89+
}

0 commit comments

Comments
 (0)