@@ -72,17 +72,19 @@ import (
 type operationCode string
 
 const (
-	createAnyOpcode            operationCode = "createAny"
-	createNodesOpcode          operationCode = "createNodes"
-	createNamespacesOpcode     operationCode = "createNamespaces"
-	createPodsOpcode           operationCode = "createPods"
-	createPodSetsOpcode        operationCode = "createPodSets"
-	deletePodsOpcode           operationCode = "deletePods"
-	createResourceClaimsOpcode operationCode = "createResourceClaims"
-	createResourceDriverOpcode operationCode = "createResourceDriver"
-	churnOpcode                operationCode = "churn"
-	barrierOpcode              operationCode = "barrier"
-	sleepOpcode                operationCode = "sleep"
+	createAnyOpcode              operationCode = "createAny"
+	createNodesOpcode            operationCode = "createNodes"
+	createNamespacesOpcode       operationCode = "createNamespaces"
+	createPodsOpcode             operationCode = "createPods"
+	createPodSetsOpcode          operationCode = "createPodSets"
+	deletePodsOpcode             operationCode = "deletePods"
+	createResourceClaimsOpcode   operationCode = "createResourceClaims"
+	createResourceDriverOpcode   operationCode = "createResourceDriver"
+	churnOpcode                  operationCode = "churn"
+	barrierOpcode                operationCode = "barrier"
+	sleepOpcode                  operationCode = "sleep"
+	startCollectingMetricsOpcode operationCode = "startCollectingMetrics"
+	stopCollectingMetricsOpcode  operationCode = "stopCollectingMetrics"
 )
 
 const (
@@ -414,6 +416,8 @@ func (op *op) UnmarshalJSON(b []byte) error {
 		&churnOp{},
 		&barrierOp{},
 		&sleepOp{},
+		&startCollectingMetricsOp{},
+		&stopCollectingMetricsOp{},
 		// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
 	}
 	var firstError error
@@ -815,6 +819,58 @@ func (so sleepOp) patchParams(_ *workload) (realOp, error) {
 	return &so, nil
 }
 
+// startCollectingMetricsOp defines an op that starts metrics collectors.
+// stopCollectingMetricsOp has to be used after this op to finish collecting.
+type startCollectingMetricsOp struct {
+	// Must be "startCollectingMetrics".
+	Opcode operationCode
+	// Name is appended to the workload's name in the results.
+	Name string
+	// Namespaces for which the scheduling throughput metric is calculated.
+	Namespaces []string
+}
+
+func (scm *startCollectingMetricsOp) isValid(_ bool) error {
+	if scm.Opcode != startCollectingMetricsOpcode {
+		return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, startCollectingMetricsOpcode)
+	}
+	if len(scm.Namespaces) == 0 {
+		return fmt.Errorf("namespaces cannot be empty")
+	}
+	return nil
+}
+
+func (*startCollectingMetricsOp) collectsMetrics() bool {
+	return false
+}
+
+func (scm startCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
+	return &scm, nil
+}
+
+// stopCollectingMetricsOp defines an op that stops collecting the metrics
+// and writes them into the result slice.
+// startCollectingMetricsOp has to be used before this op to begin collecting.
+type stopCollectingMetricsOp struct {
+	// Must be "stopCollectingMetrics".
+	Opcode operationCode
+}
+
+func (scm *stopCollectingMetricsOp) isValid(_ bool) error {
+	if scm.Opcode != stopCollectingMetricsOpcode {
+		return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, stopCollectingMetricsOpcode)
+	}
+	return nil
+}
+
+func (*stopCollectingMetricsOp) collectsMetrics() bool {
+	return true
+}
+
+func (scm stopCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
+	return &scm, nil
+}
+
 var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
 
 func initTestOutput(tb testing.TB) io.Writer {
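
For orientation, the two ops above are meant to be paired in a workload template. The YAML below is an illustrative sketch, not part of this patch: the op name `high-load`, the namespace, and the `createPods` parameters are invented, and the syntax is assumed to follow the existing scheduler_perf config format:

```yaml
- opcode: startCollectingMetrics
  name: high-load            # appended to the workload's name in the results
  namespaces: [namespace-1]  # scheduling throughput is computed over these
- opcode: createPods
  namespace: namespace-1
  countParam: $measurePods
  skipWaitToCompletion: true # pods keep scheduling while later ops run
- opcode: barrier            # wait until everything is scheduled
- opcode: stopCollectingMetrics
```

Unlike `collectMetrics: true` on a single `createPods` op, this bracketing can span several ops and several namespaces.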
@@ -1110,6 +1166,46 @@ func checkEmptyInFlightEvents() error {
 	return nil
 }
 
+func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string) (ktesting.TContext, []testDataCollector) {
+	collectorCtx := ktesting.WithCancel(tCtx)
+	workloadName := tCtx.Name()
+	// The first part is the same for each workload, therefore we can strip it.
+	workloadName = workloadName[strings.Index(workloadName, "/")+1:]
+	collectors := getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", workloadName, name), namespaces, mcc, throughputErrorMargin)
+	for _, collector := range collectors {
+		// Need loop-local variable for the function below.
+		collector := collector
+		err := collector.init()
+		if err != nil {
+			tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
+		}
+		collectorWG.Add(1)
+		go func() {
+			defer collectorWG.Done()
+			collector.run(collectorCtx)
+		}()
+	}
+	return collectorCtx, collectors
+}
+
+func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
+	if collectorCtx == nil {
+		tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
+	}
+	collectorCtx.Cancel("collecting metrics, collector must stop first")
+	collectorWG.Wait()
+	var dataItems []DataItem
+	for _, collector := range collectors {
+		items := collector.collect()
+		dataItems = append(dataItems, items...)
+		err := compareMetricWithThreshold(items, threshold, tms)
+		if err != nil {
+			tCtx.Errorf("op %d: %s", opIndex, err)
+		}
+	}
+	return dataItems
+}
+
 func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 	b, benchmarking := tCtx.TB().(*testing.B)
 	if benchmarking {
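
These helpers encode the collector lifecycle: every collector goroutine is tracked by `collectorWG` and stopped through the cancelable `collectorCtx`, and results are only read after `Wait()` returns, so no goroutine can write to a collector while it is being read (presumably why the `mu` mutex guarding `dataItems` can be dropped in the next hunk). A self-contained sketch of the same shutdown pattern, using plain `context.Context` in place of `ktesting.TContext`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// run stands in for a collector's run method: sample until cancelled.
func run(ctx context.Context, sample *int) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Millisecond):
			*sample++ // only this goroutine writes until cancel + Wait
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	results := make([]int, 2) // one slot per collector
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			run(ctx, &results[i])
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // the workload would run here

	// Mirror stopCollectingMetrics: cancel first, then wait. Only after
	// Wait() returns is it safe to read what the goroutines wrote.
	cancel()
	wg.Wait()
	fmt.Println("samples per collector:", results)
}
```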
@@ -1145,13 +1241,20 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 	defer wg.Wait()
 	defer tCtx.Cancel("workload is done")
 
-	var mu sync.Mutex
 	var dataItems []DataItem
 	nextNodeIndex := 0
 	// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
 	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
 	numPodsScheduledPerNamespace := make(map[string]int)
 
+	var collectors []testDataCollector
+	// This needs a separate context and wait group because
+	// the metrics collection needs to be sure that the goroutines
+	// are stopped.
+	var collectorCtx ktesting.TContext
+	var collectorWG sync.WaitGroup
+	defer collectorWG.Wait()
+
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
 		realOp, err := op.realOp.patchParams(w)
 		if err != nil {
@@ -1204,34 +1307,13 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 			if concreteOp.PodTemplatePath == nil {
 				concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
 			}
-			var collectors []testDataCollector
-			// This needs a separate context and wait group because
-			// the code below needs to be sure that the goroutines
-			// are stopped.
-			var collectorCtx ktesting.TContext
-			var collectorWG sync.WaitGroup
-			defer collectorWG.Wait()
 
 			if concreteOp.CollectMetrics {
-				collectorCtx = ktesting.WithCancel(tCtx)
-				defer collectorCtx.Cancel("cleaning up")
-				name := tCtx.Name()
-				// The first part is the same for each work load, therefore we can strip it.
-				name = name[strings.Index(name, "/")+1:]
-				collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
-				for _, collector := range collectors {
-					// Need loop-local variable for function below.
-					collector := collector
-					err = collector.init()
-					if err != nil {
-						tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
-					}
-					collectorWG.Add(1)
-					go func() {
-						defer collectorWG.Done()
-						collector.run(collectorCtx)
-					}()
+				if collectorCtx != nil {
+					tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably a second collector was started before stopping the previous one", opIndex)
 				}
+				collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace})
+				defer collectorCtx.Cancel("cleaning up")
 			}
 			if err := createPods(tCtx, namespace, concreteOp); err != nil {
 				tCtx.Fatalf("op %d: %v", opIndex, err)
@@ -1249,18 +1331,9 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 				// CollectMetrics and SkipWaitToCompletion can never be true at the
 				// same time, so if we're here, it means that all pods have been
 				// scheduled.
-				collectorCtx.Cancel("collecting metrix, collector must stop first")
-				collectorWG.Wait()
-				mu.Lock()
-				for _, collector := range collectors {
-					items := collector.collect()
-					dataItems = append(dataItems, items...)
-					err := compareMetricWithThreshold(items, w.Threshold, *w.ThresholdMetricSelector)
-					if err != nil {
-						tCtx.Errorf("op %d: %s", opIndex, err)
-					}
-				}
-				mu.Unlock()
+				items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
+				dataItems = append(dataItems, items...)
+				collectorCtx = nil
 			}
 
 		case *deletePodsOp:
@@ -1440,6 +1513,19 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 			case <-tCtx.Done():
 			case <-time.After(concreteOp.Duration):
 			}
+
+		case *startCollectingMetricsOp:
+			if collectorCtx != nil {
+				tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably a second collector was started before stopping the previous one", opIndex)
+			}
+			collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces)
+			defer collectorCtx.Cancel("cleaning up")
+
+		case *stopCollectingMetricsOp:
+			items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
+			dataItems = append(dataItems, items...)
+			collectorCtx = nil
+
 		default:
 			runable, ok := concreteOp.(runnableOp)
 			if !ok {
@@ -1481,12 +1567,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }
 
-func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
+func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
+		newThroughputCollector(podInformer, map[string]string{"Name": name}, namespaces, throughputErrorMargin),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }
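
With the widened signature, the caller now chooses the namespace set for the throughput collector instead of always passing a single namespace. A hypothetical call site, assuming the surrounding scheduler_perf internals (`podInformer`, `tc`, `throughputErrorMargin`) are in scope; the name and namespaces are invented:

```go
collectors := getTestDataCollectors(
	podInformer,
	"SchedulingBasic/5000Nodes/high-load",  // becomes the "Name" label on every DataItem
	[]string{"namespace-1", "namespace-2"}, // throughput is measured across both
	tc.MetricsCollectorConfig,
	throughputErrorMargin,
)
```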