@@ -52,10 +52,13 @@ import (
52
52
featuregatetesting "k8s.io/component-base/featuregate/testing"
53
53
logsapi "k8s.io/component-base/logs/api/v1"
54
54
"k8s.io/component-base/metrics/legacyregistry"
55
+ "k8s.io/component-base/metrics/testutil"
55
56
"k8s.io/klog/v2"
57
+ "k8s.io/kubernetes/pkg/features"
56
58
"k8s.io/kubernetes/pkg/scheduler/apis/config"
57
59
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
58
60
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
61
+ schedframework "k8s.io/kubernetes/pkg/scheduler/framework"
59
62
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
60
63
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
61
64
"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -927,6 +930,13 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr
927
930
}
928
931
}
929
932
933
+ if tc .FeatureGates [features .SchedulerQueueingHints ] {
934
+ // In any case, we should make sure InFlightEvents is empty after running the scenario.
935
+ if err = checkEmptyInFlightEvents (); err != nil {
936
+ tCtx .Errorf ("%s: %s" , w .Name , err )
937
+ }
938
+ }
939
+
930
940
// Reset metrics to prevent metrics generated in the current workload from being
931
941
// carried over to the next workload.
932
942
legacyregistry .Reset ()
@@ -1027,6 +1037,23 @@ func compareMetricWithThreshold(items []DataItem, threshold float64, metricSelec
1027
1037
return nil
1028
1038
}
1029
1039
1040
+ func checkEmptyInFlightEvents () error {
1041
+ labels := []string {metrics .PodPoppedInFlightEvent }
1042
+ for _ , event := range schedframework .AllEvents {
1043
+ labels = append (labels , event .Label )
1044
+ }
1045
+ for _ , label := range labels {
1046
+ value , err := testutil .GetGaugeMetricValue (metrics .InFlightEvents .WithLabelValues (label ))
1047
+ if err != nil {
1048
+ return fmt .Errorf ("failed to get InFlightEvents metric for label %s" , label )
1049
+ }
1050
+ if value > 0 {
1051
+ return fmt .Errorf ("InFlightEvents for label %s should be empty, but has %v items" , label , value )
1052
+ }
1053
+ }
1054
+ return nil
1055
+ }
1056
+
1030
1057
func runWorkload (tCtx ktesting.TContext , tc * testCase , w * workload , informerFactory informers.SharedInformerFactory ) []DataItem {
1031
1058
b , benchmarking := tCtx .TB ().(* testing.B )
1032
1059
if benchmarking {
@@ -1139,7 +1166,10 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
1139
1166
for _ , collector := range collectors {
1140
1167
// Need loop-local variable for function below.
1141
1168
collector := collector
1142
- collector .init ()
1169
+ err = collector .init ()
1170
+ if err != nil {
1171
+ tCtx .Fatalf ("op %d: Failed to initialize data collector: %v" , opIndex , err )
1172
+ }
1143
1173
collectorWG .Add (1 )
1144
1174
go func () {
1145
1175
defer collectorWG .Done ()
@@ -1205,13 +1235,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
1205
1235
}()
1206
1236
}
1207
1237
1208
- if ! concreteOp .SkipWaitToCompletion {
1209
- // SkipWaitToCompletion=false indicates this step has waited for the Pods to be scheduled.
1210
- // So we reset the metrics in global registry; otherwise metrics gathered in this step
1211
- // will be carried over to next step.
1212
- legacyregistry .Reset ()
1213
- }
1214
-
1215
1238
case * churnOp :
1216
1239
var namespace string
1217
1240
if concreteOp .Namespace != nil {
@@ -1376,7 +1399,7 @@ func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsP
1376
1399
}
1377
1400
1378
1401
// testDataCollector is the set of methods a data collector provides: init,
// which reports failure through its error result; run, which receives the
// test context; and collect, which returns a slice of DataItem.
// NOTE(review): presumably init must succeed before run is started and
// collect is called after run finishes — confirm against runWorkload.
type testDataCollector interface {
1379
- init ()
1402
+ init () error
1380
1403
run (tCtx ktesting.TContext )
1381
1404
collect () []DataItem
1382
1405
}
0 commit comments