Commit 7f56c75

Make MetricCollector configurable for scheduler benchmark tests
1 parent 3d09d25 commit 7f56c75

2 files changed: +123 −108 lines

test/integration/scheduler_perf/scheduler_perf_test.go

Lines changed: 83 additions & 96 deletions
@@ -19,13 +19,13 @@ package benchmark
 import (
 	"fmt"
 	"io/ioutil"
-	"sync/atomic"
 	"testing"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	"k8s.io/client-go/tools/cache"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/component-base/featuregate"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/klog"
@@ -39,11 +39,13 @@ const (
 )
 
 var (
-	defaultMetrics = []string{
-		"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
-		"scheduler_scheduling_algorithm_priority_evaluation_seconds",
-		"scheduler_binding_duration_seconds",
-		"scheduler_e2e_scheduling_duration_seconds",
+	defaultMetricsCollectorConfig = metricsCollectorConfig{
+		Metrics: []string{
+			"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
+			"scheduler_scheduling_algorithm_priority_evaluation_seconds",
+			"scheduler_binding_duration_seconds",
+			"scheduler_e2e_scheduling_duration_seconds",
+		},
 	}
 )
 
@@ -52,8 +54,8 @@ var (
 //
 // It specifies nodes and pods in the cluster before running the test. It also specifies the pods to
 // schedule during the test. The config can be as simple as just specify number of nodes/pods, where
-// default spec will be applied. It also allows the user to specify a pod spec template for more compicated
-// test cases.
+// default spec will be applied. It also allows the user to specify a pod spec template for more
+// complicated test cases.
 //
 // It also specifies the metrics to be collected after the test. If nothing is specified, default metrics
 // such as scheduling throughput and latencies will be collected.
@@ -68,6 +70,8 @@ type testCase struct {
 	PodsToSchedule podCase
 	// optional, feature gates to set before running the test
 	FeatureGates map[featuregate.Feature]bool
+	// optional, replaces default defaultMetricsCollectorConfig if supplied.
+	MetricsCollectorConfig *metricsCollectorConfig
 }
 
 type nodeCase struct {
@@ -100,6 +104,11 @@ type testParams struct {
 	NumPodsToSchedule int
 }
 
+type testDataCollector interface {
+	run(stopCh chan struct{})
+	collect() []DataItem
+}
+
 func BenchmarkPerfScheduling(b *testing.B) {
 	dataItems := DataItems{Version: "v1"}
 	tests := getSimpleTestCases(configFile)
@@ -119,119 +128,97 @@ func BenchmarkPerfScheduling(b *testing.B) {
 }
 
 func perfScheduling(test testCase, b *testing.B) []DataItem {
-	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
-	if test.Nodes.NodeAllocatableStrategy != nil {
-		nodeStrategy = test.Nodes.NodeAllocatableStrategy
-	} else if test.Nodes.LabelNodePrepareStrategy != nil {
-		nodeStrategy = test.Nodes.LabelNodePrepareStrategy
-	} else if test.Nodes.UniqueNodeLabelStrategy != nil {
-		nodeStrategy = test.Nodes.UniqueNodeLabelStrategy
-	}
-
-	setupPodStrategy := getPodStrategy(test.InitPods)
-	testPodStrategy := getPodStrategy(test.PodsToSchedule)
-
-	var nodeSpec *v1.Node
-	if test.Nodes.NodeTemplatePath != nil {
-		nodeSpec = getNodeSpecFromFile(test.Nodes.NodeTemplatePath)
-	}
-
 	finalFunc, podInformer, clientset := mustSetupScheduler()
 	defer finalFunc()
 
-	var nodePreparer testutils.TestNodePreparer
-	if nodeSpec != nil {
-		nodePreparer = framework.NewIntegrationTestNodePreparerWithNodeSpec(
-			clientset,
-			[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
-			nodeSpec,
-		)
-	} else {
-		nodePreparer = framework.NewIntegrationTestNodePreparer(
-			clientset,
-			[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
-			"scheduler-perf-",
-		)
-	}
-
+	nodePreparer := getNodePreparer(test.Nodes, clientset)
 	if err := nodePreparer.PrepareNodes(); err != nil {
 		klog.Fatalf("%v", err)
 	}
 	defer nodePreparer.CleanupNodes()
 
-	config := testutils.NewTestPodCreatorConfig()
-	config.AddStrategy(setupNamespace, test.InitPods.Num, setupPodStrategy)
-	podCreator := testutils.NewTestPodCreator(clientset, config)
-	podCreator.CreatePods()
-
-	for {
-		scheduled, err := getScheduledPods(podInformer)
-		if err != nil {
-			klog.Fatalf("%v", err)
-		}
-		if len(scheduled) >= test.InitPods.Num {
-			break
-		}
-		klog.Infof("got %d existing pods, required: %d", len(scheduled), test.InitPods.Num)
-		time.Sleep(1 * time.Second)
-	}
-
-	scheduled := int32(0)
-	completedCh := make(chan struct{})
-	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		UpdateFunc: func(old, cur interface{}) {
-			curPod := cur.(*v1.Pod)
-			oldPod := old.(*v1.Pod)
-
-			if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
-				if atomic.AddInt32(&scheduled, 1) >= int32(test.PodsToSchedule.Num) {
-					completedCh <- struct{}{}
-				}
-			}
-		},
-	})
+	createPods(setupNamespace, test.InitPods, clientset)
+	waitNumPodsScheduled(test.InitPods.Num, podInformer)
 
 	// start benchmark
 	b.ResetTimer()
 
-	// Start measuring throughput
+	// Start test data collectors.
 	stopCh := make(chan struct{})
-	throughputCollector := newThroughputCollector(podInformer)
-	go throughputCollector.run(stopCh)
+	collectors := getTestDataCollectors(test, podInformer, b)
+	for _, collector := range collectors {
+		go collector.run(stopCh)
+	}
 
-	// Scheduling the main workload
-	config = testutils.NewTestPodCreatorConfig()
-	config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
-	podCreator = testutils.NewTestPodCreator(clientset, config)
-	podCreator.CreatePods()
+	// Schedule the main workload
+	createPods(testNamespace, test.PodsToSchedule, clientset)
+	waitNumPodsScheduled(test.InitPods.Num+test.PodsToSchedule.Num, podInformer)
 
-	<-completedCh
 	close(stopCh)
-
 	// Note: without this line we're taking the overhead of defer() into account.
 	b.StopTimer()
 
-	setNameLabel := func(dataItem *DataItem) DataItem {
-		if dataItem.Labels == nil {
-			dataItem.Labels = map[string]string{}
+	var dataItems []DataItem
+	for _, collector := range collectors {
+		dataItems = append(dataItems, collector.collect()...)
+	}
+	return dataItems
+}
+
+func waitNumPodsScheduled(num int, podInformer coreinformers.PodInformer) {
+	for {
+		scheduled, err := getScheduledPods(podInformer)
+		if err != nil {
+			klog.Fatalf("%v", err)
+		}
+		if len(scheduled) >= num {
+			break
 		}
-		dataItem.Labels["Name"] = b.Name()
-		return *dataItem
+		klog.Infof("got %d existing pods, required: %d", len(scheduled), num)
+		time.Sleep(1 * time.Second)
 	}
+}
 
-	dataItems := []DataItem{
-		setNameLabel(throughputCollector.collect()),
+func getTestDataCollectors(tc testCase, podInformer coreinformers.PodInformer, b *testing.B) []testDataCollector {
+	collectors := []testDataCollector{newThroughputCollector(podInformer, map[string]string{"Name": b.Name()})}
+	metricsCollectorConfig := defaultMetricsCollectorConfig
+	if tc.MetricsCollectorConfig != nil {
+		metricsCollectorConfig = *tc.MetricsCollectorConfig
 	}
+	collectors = append(collectors, newMetricsCollector(metricsCollectorConfig, map[string]string{"Name": b.Name()}))
+	return collectors
+}
 
-	for _, metric := range defaultMetrics {
-		dataItem := newMetricsCollector(metric).collect()
-		if dataItem == nil {
-			continue
-		}
-		dataItems = append(dataItems, setNameLabel(dataItem))
+func getNodePreparer(nc nodeCase, clientset clientset.Interface) testutils.TestNodePreparer {
+	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
+	if nc.NodeAllocatableStrategy != nil {
+		nodeStrategy = nc.NodeAllocatableStrategy
+	} else if nc.LabelNodePrepareStrategy != nil {
+		nodeStrategy = nc.LabelNodePrepareStrategy
+	} else if nc.UniqueNodeLabelStrategy != nil {
+		nodeStrategy = nc.UniqueNodeLabelStrategy
 	}
 
-	return dataItems
+	if nc.NodeTemplatePath != nil {
+		return framework.NewIntegrationTestNodePreparerWithNodeSpec(
+			clientset,
+			[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
+			getNodeSpecFromFile(nc.NodeTemplatePath),
+		)
+	}
+	return framework.NewIntegrationTestNodePreparer(
+		clientset,
+		[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
+		"scheduler-perf-",
+	)
+}
+
+func createPods(ns string, pc podCase, clientset clientset.Interface) {
+	strategy := getPodStrategy(pc)
+	config := testutils.NewTestPodCreatorConfig()
+	config.AddStrategy(ns, pc.Num, strategy)
+	podCreator := testutils.NewTestPodCreator(clientset, config)
+	podCreator.CreatePods()
 }
 
 func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {
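The new MetricsCollectorConfig field gives a test case control over which histograms the metrics collector gathers. A rough usage sketch (hypothetical field values; in the benchmark itself test cases come from getSimpleTestCases(configFile) rather than Go literals):

	// Hypothetical test case that overrides the default metric set.
	customMetricsTest := testCase{
		Nodes:          nodeCase{Num: 500},
		InitPods:       podCase{Num: 500},
		PodsToSchedule: podCase{Num: 1000},
		// Collect only e2e scheduling latency instead of the defaults.
		MetricsCollectorConfig: &metricsCollectorConfig{
			Metrics: []string{"scheduler_e2e_scheduling_duration_seconds"},
		},
	}

When MetricsCollectorConfig is nil, getTestDataCollectors falls back to defaultMetricsCollectorConfig, so existing test cases behave as before.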

test/integration/scheduler_perf/util.go

Lines changed: 40 additions & 12 deletions
@@ -118,20 +118,42 @@ func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
 	return ioutil.WriteFile(destFile, b, 0644)
 }
 
+// metricsCollectorConfig is the config to be marshalled to YAML config file.
+type metricsCollectorConfig struct {
+	Metrics []string
+}
+
 // metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
 // Currently only Histrogram metrics are supported.
 type metricsCollector struct {
-	metric string
+	metricsCollectorConfig
+	labels map[string]string
 }
 
-func newMetricsCollector(metric string) *metricsCollector {
+func newMetricsCollector(config metricsCollectorConfig, labels map[string]string) *metricsCollector {
 	return &metricsCollector{
-		metric: metric,
+		metricsCollectorConfig: config,
+		labels: labels,
 	}
 }
 
-func (pc *metricsCollector) collect() *DataItem {
-	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, pc.metric)
+func (*metricsCollector) run(stopCh chan struct{}) {
+	// metricCollector doesn't need to start before the tests, so nothing to do here.
+}
+
+func (pc *metricsCollector) collect() []DataItem {
+	var dataItems []DataItem
+	for _, metric := range pc.Metrics {
+		dataItem := collectHistogram(metric, pc.labels)
+		if dataItem != nil {
+			dataItems = append(dataItems, *dataItem)
+		}
+	}
+	return dataItems
+}
+
+func collectHistogram(metric string, labels map[string]string) *DataItem {
+	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, metric)
 	if err != nil {
 		klog.Error(err)
 		return nil
@@ -153,10 +175,13 @@ func (pc *metricsCollector) collect() *DataItem {
 
 	msFactor := float64(time.Second) / float64(time.Millisecond)
 
+	// Copy labels and add "Metric" label for this metric.
+	labelMap := map[string]string{"Metric": metric}
+	for k, v := range labels {
+		labelMap[k] = v
+	}
 	return &DataItem{
-		Labels: map[string]string{
-			"Metric": pc.metric,
-		},
+		Labels: labelMap,
 		Data: map[string]float64{
 			"Perc50": q50 * msFactor,
 			"Perc90": q90 * msFactor,
@@ -170,11 +195,13 @@ func (pc *metricsCollector) collect() *DataItem {
 type throughputCollector struct {
 	podInformer coreinformers.PodInformer
 	schedulingThroughputs []float64
+	labels map[string]string
 }
 
-func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
+func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string) *throughputCollector {
 	return &throughputCollector{
 		podInformer: podInformer,
+		labels:      labels,
 	}
 }
 
@@ -205,8 +232,8 @@ func (tc *throughputCollector) run(stopCh chan struct{}) {
 	}
 }
 
-func (tc *throughputCollector) collect() *DataItem {
-	throughputSummary := &DataItem{}
+func (tc *throughputCollector) collect() []DataItem {
+	throughputSummary := DataItem{Labels: tc.labels}
 	if length := len(tc.schedulingThroughputs); length > 0 {
 		sort.Float64s(tc.schedulingThroughputs)
 		sum := 0.0
@@ -225,5 +252,6 @@ func (tc *throughputCollector) collect() *DataItem {
 		}
 		throughputSummary.Unit = "pods/s"
 	}
-	return throughputSummary
+
+	return []DataItem{throughputSummary}
 }