Commit 6858c25

Merge pull request kubernetes#85861 from ingvagabund/scheduler-perf-collect-data-items-from-metrics
Collect some of scheduling metrics and scheduling throughput
2 parents: a67238e + 8a1c4a5

3 files changed: +314 -2 lines

test/integration/scheduler_perf/BUILD

Lines changed: 3 additions & 0 deletions
@@ -20,7 +20,10 @@ go_library(
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
+        "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
         "//test/integration/util:go_default_library",
+        "//vendor/github.com/prometheus/client_model/go:go_default_library",
+        "//vendor/k8s.io/klog:go_default_library",
     ],
 )

test/integration/scheduler_perf/scheduler_perf_test.go

Lines changed: 45 additions & 2 deletions
@@ -38,6 +38,15 @@ const (
 	configFile = "config/performance-config.yaml"
 )
 
+var (
+	defaultMetrics = []string{
+		"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
+		"scheduler_scheduling_algorithm_priority_evaluation_seconds",
+		"scheduler_binding_duration_seconds",
+		"scheduler_e2e_scheduling_duration_seconds",
+	}
+)
+
 // testCase configures a test case to run the scheduler performance test. Users should be able to
 // provide this via a YAML file.
 //
@@ -92,6 +101,7 @@ type testParams struct {
 }
 
 func BenchmarkPerfScheduling(b *testing.B) {
+	dataItems := DataItems{Version: "v1"}
 	tests := getSimpleTestCases(configFile)
 
 	for _, test := range tests {
@@ -100,12 +110,15 @@ func BenchmarkPerfScheduling(b *testing.B) {
 			for feature, flag := range test.FeatureGates {
 				defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
 			}
-			perfScheduling(test, b)
+			dataItems.DataItems = append(dataItems.DataItems, perfScheduling(test, b)...)
 		})
 	}
+	if err := dataItems2JSONFile(dataItems, b.Name()); err != nil {
+		klog.Fatalf("%v: unable to write measured data: %v", b.Name(), err)
+	}
 }
 
-func perfScheduling(test testCase, b *testing.B) {
+func perfScheduling(test testCase, b *testing.B) []DataItem {
 	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
 	if test.Nodes.NodeAllocatableStrategy != nil {
 		nodeStrategy = test.Nodes.NodeAllocatableStrategy
@@ -180,15 +193,45 @@ func perfScheduling(test testCase, b *testing.B) {
 
 	// start benchmark
 	b.ResetTimer()
+
+	// Start measuring throughput
+	stopCh := make(chan struct{})
+	throughputCollector := newThroughputCollector(podInformer)
+	go throughputCollector.run(stopCh)
+
+	// Scheduling the main workload
 	config = testutils.NewTestPodCreatorConfig()
 	config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
 	podCreator = testutils.NewTestPodCreator(clientset, config)
 	podCreator.CreatePods()
 
 	<-completedCh
+	close(stopCh)
 
 	// Note: without this line we're taking the overhead of defer() into account.
 	b.StopTimer()
+
+	setNameLabel := func(dataItem *DataItem) DataItem {
+		if dataItem.Labels == nil {
+			dataItem.Labels = map[string]string{}
+		}
+		dataItem.Labels["Name"] = b.Name()
+		return *dataItem
+	}
+
+	dataItems := []DataItem{
+		setNameLabel(throughputCollector.collect()),
+	}
+
+	for _, metric := range defaultMetrics {
+		dataItem := newPrometheusCollector(metric).collect()
+		if dataItem == nil {
+			continue
+		}
+		dataItems = append(dataItems, setNameLabel(dataItem))
+	}
+
+	return dataItems
 }
 
 func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {
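For reference, the DataItems value that BenchmarkPerfScheduling now hands to dataItems2JSONFile (both defined in util.go below) serializes to JSON roughly as shown in this standalone sketch. The structs are minimal copies and every value is made up for illustration; this code is not part of the commit.

package main

import (
	"encoding/json"
	"fmt"
)

// Minimal copies of the DataItem/DataItems structs from util.go, used only to
// show the JSON shape that dataItems2JSONFile writes. All values are invented.
type DataItem struct {
	Data   map[string]float64 `json:"data"`
	Unit   string             `json:"unit"`
	Labels map[string]string  `json:"labels,omitempty"`
}

type DataItems struct {
	Version   string     `json:"version"`
	DataItems []DataItem `json:"dataItems"`
}

func main() {
	items := DataItems{
		Version: "v1",
		DataItems: []DataItem{{
			Data:   map[string]float64{"Average": 123.4, "Perc50": 110, "Perc90": 180, "Perc99": 250},
			Unit:   "ms",
			Labels: map[string]string{"Metric": "scheduler_e2e_scheduling_duration_seconds", "Name": "BenchmarkPerfScheduling"},
		}},
	}
	out, err := json.MarshalIndent(items, "", "  ")
	if err != nil {
		panic(err)
	}
	// The benchmark writes this shape to <BenchmarkName>_<timestamp>.json,
	// or under the directory given by -data-items-dir when that flag is set.
	fmt.Println(string(out))
}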

test/integration/scheduler_perf/util.go

Lines changed: 266 additions & 0 deletions
@@ -17,15 +17,34 @@ limitations under the License.
 package benchmark
 
 import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"path"
+	"sort"
+	"time"
+
+	dto "github.com/prometheus/client_model/go"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/component-base/metrics/legacyregistry"
+	"k8s.io/klog"
 	"k8s.io/kubernetes/test/integration/util"
 )
 
+const (
+	dateFormat                = "2006-01-02T15:04:05Z"
+	throughputSampleFrequency = time.Second
+)
+
+var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
+
 // mustSetupScheduler starts the following components:
 // - k8s api server (a.k.a. master)
 // - scheduler
@@ -66,3 +85,250 @@ func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error)
 	}
 	return scheduled, nil
 }
+
+// DataItem is the data point.
+type DataItem struct {
+	// Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice
+	// that all data items with the same label combination should have the same buckets.
+	Data map[string]float64 `json:"data"`
+	// Unit is the data unit. Notice that all data items with the same label combination
+	// should have the same unit.
+	Unit string `json:"unit"`
+	// Labels is the labels of the data item.
+	Labels map[string]string `json:"labels,omitempty"`
+}
+
+// DataItems is the data point set. It is the struct that the perf dashboard expects.
+type DataItems struct {
+	Version   string     `json:"version"`
+	DataItems []DataItem `json:"dataItems"`
+}
+
+func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
+	b, err := json.Marshal(dataItems)
+	if err != nil {
+		return err
+	}
+
+	destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
+	if *dataItemsDir != "" {
+		destFile = path.Join(*dataItemsDir, destFile)
+	}
+
+	return ioutil.WriteFile(destFile, b, 0644)
+}
+
+// prometheusCollector collects metrics from the legacyregistry.DefaultGatherer.Gather() endpoint.
+// Currently only Histogram metrics are supported.
+type prometheusCollector struct {
+	metric string
+	cache  *dto.MetricFamily
+}
+
+func newPrometheusCollector(metric string) *prometheusCollector {
+	return &prometheusCollector{
+		metric: metric,
+	}
+}
+
+func (pc *prometheusCollector) collect() *DataItem {
+	var metricFamily *dto.MetricFamily
+	m, err := legacyregistry.DefaultGatherer.Gather()
+	if err != nil {
+		klog.Error(err)
+		return nil
+	}
+	for _, mFamily := range m {
+		if mFamily.Name != nil && *mFamily.Name == pc.metric {
+			metricFamily = mFamily
+			break
+		}
+	}
+
+	if metricFamily == nil {
+		klog.Infof("Metric %q not found", pc.metric)
+		return nil
+	}
+
+	if metricFamily.GetMetric() == nil {
+		klog.Infof("Metric %q is empty", pc.metric)
+		return nil
+	}
+
+	if len(metricFamily.GetMetric()) == 0 {
+		klog.Infof("Metric %q is empty", pc.metric)
+		return nil
+	}
+
+	// Histograms are stored under the first index (based on observation).
+	// Given there's only one histogram registered per metric name, accessing
+	// the first index is sufficient.
+	dataItem := pc.promHist2Summary(metricFamily.GetMetric()[0].GetHistogram())
+	if dataItem.Data == nil {
+		return nil
+	}
+
+	// Clear the metrics so that the next test always starts with empty prometheus
+	// metrics (since the metrics are shared among all tests run inside the same binary).
+	clearPromHistogram(metricFamily.GetMetric()[0].GetHistogram())
+
+	return dataItem
+}
+
+// Bucket of a histogram
+type bucket struct {
+	upperBound float64
+	count      float64
+}
+
+func bucketQuantile(q float64, buckets []bucket) float64 {
+	if q < 0 {
+		return math.Inf(-1)
+	}
+	if q > 1 {
+		return math.Inf(+1)
+	}
+
+	if len(buckets) < 2 {
+		return math.NaN()
+	}
+
+	rank := q * buckets[len(buckets)-1].count
+	b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
+
+	if b == 0 {
+		return buckets[0].upperBound * (rank / buckets[0].count)
+	}
+
+	// linear approximation of the b-th bucket
+	brank := rank - buckets[b-1].count
+	bSize := buckets[b].upperBound - buckets[b-1].upperBound
+	bCount := buckets[b].count - buckets[b-1].count
+
+	return buckets[b-1].upperBound + bSize*(brank/bCount)
+}
+
+func (pc *prometheusCollector) promHist2Summary(hist *dto.Histogram) *DataItem {
+	buckets := []bucket{}
+
+	if hist.SampleCount == nil || *hist.SampleCount == 0 {
+		return &DataItem{}
+	}
+
+	if hist.SampleSum == nil || *hist.SampleSum == 0 {
+		return &DataItem{}
+	}
+
+	for _, bckt := range hist.Bucket {
+		if bckt == nil {
+			return &DataItem{}
+		}
+		if bckt.UpperBound == nil || *bckt.UpperBound < 0 {
+			return &DataItem{}
+		}
+		buckets = append(buckets, bucket{
+			count:      float64(*bckt.CumulativeCount),
+			upperBound: *bckt.UpperBound,
+		})
+	}
+
+	// bucketQuantile expects the upper bound of the last bucket to be +inf
+	buckets[len(buckets)-1].upperBound = math.Inf(+1)
+
+	q50 := bucketQuantile(0.50, buckets)
+	q90 := bucketQuantile(0.90, buckets)
+	q99 := bucketQuantile(0.99, buckets)
+
+	msFactor := float64(time.Second) / float64(time.Millisecond)
+
+	return &DataItem{
+		Labels: map[string]string{
+			"Metric": pc.metric,
+		},
+		Data: map[string]float64{
+			"Perc50":  q50 * msFactor,
+			"Perc90":  q90 * msFactor,
+			"Perc99":  q99 * msFactor,
+			"Average": (*hist.SampleSum / float64(*hist.SampleCount)) * msFactor,
+		},
+		Unit: "ms",
+	}
+}
+
+func clearPromHistogram(hist *dto.Histogram) {
+	if hist.SampleCount != nil {
+		*hist.SampleCount = 0
+	}
+	if hist.SampleSum != nil {
+		*hist.SampleSum = 0
+	}
+	for _, b := range hist.Bucket {
+		if b.CumulativeCount != nil {
+			*b.CumulativeCount = 0
+		}
+		if b.UpperBound != nil {
+			*b.UpperBound = 0
+		}
+	}
+}
+
+type throughputCollector struct {
+	podInformer           coreinformers.PodInformer
+	schedulingThroughputs []float64
+}
+
+func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
+	return &throughputCollector{
+		podInformer: podInformer,
+	}
+}
+
+func (tc *throughputCollector) run(stopCh chan struct{}) {
+	podsScheduled, err := getScheduledPods(tc.podInformer)
+	if err != nil {
+		klog.Fatalf("%v", err)
+	}
+	lastScheduledCount := len(podsScheduled)
+	for {
+		select {
+		case <-stopCh:
+			return
+		case <-time.After(throughputSampleFrequency):
+			podsScheduled, err := getScheduledPods(tc.podInformer)
+			if err != nil {
+				klog.Fatalf("%v", err)
+			}
+
+			scheduled := len(podsScheduled)
+			samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
+			throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
+			tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			lastScheduledCount = scheduled
+
+			klog.Infof("%d pods scheduled", lastScheduledCount)
+		}
+	}
+}
+
+func (tc *throughputCollector) collect() *DataItem {
+	throughputSummary := &DataItem{}
+	if length := len(tc.schedulingThroughputs); length > 0 {
+		sort.Float64s(tc.schedulingThroughputs)
+		sum := 0.0
+		for i := range tc.schedulingThroughputs {
+			sum += tc.schedulingThroughputs[i]
+		}
+
+		throughputSummary.Labels = map[string]string{
+			"Metric": "SchedulingThroughput",
+		}
+		throughputSummary.Data = map[string]float64{
+			"Average": sum / float64(length),
+			"Perc50":  tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
+			"Perc90":  tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
+			"Perc99":  tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
+		}
+		throughputSummary.Unit = "pods/s"
+	}
+	return throughputSummary
+}
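As a rough illustration of the percentile math used by bucketQuantile and promHist2Summary above, the following standalone sketch walks through one hypothetical cumulative histogram; the bucket bounds and counts are invented for the example and this code is not part of the commit.

package main

import "fmt"

func main() {
	// Hypothetical cumulative histogram for a scheduler latency metric:
	// 50 samples fell at or below 0.1s, 90 at or below 0.5s, 100 in total
	// (the last bucket's upper bound is treated as +Inf by promHist2Summary).
	// For the 90th percentile, rank = 0.90 * 100 = 90, which falls inside the
	// second bucket (cumulative counts 50..90), so bucketQuantile interpolates:
	//   0.1 + (0.5 - 0.1) * (90 - 50) / (90 - 50) = 0.5 seconds
	perc90 := 0.1 + (0.5-0.1)*(90.0-50.0)/(90.0-50.0)

	// promHist2Summary then scales seconds to milliseconds before reporting.
	fmt.Printf("Perc90 = %.0f ms\n", perc90*1000)
}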
