Commit 6ded721

Merge pull request kubernetes#127496 from macsko/add_metricscollectionop_to_scheduler_perf
Add separate ops for collecting metrics from multiple namespaces in scheduler_perf
2 parents 5973acc + a273e53

test/integration/scheduler_perf/scheduler_perf.go

Lines changed: 137 additions & 51 deletions
@@ -72,17 +72,19 @@ import (
 type operationCode string
 
 const (
-	createAnyOpcode            operationCode = "createAny"
-	createNodesOpcode          operationCode = "createNodes"
-	createNamespacesOpcode     operationCode = "createNamespaces"
-	createPodsOpcode           operationCode = "createPods"
-	createPodSetsOpcode        operationCode = "createPodSets"
-	deletePodsOpcode           operationCode = "deletePods"
-	createResourceClaimsOpcode operationCode = "createResourceClaims"
-	createResourceDriverOpcode operationCode = "createResourceDriver"
-	churnOpcode                operationCode = "churn"
-	barrierOpcode              operationCode = "barrier"
-	sleepOpcode                operationCode = "sleep"
+	createAnyOpcode              operationCode = "createAny"
+	createNodesOpcode            operationCode = "createNodes"
+	createNamespacesOpcode       operationCode = "createNamespaces"
+	createPodsOpcode             operationCode = "createPods"
+	createPodSetsOpcode          operationCode = "createPodSets"
+	deletePodsOpcode             operationCode = "deletePods"
+	createResourceClaimsOpcode   operationCode = "createResourceClaims"
+	createResourceDriverOpcode   operationCode = "createResourceDriver"
+	churnOpcode                  operationCode = "churn"
+	barrierOpcode                operationCode = "barrier"
+	sleepOpcode                  operationCode = "sleep"
+	startCollectingMetricsOpcode operationCode = "startCollectingMetrics"
+	stopCollectingMetricsOpcode  operationCode = "stopCollectingMetrics"
 )
 
 const (
@@ -414,6 +416,8 @@ func (op *op) UnmarshalJSON(b []byte) error {
 		&churnOp{},
 		&barrierOp{},
 		&sleepOp{},
+		&startCollectingMetricsOp{},
+		&stopCollectingMetricsOp{},
 		// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
 	}
 	var firstError error
@@ -815,6 +819,58 @@ func (so sleepOp) patchParams(_ *workload) (realOp, error) {
 	return &so, nil
 }
 
+// startCollectingMetricsOp defines an op that starts metrics collectors.
+// stopCollectingMetricsOp has to be used after this op to finish collecting.
+type startCollectingMetricsOp struct {
+	// Must be "startCollectingMetrics".
+	Opcode operationCode
+	// Name appended to workload's name in results.
+	Name string
+	// Namespaces for which the scheduling throughput metric is calculated.
+	Namespaces []string
+}
+
+func (scm *startCollectingMetricsOp) isValid(_ bool) error {
+	if scm.Opcode != startCollectingMetricsOpcode {
+		return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, startCollectingMetricsOpcode)
+	}
+	if len(scm.Namespaces) == 0 {
+		return fmt.Errorf("namespaces cannot be empty")
+	}
+	return nil
+}
+
+func (*startCollectingMetricsOp) collectsMetrics() bool {
+	return false
+}
+
+func (scm startCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
+	return &scm, nil
+}
+
+// stopCollectingMetricsOp defines an op that stops collecting the metrics
+// and writes them into the result slice.
+// startCollectingMetricsOp has to be used before this op to begin collecting.
+type stopCollectingMetricsOp struct {
+	// Must be "stopCollectingMetrics".
+	Opcode operationCode
+}
+
+func (scm *stopCollectingMetricsOp) isValid(_ bool) error {
+	if scm.Opcode != stopCollectingMetricsOpcode {
+		return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, stopCollectingMetricsOpcode)
+	}
+	return nil
+}
+
+func (*stopCollectingMetricsOp) collectsMetrics() bool {
+	return true
+}
+
+func (scm stopCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
+	return &scm, nil
+}
+
 var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
 
 func initTestOutput(tb testing.TB) io.Writer {
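
Usage note (illustration, not from the patch): the two op types above are meant to appear in a scheduler_perf workload template like any other op and are decoded through op.UnmarshalJSON. A minimal standalone sketch of the JSON shape they accept, using trimmed-down local copies of the structs; the name "measure-pods" and the namespace values are invented for illustration.

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down local copies of the op structs above, for illustration only.
// encoding/json matches the "opcode", "name" and "namespaces" keys to these
// exported fields case-insensitively.
type startCollectingMetricsOp struct {
	Opcode     string
	Name       string
	Namespaces []string
}

type stopCollectingMetricsOp struct {
	Opcode string
}

func main() {
	// Hypothetical ops as they might appear in a workload template (values invented).
	startJSON := []byte(`{"opcode": "startCollectingMetrics", "name": "measure-pods", "namespaces": ["namespace-1", "namespace-2"]}`)
	stopJSON := []byte(`{"opcode": "stopCollectingMetrics"}`)

	var start startCollectingMetricsOp
	if err := json.Unmarshal(startJSON, &start); err != nil {
		panic(err)
	}
	var stop stopCollectingMetricsOp
	if err := json.Unmarshal(stopJSON, &stop); err != nil {
		panic(err)
	}

	// Mirrors the isValid checks above: the opcode must match and at least one
	// namespace must be given when starting collection.
	if start.Opcode != "startCollectingMetrics" || len(start.Namespaces) == 0 {
		panic("invalid startCollectingMetrics op")
	}
	fmt.Printf("collect throughput for %v, then stop with %q\n", start.Namespaces, stop.Opcode)
}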
@@ -1110,6 +1166,46 @@ func checkEmptyInFlightEvents() error {
 	return nil
 }
 
+func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string) (ktesting.TContext, []testDataCollector) {
+	collectorCtx := ktesting.WithCancel(tCtx)
+	workloadName := tCtx.Name()
+	// The first part is the same for each workload, therefore we can strip it.
+	workloadName = workloadName[strings.Index(name, "/")+1:]
+	collectors := getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", workloadName, name), namespaces, mcc, throughputErrorMargin)
+	for _, collector := range collectors {
+		// Need loop-local variable for function below.
+		collector := collector
+		err := collector.init()
+		if err != nil {
+			tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
+		}
+		collectorWG.Add(1)
+		go func() {
+			defer collectorWG.Done()
+			collector.run(collectorCtx)
+		}()
+	}
+	return collectorCtx, collectors
+}
+
+func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
+	if collectorCtx == nil {
+		tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
+	}
+	collectorCtx.Cancel("collecting metrics, collector must stop first")
+	collectorWG.Wait()
+	var dataItems []DataItem
+	for _, collector := range collectors {
+		items := collector.collect()
+		dataItems = append(dataItems, items...)
+		err := compareMetricWithThreshold(items, threshold, tms)
+		if err != nil {
+			tCtx.Errorf("op %d: %s", opIndex, err)
+		}
+	}
+	return dataItems
+}
+
 func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 	b, benchmarking := tCtx.TB().(*testing.B)
 	if benchmarking {
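
Design note (illustration, not from the patch): startCollectingMetrics pairs a cancellable context with a dedicated WaitGroup so that stopCollectingMetrics can guarantee every collector goroutine has exited before collect() is read. A minimal standalone sketch of that cancel-then-wait shutdown pattern using only the standard library; context.Context and the placeholder sampling loop stand in for ktesting.TContext and collector.run, which is an assumption made for illustration.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startCollectors launches one goroutine per collector and returns the cancel
// function that stops them; wg tracks their lifetime.
func startCollectors(ctx context.Context, wg *sync.WaitGroup, n int) context.CancelFunc {
	ctx, cancel := context.WithCancel(ctx)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Placeholder for collector.run(collectorCtx): sample until cancelled.
			for {
				select {
				case <-ctx.Done():
					return
				case <-time.After(100 * time.Millisecond):
					// take a sample
				}
			}
		}()
	}
	return cancel
}

func main() {
	var wg sync.WaitGroup
	cancel := startCollectors(context.Background(), &wg, 2)

	time.Sleep(300 * time.Millisecond) // the measured part of the workload runs here

	// Order matters: cancel first, then wait, and only then read results, so
	// no collector goroutine is still writing while data items are gathered.
	cancel()
	wg.Wait()
	fmt.Println("collectors stopped; safe to collect data items")
}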
@@ -1145,13 +1241,20 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	defer wg.Wait()
 	defer tCtx.Cancel("workload is done")
 
-	var mu sync.Mutex
 	var dataItems []DataItem
 	nextNodeIndex := 0
 	// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
 	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
 	numPodsScheduledPerNamespace := make(map[string]int)
 
+	var collectors []testDataCollector
+	// This needs a separate context and wait group because
+	// the metrics collecting needs to be sure that the goroutines
+	// are stopped.
+	var collectorCtx ktesting.TContext
+	var collectorWG sync.WaitGroup
+	defer collectorWG.Wait()
+
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
 		realOp, err := op.realOp.patchParams(w)
 		if err != nil {
@@ -1204,34 +1307,13 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 			if concreteOp.PodTemplatePath == nil {
 				concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
 			}
-			var collectors []testDataCollector
-			// This needs a separate context and wait group because
-			// the code below needs to be sure that the goroutines
-			// are stopped.
-			var collectorCtx ktesting.TContext
-			var collectorWG sync.WaitGroup
-			defer collectorWG.Wait()
 
 			if concreteOp.CollectMetrics {
-				collectorCtx = ktesting.WithCancel(tCtx)
-				defer collectorCtx.Cancel("cleaning up")
-				name := tCtx.Name()
-				// The first part is the same for each work load, therefore we can strip it.
-				name = name[strings.Index(name, "/")+1:]
-				collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
-				for _, collector := range collectors {
-					// Need loop-local variable for function below.
-					collector := collector
-					err = collector.init()
-					if err != nil {
-						tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
-					}
-					collectorWG.Add(1)
-					go func() {
-						defer collectorWG.Done()
-						collector.run(collectorCtx)
-					}()
+				if collectorCtx != nil {
+					tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 				}
+				collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace})
+				defer collectorCtx.Cancel("cleaning up")
 			}
 			if err := createPods(tCtx, namespace, concreteOp); err != nil {
 				tCtx.Fatalf("op %d: %v", opIndex, err)
@@ -1249,18 +1331,9 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 				// CollectMetrics and SkipWaitToCompletion can never be true at the
 				// same time, so if we're here, it means that all pods have been
 				// scheduled.
-				collectorCtx.Cancel("collecting metrix, collector must stop first")
-				collectorWG.Wait()
-				mu.Lock()
-				for _, collector := range collectors {
-					items := collector.collect()
-					dataItems = append(dataItems, items...)
-					err := compareMetricWithThreshold(items, w.Threshold, *w.ThresholdMetricSelector)
-					if err != nil {
-						tCtx.Errorf("op %d: %s", opIndex, err)
-					}
-				}
-				mu.Unlock()
+				items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
+				dataItems = append(dataItems, items...)
+				collectorCtx = nil
 			}
 
 		case *deletePodsOp:
@@ -1440,6 +1513,19 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 			case <-tCtx.Done():
 			case <-time.After(concreteOp.Duration):
 			}
+
+		case *startCollectingMetricsOp:
+			if collectorCtx != nil {
+				tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
+			}
+			collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces)
+			defer collectorCtx.Cancel("cleaning up")
+
+		case *stopCollectingMetricsOp:
+			items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
+			dataItems = append(dataItems, items...)
+			collectorCtx = nil
+
 		default:
 			runable, ok := concreteOp.(runnableOp)
 			if !ok {
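
Design note (illustration, not from the patch): the collectorCtx nil checks in the new cases, and in the createPods path above, enforce that at most one collection window is open at a time: starting a second collection before stopping the previous one, or stopping without ever starting, fails the run. A tiny standalone sketch of that guard; the collectionGuard type and its methods are invented for illustration.

package main

import (
	"errors"
	"fmt"
)

// collectionGuard mirrors the collectorCtx nil check: a set flag means a
// collection window is currently open.
type collectionGuard struct {
	active bool
}

func (g *collectionGuard) start() error {
	if g.active {
		return errors.New("metrics collection is overlapping: stop the previous collector first")
	}
	g.active = true
	return nil
}

func (g *collectionGuard) stop() error {
	if !g.active {
		return errors.New("missing startCollectingMetrics operation before stopping")
	}
	g.active = false
	return nil
}

func main() {
	var g collectionGuard
	fmt.Println(g.start()) // <nil>
	fmt.Println(g.start()) // overlap error
	fmt.Println(g.stop())  // <nil>
	fmt.Println(g.stop())  // missing-start error
}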
@@ -1481,12 +1567,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }
 
-func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
+func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
+		newThroughputCollector(podInformer, map[string]string{"Name": name}, namespaces, throughputErrorMargin),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }
