
Commit 7005cc8

Merge pull request kubernetes#3122 from yashbhutwala/history-resolution
Add history resolution and use prometheus client
2 parents 14cf68a + 846c656 commit 7005cc8

11 files changed: +921 / -528 lines

vertical-pod-autoscaler/go.mod

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ require (
     github.com/golang/mock v1.3.1
     github.com/prometheus/client_golang v0.9.2
     github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
-    github.com/prometheus/common v0.4.1 // indirect
+    github.com/prometheus/common v0.4.1
     github.com/prometheus/procfs v0.0.5 // indirect
     github.com/stretchr/testify v1.4.0
     golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d // indirect

vertical-pod-autoscaler/pkg/recommender/input/history/history_provider.go

Lines changed: 118 additions & 47 deletions
@@ -17,23 +17,30 @@ limitations under the License.
 package history
 
 import (
+    "context"
     "fmt"
-    "net/http"
+    "k8s.io/klog"
     "sort"
     "strings"
     "time"
 
+    promapi "github.com/prometheus/client_golang/api"
+    prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+    prommodel "github.com/prometheus/common/model"
+
     "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
 )
 
 // PrometheusHistoryProviderConfig allow to select which metrics
 // should be queried to get real resource utilization.
 type PrometheusHistoryProviderConfig struct {
-    Address string
-    HistoryLength, PodLabelPrefix, PodLabelsMetricName string
-    PodNamespaceLabel, PodNameLabel string
-    CtrNamespaceLabel, CtrPodNameLabel, CtrNameLabel string
-    CadvisorMetricsJobName string
+    Address string
+    QueryTimeout time.Duration
+    HistoryLength, HistoryResolution string
+    PodLabelPrefix, PodLabelsMetricName string
+    PodNamespaceLabel, PodNameLabel string
+    CtrNamespaceLabel, CtrPodNameLabel, CtrNameLabel string
+    CadvisorMetricsJobName string
 }
 
 // PodHistory represents history of usage and labels for a given pod.
@@ -58,19 +65,44 @@ type HistoryProvider interface {
 }
 
 type prometheusHistoryProvider struct {
-    prometheusClient PrometheusClient
-    config PrometheusHistoryProviderConfig
+    prometheusClient prometheusv1.API
+    config PrometheusHistoryProviderConfig
+    queryTimeout time.Duration
+    historyDuration prommodel.Duration
+    historyResolution prommodel.Duration
 }
 
 // NewPrometheusHistoryProvider contructs a history provider that gets data from Prometheus.
-func NewPrometheusHistoryProvider(config PrometheusHistoryProviderConfig) HistoryProvider {
-    return &prometheusHistoryProvider{
-        prometheusClient: NewPrometheusClient(&http.Client{}, config.Address),
-        config: config,
+func NewPrometheusHistoryProvider(config PrometheusHistoryProviderConfig) (HistoryProvider, error) {
+    promClient, err := promapi.NewClient(promapi.Config{
+        Address: config.Address,
+    })
+    if err != nil {
+        return &prometheusHistoryProvider{}, err
+    }
+
+    // Use Prometheus's model.Duration; this can additionally parse durations in days, weeks and years (as well as seconds, minutes, hours etc)
+    historyDuration, err := prommodel.ParseDuration(config.HistoryLength)
+    if err != nil {
+        return &prometheusHistoryProvider{}, fmt.Errorf("history length %s is not a valid Prometheus duration: %v", config.HistoryLength, err)
+    }
+
+    historyResolution, err := prommodel.ParseDuration(config.HistoryResolution)
+    if err != nil {
+        return &prometheusHistoryProvider{}, fmt.Errorf("history resolution %s is not a valid Prometheus duration: %v", config.HistoryResolution, err)
     }
+
+    return &prometheusHistoryProvider{
+        prometheusClient: prometheusv1.NewAPI(promClient),
+        config: config,
+        queryTimeout: config.QueryTimeout,
+        historyDuration: historyDuration,
+        historyResolution: historyResolution,
+    }, nil
 }
 
-func (p *prometheusHistoryProvider) getContainerIDFromLabels(labels map[string]string) (*model.ContainerID, error) {
+func (p *prometheusHistoryProvider) getContainerIDFromLabels(metric prommodel.Metric) (*model.ContainerID, error) {
+    labels := promMetricToLabelMap(metric)
     namespace, ok := labels[p.config.CtrNamespaceLabel]
     if !ok {
         return nil, fmt.Errorf("no %s label", p.config.CtrNamespaceLabel)
@@ -86,13 +118,12 @@ func (p *prometheusHistoryProvider) getContainerIDFromLabels(labels map[string]s
     return &model.ContainerID{
         PodID: model.PodID{
             Namespace: namespace,
-            PodName: podName,
-        },
-        ContainerName: containerName,
-    }, nil
+            PodName: podName},
+        ContainerName: containerName}, nil
 }
 
-func (p *prometheusHistoryProvider) getPodIDFromLabels(labels map[string]string) (*model.PodID, error) {
+func (p *prometheusHistoryProvider) getPodIDFromLabels(metric prommodel.Metric) (*model.PodID, error) {
+    labels := promMetricToLabelMap(metric)
     namespace, ok := labels[p.config.PodNamespaceLabel]
     if !ok {
         return nil, fmt.Errorf("no %s label", p.config.PodNamespaceLabel)
@@ -104,17 +135,25 @@ func (p *prometheusHistoryProvider) getPodIDFromLabels(labels map[string]string)
     return &model.PodID{Namespace: namespace, PodName: podName}, nil
 }
 
-func (p *prometheusHistoryProvider) getPodLabelsMap(metricLabels map[string]string) map[string]string {
+func (p *prometheusHistoryProvider) getPodLabelsMap(metric prommodel.Metric) map[string]string {
     podLabels := make(map[string]string)
-    for key, value := range metricLabels {
-        podLabelKey := strings.TrimPrefix(key, p.config.PodLabelPrefix)
-        if podLabelKey != key {
-            podLabels[podLabelKey] = value
+    for key, value := range metric {
+        podLabelKey := strings.TrimPrefix(string(key), p.config.PodLabelPrefix)
+        if podLabelKey != string(key) {
+            podLabels[podLabelKey] = string(value)
         }
     }
     return podLabels
 }
 
+func promMetricToLabelMap(metric prommodel.Metric) map[string]string {
+    labels := map[string]string{}
+    for k, v := range metric {
+        labels[string(k)] = string(v)
+    }
+    return labels
+}
+
 func resourceAmountFromValue(value float64, resource model.ResourceName) model.ResourceAmount {
     // This assumes CPU value is in cores and memory in bytes, which is true
     // for the metrics this class queries from Prometheus.
@@ -127,29 +166,46 @@ func resourceAmountFromValue(value float64, resource model.ResourceName) model.R
     return model.ResourceAmount(0)
 }
 
-func getContainerUsageSamplesFromSamples(samples []Sample, resource model.ResourceName) []model.ContainerUsageSample {
+func getContainerUsageSamplesFromSamples(samples []prommodel.SamplePair, resource model.ResourceName) []model.ContainerUsageSample {
     res := make([]model.ContainerUsageSample, 0)
     for _, sample := range samples {
         res = append(res, model.ContainerUsageSample{
-            MeasureStart: sample.Timestamp,
-            Usage: resourceAmountFromValue(sample.Value, resource),
-            Resource: resource,
-        })
+            MeasureStart: sample.Timestamp.Time(),
+            Usage: resourceAmountFromValue(float64(sample.Value), resource),
+            Resource: resource})
     }
     return res
 }
 
 func (p *prometheusHistoryProvider) readResourceHistory(res map[model.PodID]*PodHistory, query string, resource model.ResourceName) error {
-    tss, err := p.prometheusClient.GetTimeseries(query)
+    end := time.Now()
+    start := end.Add(-time.Duration(p.historyDuration))
+
+    ctx, cancel := context.WithTimeout(context.Background(), p.queryTimeout)
+    defer cancel()
+
+    result, err := p.prometheusClient.QueryRange(ctx, query, prometheusv1.Range{
+        Start: start,
+        End:   end,
+        Step:  time.Duration(p.historyResolution),
+    })
+
     if err != nil {
         return fmt.Errorf("cannot get timeseries for %v: %v", resource, err)
     }
-    for _, ts := range tss {
-        containerID, err := p.getContainerIDFromLabels(ts.Labels)
+
+    matrix, ok := result.(prommodel.Matrix)
+    if !ok {
+        return fmt.Errorf("expected query to return a matrix; got result type %T", result)
+    }
+
+    for _, ts := range matrix {
+        containerID, err := p.getContainerIDFromLabels(ts.Metric)
         if err != nil {
-            return fmt.Errorf("cannot get container ID from labels: %v", ts.Labels)
+            return fmt.Errorf("cannot get container ID from labels: %v", ts.Metric)
         }
-        newSamples := getContainerUsageSamplesFromSamples(ts.Samples, resource)
+
+        newSamples := getContainerUsageSamplesFromSamples(ts.Values, resource)
         podHistory, ok := res[containerID.PodID]
         if !ok {
            podHistory = newEmptyHistory()
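The rewritten readResourceHistory above queries the window [now - HistoryLength, now] and asks Prometheus for one point per HistoryResolution step, so 8 days of history at 1h resolution comes back as roughly 192 samples per series. A minimal, self-contained sketch of the same QueryRange pattern (two-value return as in the client_golang v0.9.x pinned by this module's go.mod; the address and query are placeholders):

package main

import (
    "context"
    "fmt"
    "time"

    promapi "github.com/prometheus/client_golang/api"
    prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
    prommodel "github.com/prometheus/common/model"
)

func main() {
    // Placeholder address; point this at a reachable Prometheus instance.
    client, err := promapi.NewClient(promapi.Config{Address: "http://prometheus:9090"})
    if err != nil {
        panic(err)
    }
    api := prometheusv1.NewAPI(client)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    end := time.Now()
    result, err := api.QueryRange(ctx, `rate(container_cpu_usage_seconds_total[1h])`, prometheusv1.Range{
        Start: end.Add(-8 * 24 * time.Hour), // HistoryLength, e.g. "8d"
        End:   end,
        Step:  time.Hour, // HistoryResolution, e.g. "1h"
    })
    if err != nil {
        panic(err)
    }

    // Range queries come back as a Matrix: one SampleStream (Metric + Values) per series.
    matrix, ok := result.(prommodel.Matrix)
    if !ok {
        panic(fmt.Sprintf("unexpected result type %T", result))
    }
    for _, series := range matrix {
        fmt.Printf("%v: %d samples\n", series.Metric, len(series.Values))
    }
}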
@@ -163,26 +219,37 @@ func (p *prometheusHistoryProvider) readResourceHistory(res map[model.PodID]*Pod
 }
 
 func (p *prometheusHistoryProvider) readLastLabels(res map[model.PodID]*PodHistory, query string) error {
-    tss, err := p.prometheusClient.GetTimeseries(query)
+    ctx, cancel := context.WithTimeout(context.Background(), p.queryTimeout)
+    defer cancel()
+
+    result, err := p.prometheusClient.Query(ctx, query, time.Now())
     if err != nil {
         return fmt.Errorf("cannot get timeseries for labels: %v", err)
     }
-    for _, ts := range tss {
-        podID, err := p.getPodIDFromLabels(ts.Labels)
+
+    matrix, ok := result.(prommodel.Matrix)
+    if !ok {
+        return fmt.Errorf("expected query to return a matrix; got result type %T", result)
+    }
+
+    for _, ts := range matrix {
+        podID, err := p.getPodIDFromLabels(ts.Metric)
         if err != nil {
-            return fmt.Errorf("cannot get container ID from labels: %v", ts.Labels)
+            return fmt.Errorf("cannot get container ID from labels %v: %v", ts.Metric, err)
         }
         podHistory, ok := res[*podID]
         if !ok {
            podHistory = newEmptyHistory()
            res[*podID] = podHistory
         }
-        podLabels := p.getPodLabelsMap(ts.Labels)
-        for _, sample := range ts.Samples {
-            if sample.Timestamp.After(podHistory.LastSeen) {
-                podHistory.LastSeen = sample.Timestamp
-                podHistory.LastLabels = podLabels
-            }
+        podLabels := p.getPodLabelsMap(ts.Metric)
+
+        // time series results will always be sorted chronologically from oldest to
+        // newest, so the last element is the latest sample
+        lastSample := ts.Values[len(ts.Values)-1]
+        if lastSample.Timestamp.Time().After(podHistory.LastSeen) {
+            podHistory.LastSeen = lastSample.Timestamp.Time()
+            podHistory.LastLabels = podLabels
         }
     }
     return nil
@@ -197,12 +264,16 @@ func (p *prometheusHistoryProvider) GetClusterHistory() (map[model.PodID]*PodHis
     podSelector = podSelector + fmt.Sprintf("%s=~\".+\", %s!=\"POD\", %s!=\"\"",
         p.config.CtrPodNameLabel, p.config.CtrNameLabel, p.config.CtrNameLabel)
 
-    // This query uses Prometheus Subquery notation, to gives us a result of a five minute cpu rate by default evaluated every 1minute for last config.HistoryLength days/hours/minutes. In order to change the evaluation step, you need change Prometheus global.evaluation_interval configuration parameter.
-    err := p.readResourceHistory(res, fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[5m])[%s:]", podSelector, p.config.HistoryLength), model.ResourceCPU)
+    historicalCpuQuery := fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[%s])", podSelector, p.config.HistoryResolution)
+    klog.V(4).Infof("Historical CPU usage query used: %s", historicalCpuQuery)
+    err := p.readResourceHistory(res, historicalCpuQuery, model.ResourceCPU)
     if err != nil {
         return nil, fmt.Errorf("cannot get usage history: %v", err)
     }
-    err = p.readResourceHistory(res, fmt.Sprintf("container_memory_working_set_bytes{%s}[%s]", podSelector, p.config.HistoryLength), model.ResourceMemory)
+
+    historicalMemoryQuery := fmt.Sprintf("container_memory_working_set_bytes{%s}", podSelector)
+    klog.V(4).Infof("Historical memory usage query used: %s", historicalMemoryQuery)
+    err = p.readResourceHistory(res, historicalMemoryQuery, model.ResourceMemory)
     if err != nil {
         return nil, fmt.Errorf("cannot get usage history: %v", err)
     }
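For illustration, here is what the queries built above might render to. The job and label names below are examples only, not values taken from this diff; a small sketch that reproduces the formatting:

package main

import "fmt"

func main() {
    // Example selector only; the real one is built from the recommender's config.
    podSelector := `job="kubernetes-cadvisor", pod_name=~".+", name!="POD", name!=""`
    historyResolution := "1h"

    historicalCpuQuery := fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[%s])", podSelector, historyResolution)
    historicalMemoryQuery := fmt.Sprintf("container_memory_working_set_bytes{%s}", podSelector)

    fmt.Println(historicalCpuQuery)
    // rate(container_cpu_usage_seconds_total{job="kubernetes-cadvisor", pod_name=~".+", name!="POD", name!=""}[1h])
    fmt.Println(historicalMemoryQuery)
    // container_memory_working_set_bytes{job="kubernetes-cadvisor", pod_name=~".+", name!="POD", name!=""}
}

In other words, the rate window is now the history resolution itself (instead of a fixed 5m rate wrapped in a subquery over the whole history length), and the time range is supplied through QueryRange's Start/End/Step rather than embedded in the PromQL.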
@@ -211,6 +282,6 @@ func (p *prometheusHistoryProvider) GetClusterHistory() (map[model.PodID]*PodHis
             sort.Slice(samples, func(i, j int) bool { return samples[i].MeasureStart.Before(samples[j].MeasureStart) })
         }
     }
-    p.readLastLabels(res, fmt.Sprintf("%s[%s]", p.config.PodLabelsMetricName, p.config.HistoryLength))
+    p.readLastLabels(res, p.config.PodLabelsMetricName)
     return res, nil
 }
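Because NewPrometheusHistoryProvider now returns an error and the config gained QueryTimeout and HistoryResolution, callers need a small update. A rough usage sketch, with all field values illustrative (this diff does not define defaults; in the recommender they come from configuration):

package main

import (
    "log"
    "time"

    "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/history"
)

func main() {
    // Illustrative values only.
    config := history.PrometheusHistoryProviderConfig{
        Address:                "http://prometheus.monitoring.svc:9090",
        QueryTimeout:           5 * time.Minute,
        HistoryLength:          "8d", // Prometheus duration: days/weeks/years accepted
        HistoryResolution:      "1h",
        PodLabelPrefix:         "pod_label_",
        PodLabelsMetricName:    `up{job="kubernetes-pods"}`,
        PodNamespaceLabel:      "kubernetes_namespace",
        PodNameLabel:           "kubernetes_pod_name",
        CtrNamespaceLabel:      "namespace",
        CtrPodNameLabel:        "pod_name",
        CtrNameLabel:           "name",
        CadvisorMetricsJobName: "kubernetes-cadvisor",
    }

    // The constructor now validates the durations and the client config up front.
    provider, err := history.NewPrometheusHistoryProvider(config)
    if err != nil {
        log.Fatalf("could not initialize history provider: %v", err)
    }

    clusterHistory, err := provider.GetClusterHistory()
    if err != nil {
        log.Fatalf("could not load cluster history: %v", err)
    }
    log.Printf("loaded history for %d pods", len(clusterHistory))
}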

0 commit comments
