Commit 8a1c4a5

Collect some scheduling metrics and scheduling throughput

In addition to the overall performance measurements obtained from the Go benchmark, collect metrics that provide information about the internals of the scheduler itself. This is a first step towards improving what we collect about the scheduler. Metrics in question:

- scheduler_scheduling_algorithm_predicate_evaluation_seconds
- scheduler_scheduling_algorithm_priority_evaluation_seconds
- scheduler_binding_duration_seconds
- scheduler_e2e_scheduling_duration_seconds

Scheduling throughput is computed on the fly inside perfScheduling.

1 parent db1990f commit 8a1c4a5
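
As context for reviewers (not part of the commit): the data items collected here are serialized by dataItems2JSONFile (added in util.go below) into one JSON file per benchmark run. The following standalone Go sketch marshals a hand-built DataItems value to show the rough shape of that file; the test name and all numbers are invented for illustration.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Illustrative copies of the DataItem/DataItems types introduced in util.go.
    type DataItem struct {
        Data   map[string]float64 `json:"data"`
        Unit   string             `json:"unit"`
        Labels map[string]string  `json:"labels,omitempty"`
    }

    type DataItems struct {
        Version   string     `json:"version"`
        DataItems []DataItem `json:"dataItems"`
    }

    func main() {
        // One throughput summary and one latency summary, roughly as perfScheduling
        // would emit them. The test case name and the values are made up.
        out := DataItems{
            Version: "v1",
            DataItems: []DataItem{
                {
                    Labels: map[string]string{"Metric": "SchedulingThroughput", "Name": "BenchmarkPerfScheduling/SomeTestCase"},
                    Data:   map[string]float64{"Average": 98.5, "Perc50": 97.0, "Perc90": 110.0, "Perc99": 115.0},
                    Unit:   "pods/s",
                },
                {
                    Labels: map[string]string{"Metric": "scheduler_e2e_scheduling_duration_seconds", "Name": "BenchmarkPerfScheduling/SomeTestCase"},
                    Data:   map[string]float64{"Average": 12.3, "Perc50": 10.1, "Perc90": 25.0, "Perc99": 41.7},
                    Unit:   "ms",
                },
            },
        }
        b, err := json.MarshalIndent(out, "", "  ")
        if err != nil {
            panic(err)
        }
        fmt.Println(string(b))
    }

When the new -data-items-dir flag is set, the real file lands in that directory under the name <benchmark name>_<timestamp>.json.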

3 files changed: +314 -2 lines changed

test/integration/scheduler_perf/BUILD

Lines changed: 3 additions & 0 deletions

@@ -20,7 +20,10 @@ go_library(
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
+        "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
         "//test/integration/util:go_default_library",
+        "//vendor/github.com/prometheus/client_model/go:go_default_library",
+        "//vendor/k8s.io/klog:go_default_library",
     ],
 )

test/integration/scheduler_perf/scheduler_perf_test.go

Lines changed: 45 additions & 2 deletions

@@ -38,6 +38,15 @@ const (
     configFile = "config/performance-config.yaml"
 )

+var (
+    defaultMetrics = []string{
+        "scheduler_scheduling_algorithm_predicate_evaluation_seconds",
+        "scheduler_scheduling_algorithm_priority_evaluation_seconds",
+        "scheduler_binding_duration_seconds",
+        "scheduler_e2e_scheduling_duration_seconds",
+    }
+)
+
 // testCase configures a test case to run the scheduler performance test. Users should be able to
 // provide this via a YAML file.
 //
@@ -92,6 +101,7 @@ type testParams struct {
 }

 func BenchmarkPerfScheduling(b *testing.B) {
+    dataItems := DataItems{Version: "v1"}
     tests := getSimpleTestCases(configFile)

     for _, test := range tests {
@@ -100,12 +110,15 @@ func BenchmarkPerfScheduling(b *testing.B) {
             for feature, flag := range test.FeatureGates {
                 defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
             }
-            perfScheduling(test, b)
+            dataItems.DataItems = append(dataItems.DataItems, perfScheduling(test, b)...)
         })
     }
+    if err := dataItems2JSONFile(dataItems, b.Name()); err != nil {
+        klog.Fatalf("%v: unable to write measured data: %v", b.Name(), err)
+    }
 }

-func perfScheduling(test testCase, b *testing.B) {
+func perfScheduling(test testCase, b *testing.B) []DataItem {
     var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
     if test.Nodes.NodeAllocatableStrategy != nil {
         nodeStrategy = test.Nodes.NodeAllocatableStrategy
@@ -180,15 +193,45 @@ func perfScheduling(test testCase, b *testing.B) {

     // start benchmark
     b.ResetTimer()
+
+    // Start measuring throughput
+    stopCh := make(chan struct{})
+    throughputCollector := newThroughputCollector(podInformer)
+    go throughputCollector.run(stopCh)
+
+    // Scheduling the main workload
     config = testutils.NewTestPodCreatorConfig()
     config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
     podCreator = testutils.NewTestPodCreator(clientset, config)
     podCreator.CreatePods()

     <-completedCh
+    close(stopCh)

     // Note: without this line we're taking the overhead of defer() into account.
     b.StopTimer()
+
+    setNameLabel := func(dataItem *DataItem) DataItem {
+        if dataItem.Labels == nil {
+            dataItem.Labels = map[string]string{}
+        }
+        dataItem.Labels["Name"] = b.Name()
+        return *dataItem
+    }
+
+    dataItems := []DataItem{
+        setNameLabel(throughputCollector.collect()),
+    }
+
+    for _, metric := range defaultMetrics {
+        dataItem := newPrometheusCollector(metric).collect()
+        if dataItem == nil {
+            continue
+        }
+        dataItems = append(dataItems, setNameLabel(dataItem))
+    }
+
+    return dataItems
 }

 func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {
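
One detail worth noting about setNameLabel above: the "Name" label is b.Name(), which inside a sub-benchmark started with b.Run is the slash-joined benchmark path, so each data item records which test case produced it. A minimal, hypothetical sketch of that behaviour (the sub-benchmark name is invented):

    package bench

    import "testing"

    // BenchmarkNaming only demonstrates what b.Name() returns inside b.Run;
    // with this commit, that string becomes the "Name" label of each DataItem.
    func BenchmarkNaming(b *testing.B) {
        b.Run("SomeTestCase", func(b *testing.B) {
            b.Logf("running %s", b.Name()) // prints "BenchmarkNaming/SomeTestCase"
            for i := 0; i < b.N; i++ {
                // workload under measurement would go here
            }
        })
    }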

test/integration/scheduler_perf/util.go

Lines changed: 266 additions & 0 deletions

@@ -17,15 +17,34 @@ limitations under the License.
 package benchmark

 import (
+    "encoding/json"
+    "flag"
+    "fmt"
+    "io/ioutil"
+    "math"
+    "path"
+    "sort"
+    "time"
+
+    dto "github.com/prometheus/client_model/go"
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime/schema"
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
     restclient "k8s.io/client-go/rest"
+    "k8s.io/component-base/metrics/legacyregistry"
+    "k8s.io/klog"
     "k8s.io/kubernetes/test/integration/util"
 )

+const (
+    dateFormat                = "2006-01-02T15:04:05Z"
+    throughputSampleFrequency = time.Second
+)
+
+var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
+
 // mustSetupScheduler starts the following components:
 // - k8s api server (a.k.a. master)
 // - scheduler
@@ -66,3 +85,250 @@ func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error)
     }
     return scheduled, nil
 }
+
+// DataItem is the data point.
+type DataItem struct {
+    // Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice
+    // that all data items with the same label combination should have the same buckets.
+    Data map[string]float64 `json:"data"`
+    // Unit is the data unit. Notice that all data items with the same label combination
+    // should have the same unit.
+    Unit string `json:"unit"`
+    // Labels is the labels of the data item.
+    Labels map[string]string `json:"labels,omitempty"`
+}
+
+// DataItems is the data point set. It is the struct that the perf dashboard expects.
+type DataItems struct {
+    Version   string     `json:"version"`
+    DataItems []DataItem `json:"dataItems"`
+}
+
+func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
+    b, err := json.Marshal(dataItems)
+    if err != nil {
+        return err
+    }
+
+    destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
+    if *dataItemsDir != "" {
+        destFile = path.Join(*dataItemsDir, destFile)
+    }
+
+    return ioutil.WriteFile(destFile, b, 0644)
+}
+
+// prometheusCollector collects metrics gathered from legacyregistry.DefaultGatherer.Gather().
+// Currently only Histogram metrics are supported.
+type prometheusCollector struct {
+    metric string
+    cache  *dto.MetricFamily
+}
+
+func newPrometheusCollector(metric string) *prometheusCollector {
+    return &prometheusCollector{
+        metric: metric,
+    }
+}
+
+func (pc *prometheusCollector) collect() *DataItem {
+    var metricFamily *dto.MetricFamily
+    m, err := legacyregistry.DefaultGatherer.Gather()
+    if err != nil {
+        klog.Error(err)
+        return nil
+    }
+    for _, mFamily := range m {
+        if mFamily.Name != nil && *mFamily.Name == pc.metric {
+            metricFamily = mFamily
+            break
+        }
+    }
+
+    if metricFamily == nil {
+        klog.Infof("Metric %q not found", pc.metric)
+        return nil
+    }
+
+    if metricFamily.GetMetric() == nil {
+        klog.Infof("Metric %q is empty", pc.metric)
+        return nil
+    }
+
+    if len(metricFamily.GetMetric()) == 0 {
+        klog.Infof("Metric %q is empty", pc.metric)
+        return nil
+    }
+
+    // Histograms are stored under the first index (based on observation).
+    // Given there's only one histogram registered per metric name, accessing
+    // the first index is sufficient.
+    dataItem := pc.promHist2Summary(metricFamily.GetMetric()[0].GetHistogram())
+    if dataItem.Data == nil {
+        return nil
+    }
+
+    // Clear the metrics so that the next test always starts with empty prometheus
+    // metrics (since the metrics are shared among all tests run inside the same binary).
+    clearPromHistogram(metricFamily.GetMetric()[0].GetHistogram())
+
+    return dataItem
+}
+
+// Bucket of a histogram.
+type bucket struct {
+    upperBound float64
+    count      float64
+}
+
+func bucketQuantile(q float64, buckets []bucket) float64 {
+    if q < 0 {
+        return math.Inf(-1)
+    }
+    if q > 1 {
+        return math.Inf(+1)
+    }
+
+    if len(buckets) < 2 {
+        return math.NaN()
+    }
+
+    rank := q * buckets[len(buckets)-1].count
+    b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
+
+    if b == 0 {
+        return buckets[0].upperBound * (rank / buckets[0].count)
+    }
+
+    // linear approximation of the b-th bucket
+    brank := rank - buckets[b-1].count
+    bSize := buckets[b].upperBound - buckets[b-1].upperBound
+    bCount := buckets[b].count - buckets[b-1].count
+
+    return buckets[b-1].upperBound + bSize*(brank/bCount)
+}
+
+func (pc *prometheusCollector) promHist2Summary(hist *dto.Histogram) *DataItem {
+    buckets := []bucket{}
+
+    if hist.SampleCount == nil || *hist.SampleCount == 0 {
+        return &DataItem{}
+    }
+
+    if hist.SampleSum == nil || *hist.SampleSum == 0 {
+        return &DataItem{}
+    }
+
+    for _, bckt := range hist.Bucket {
+        if bckt == nil {
+            return &DataItem{}
+        }
+        if bckt.UpperBound == nil || *bckt.UpperBound < 0 {
+            return &DataItem{}
+        }
+        buckets = append(buckets, bucket{
+            count:      float64(*bckt.CumulativeCount),
+            upperBound: *bckt.UpperBound,
+        })
+    }
+
+    // bucketQuantile expects the upper bound of the last bucket to be +inf
+    buckets[len(buckets)-1].upperBound = math.Inf(+1)
+
+    q50 := bucketQuantile(0.50, buckets)
+    q90 := bucketQuantile(0.90, buckets)
+    q99 := bucketQuantile(0.99, buckets)
+
+    msFactor := float64(time.Second) / float64(time.Millisecond)
+
+    return &DataItem{
+        Labels: map[string]string{
+            "Metric": pc.metric,
+        },
+        Data: map[string]float64{
+            "Perc50":  q50 * msFactor,
+            "Perc90":  q90 * msFactor,
+            "Perc99":  q99 * msFactor,
+            "Average": (*hist.SampleSum / float64(*hist.SampleCount)) * msFactor,
+        },
+        Unit: "ms",
+    }
+}
+
+func clearPromHistogram(hist *dto.Histogram) {
+    if hist.SampleCount != nil {
+        *hist.SampleCount = 0
+    }
+    if hist.SampleSum != nil {
+        *hist.SampleSum = 0
+    }
+    for _, b := range hist.Bucket {
+        if b.CumulativeCount != nil {
+            *b.CumulativeCount = 0
+        }
+        if b.UpperBound != nil {
+            *b.UpperBound = 0
+        }
+    }
+}
+
+type throughputCollector struct {
+    podInformer           coreinformers.PodInformer
+    schedulingThroughputs []float64
+}
+
+func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
+    return &throughputCollector{
+        podInformer: podInformer,
+    }
+}
+
+func (tc *throughputCollector) run(stopCh chan struct{}) {
+    podsScheduled, err := getScheduledPods(tc.podInformer)
+    if err != nil {
+        klog.Fatalf("%v", err)
+    }
+    lastScheduledCount := len(podsScheduled)
+    for {
+        select {
+        case <-stopCh:
+            return
+        case <-time.After(throughputSampleFrequency):
+            podsScheduled, err := getScheduledPods(tc.podInformer)
+            if err != nil {
+                klog.Fatalf("%v", err)
+            }
+
+            scheduled := len(podsScheduled)
+            samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
+            throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
+            tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+            lastScheduledCount = scheduled
+
+            klog.Infof("%d pods scheduled", lastScheduledCount)
+        }
+    }
+}
+
+func (tc *throughputCollector) collect() *DataItem {
+    throughputSummary := &DataItem{}
+    if length := len(tc.schedulingThroughputs); length > 0 {
+        sort.Float64s(tc.schedulingThroughputs)
+        sum := 0.0
+        for i := range tc.schedulingThroughputs {
+            sum += tc.schedulingThroughputs[i]
+        }
+
+        throughputSummary.Labels = map[string]string{
+            "Metric": "SchedulingThroughput",
+        }
+        throughputSummary.Data = map[string]float64{
+            "Average": sum / float64(length),
+            "Perc50":  tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
+            "Perc90":  tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
+            "Perc99":  tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
+        }
+        throughputSummary.Unit = "pods/s"
+    }
+    return throughputSummary
+}
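
To make the quantile estimation above easier to follow, here is a self-contained sketch that exercises bucketQuantile on a hand-made cumulative histogram. The bucket bounds and counts are invented, and the last upper bound is set to +Inf just as promHist2Summary does; bucket and bucketQuantile are copied from util.go so the sketch runs on its own.

    package main

    import (
        "fmt"
        "math"
        "sort"
    )

    // bucket mirrors the helper type above: an upper bound plus the cumulative
    // count of samples at or below that bound.
    type bucket struct {
        upperBound float64
        count      float64
    }

    // bucketQuantile is the same estimation as above: find the first bucket whose
    // cumulative count reaches the requested rank, then interpolate linearly
    // between the previous and current upper bounds.
    func bucketQuantile(q float64, buckets []bucket) float64 {
        if q < 0 {
            return math.Inf(-1)
        }
        if q > 1 {
            return math.Inf(+1)
        }
        if len(buckets) < 2 {
            return math.NaN()
        }
        rank := q * buckets[len(buckets)-1].count
        b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
        if b == 0 {
            return buckets[0].upperBound * (rank / buckets[0].count)
        }
        // Linear interpolation inside the b-th bucket.
        brank := rank - buckets[b-1].count
        bSize := buckets[b].upperBound - buckets[b-1].upperBound
        bCount := buckets[b].count - buckets[b-1].count
        return buckets[b-1].upperBound + bSize*(brank/bCount)
    }

    func main() {
        // Invented cumulative histogram: 60 of 100 samples took <= 0.1s and
        // 90 took <= 0.5s; the last bucket's bound is forced to +Inf, mirroring
        // what promHist2Summary does before calling bucketQuantile.
        buckets := []bucket{
            {upperBound: 0.1, count: 60},
            {upperBound: 0.5, count: 90},
            {upperBound: math.Inf(+1), count: 100},
        }
        fmt.Println(bucketQuantile(0.50, buckets)) // ~0.083 (interpolated inside the first bucket)
        fmt.Println(bucketQuantile(0.90, buckets)) // 0.5 (exactly at the second bound)
        fmt.Println(bucketQuantile(0.99, buckets)) // +Inf (rank falls in the +Inf bucket)
    }

Note that when the requested quantile falls into the final +Inf bucket (Perc99 here), the estimate is +Inf; that is a property of bucket-based quantile estimation rather than a bug in the collector.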
