Commit e85d3ba

DRA scheduler: fix re-scheduling after ResourceSlice changes
Making unschedulable pods schedulable again after ResourceSlice cluster events was accidentally left out when structured parameters were added in Kubernetes 1.30. All existing E2E tests were written so that the driver starts first. A new test with a different order (create the pod first, wait for it to become unschedulable, then start the driver) triggered the bug and now passes.
1 parent 6dd2ade

6 files changed: +210 −87 lines


pkg/scheduler/eventhandlers.go

Lines changed: 9 additions & 0 deletions

@@ -529,6 +529,15 @@ func addAllEventHandlers(
 				)
 				handlers = append(handlers, handlerRegistration)
 			}
+		case framework.ResourceSlice:
+			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+				if handlerRegistration, err = informerFactory.Resource().V1alpha3().ResourceSlices().Informer().AddEventHandler(
+					buildEvtResHandler(at, framework.ResourceSlice, "ResourceSlice"),
+				); err != nil {
+					return err
+				}
+				handlers = append(handlers, handlerRegistration)
+			}
 		case framework.DeviceClass:
 			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
 				if handlerRegistration, err = informerFactory.Resource().V1alpha3().DeviceClasses().Informer().AddEventHandler(

pkg/scheduler/eventhandlers_test.go

Lines changed: 26 additions & 1 deletion

@@ -216,6 +216,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 		name                   string
 		gvkMap                 map[framework.GVK]framework.ActionType
 		enableDRA              bool
+		enableClassicDRA       bool
 		expectStaticInformers  map[reflect.Type]bool
 		expectDynamicInformers map[schema.GroupVersionResource]bool
 	}{
@@ -234,6 +235,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 			gvkMap: map[framework.GVK]framework.ActionType{
 				framework.PodSchedulingContext: framework.Add,
 				framework.ResourceClaim:        framework.Add,
+				framework.ResourceSlice:        framework.Add,
 				framework.DeviceClass:          framework.Add,
 			},
 			expectStaticInformers: map[reflect.Type]bool{
@@ -244,19 +246,41 @@ func TestAddAllEventHandlers(t *testing.T) {
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
 		},
 		{
-			name: "DRA events enabled",
+			name: "some DRA events enabled",
 			gvkMap: map[framework.GVK]framework.ActionType{
 				framework.PodSchedulingContext: framework.Add,
 				framework.ResourceClaim:        framework.Add,
+				framework.ResourceSlice:        framework.Add,
 				framework.DeviceClass:          framework.Add,
 			},
 			enableDRA: true,
+			expectStaticInformers: map[reflect.Type]bool{
+				reflect.TypeOf(&v1.Pod{}):                    true,
+				reflect.TypeOf(&v1.Node{}):                   true,
+				reflect.TypeOf(&v1.Namespace{}):              true,
+				reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
+				reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
+				reflect.TypeOf(&resourceapi.DeviceClass{}):   true,
+			},
+			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
+		},
+		{
+			name: "all DRA events enabled",
+			gvkMap: map[framework.GVK]framework.ActionType{
+				framework.PodSchedulingContext: framework.Add,
+				framework.ResourceClaim:        framework.Add,
+				framework.ResourceSlice:        framework.Add,
+				framework.DeviceClass:          framework.Add,
+			},
+			enableDRA:        true,
+			enableClassicDRA: true,
 			expectStaticInformers: map[reflect.Type]bool{
 				reflect.TypeOf(&v1.Pod{}):                           true,
 				reflect.TypeOf(&v1.Node{}):                          true,
 				reflect.TypeOf(&v1.Namespace{}):                     true,
 				reflect.TypeOf(&resourceapi.PodSchedulingContext{}): true,
 				reflect.TypeOf(&resourceapi.ResourceClaim{}):        true,
+				reflect.TypeOf(&resourceapi.ResourceSlice{}):        true,
 				reflect.TypeOf(&resourceapi.DeviceClass{}):          true,
 			},
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
@@ -320,6 +344,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAControlPlaneController, tt.enableClassicDRA)
 			logger, ctx := ktesting.NewTestContext(t)
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 34 additions & 0 deletions

@@ -398,6 +398,8 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
 		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
 		// A pod might be waiting for a class to get created or modified.
 		{Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
+		// Adding or updating a ResourceSlice might make a pod schedulable because new resources became available.
+		{Event: framework.ClusterEvent{Resource: framework.ResourceSlice, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterResourceSliceChange},
 	}
 
 	if pl.podSchedulingContextLister != nil {
@@ -491,6 +493,38 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 	return framework.Queue, nil
 }
 
+// isSchedulableAfterResourceSliceChange is invoked for add and update slice events reported by
+// an informer. Such changes can make an unschedulable pod schedulable when the pod requests a device
+// and the change adds a suitable device.
+//
+// For the sake of faster execution and avoiding code duplication, isSchedulableAfterResourceSliceChange
+// only checks whether the pod uses claims. All of the more detailed checks are done in the scheduling
+// attempt.
+//
+// The delete claim event will not invoke it, so newObj will never be nil.
+func (pl *dynamicResources) isSchedulableAfterResourceSliceChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
+	_, modifiedSlice, err := schedutil.As[*resourceapi.ResourceSlice](oldObj, newObj)
+	if err != nil {
+		// Shouldn't happen.
+		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterResourceSliceChange: %w", err)
+	}
+
+	if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
+		// This is not an unexpected error: we know that
+		// foreachPodResourceClaim only returns errors for "not
+		// schedulable".
+		logger.V(6).Info("pod is not schedulable after resource slice change", "pod", klog.KObj(pod), "resourceSlice", klog.KObj(modifiedSlice), "reason", err.Error())
+		return framework.QueueSkip, nil
+	}
+
+	// We could check what got changed in the slice, but right now that's likely to be
+	// about the spec (there's no status yet...).
+	// We could check whether all claims use classic DRA, but that doesn't seem worth it.
+	// Let's assume that changing the slice may make the pod schedulable.
+	logger.V(5).Info("ResourceSlice change might make pod schedulable", "pod", klog.KObj(pod), "resourceSlice", klog.KObj(modifiedSlice))
+	return framework.Queue, nil
+}
+
 // isSchedulableAfterPodSchedulingContextChange is invoked for all
 // PodSchedulingContext events reported by an informer. It checks whether that
 // change made a previously unschedulable pod schedulable (updated) or a new

pkg/scheduler/framework/types.go

Lines changed: 1 addition & 0 deletions

@@ -125,6 +125,7 @@ const (
 	StorageClass         GVK = "storage.k8s.io/StorageClass"
 	PodSchedulingContext GVK = "PodSchedulingContext"
 	ResourceClaim        GVK = "ResourceClaim"
+	ResourceSlice        GVK = "ResourceSlice"
 	DeviceClass          GVK = "DeviceClass"
 
 	// WildCard is a special GVK to match all resources.

test/e2e/dra/deploy.go

Lines changed: 108 additions & 83 deletions

@@ -77,69 +77,81 @@ type Nodes struct {
 var pluginPermissions string
 
 // NewNodes selects nodes to run the test on.
+//
+// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
 func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
 	nodes := &Nodes{}
 	ginkgo.BeforeEach(func(ctx context.Context) {
-
-		ginkgo.By("selecting nodes")
-		// The kubelet plugin is harder. We deploy the builtin manifest
-		// after patching in the driver name and all nodes on which we
-		// want the plugin to run.
-		//
-		// Only a subset of the nodes are picked to avoid causing
-		// unnecessary load on a big cluster.
-		nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
-		framework.ExpectNoError(err, "get nodes")
-		numNodes := int32(len(nodeList.Items))
-		if int(numNodes) < minNodes {
-			e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
-		}
-		nodes.NodeNames = nil
-		for _, node := range nodeList.Items {
-			nodes.NodeNames = append(nodes.NodeNames, node.Name)
-		}
-		sort.Strings(nodes.NodeNames)
-		framework.Logf("testing on nodes %v", nodes.NodeNames)
-
-		// Watch claims in the namespace. This is useful for monitoring a test
-		// and enables additional sanity checks.
-		claimInformer := resourceapiinformer.NewResourceClaimInformer(f.ClientSet, f.Namespace.Name, 100*time.Hour /* resync */, nil)
-		cancelCtx, cancel := context.WithCancelCause(context.Background())
-		var wg sync.WaitGroup
-		ginkgo.DeferCleanup(func() {
-			cancel(errors.New("test has completed"))
-			wg.Wait()
-		})
-		_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj any) {
-				defer ginkgo.GinkgoRecover()
-				claim := obj.(*resourceapi.ResourceClaim)
-				framework.Logf("New claim:\n%s", format.Object(claim, 1))
-				validateClaim(claim)
-			},
-			UpdateFunc: func(oldObj, newObj any) {
-				defer ginkgo.GinkgoRecover()
-				oldClaim := oldObj.(*resourceapi.ResourceClaim)
-				newClaim := newObj.(*resourceapi.ResourceClaim)
-				framework.Logf("Updated claim:\n%s\nDiff:\n%s", format.Object(newClaim, 1), cmp.Diff(oldClaim, newClaim))
-				validateClaim(newClaim)
-			},
-			DeleteFunc: func(obj any) {
-				defer ginkgo.GinkgoRecover()
-				claim := obj.(*resourceapi.ResourceClaim)
-				framework.Logf("Deleted claim:\n%s", format.Object(claim, 1))
-			},
-		})
-		framework.ExpectNoError(err, "AddEventHandler")
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			claimInformer.Run(cancelCtx.Done())
-		}()
+		nodes.init(ctx, f, minNodes, maxNodes)
 	})
 	return nodes
 }
 
+// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It.
+func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) *Nodes {
+	nodes := &Nodes{}
+	nodes.init(ctx, f, minNodes, maxNodes)
+	return nodes
+}
+
+func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
+	ginkgo.By("selecting nodes")
+	// The kubelet plugin is harder. We deploy the builtin manifest
+	// after patching in the driver name and all nodes on which we
+	// want the plugin to run.
+	//
+	// Only a subset of the nodes are picked to avoid causing
+	// unnecessary load on a big cluster.
+	nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
+	framework.ExpectNoError(err, "get nodes")
+	numNodes := int32(len(nodeList.Items))
+	if int(numNodes) < minNodes {
+		e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
+	}
+	nodes.NodeNames = nil
+	for _, node := range nodeList.Items {
+		nodes.NodeNames = append(nodes.NodeNames, node.Name)
+	}
+	sort.Strings(nodes.NodeNames)
+	framework.Logf("testing on nodes %v", nodes.NodeNames)
+
+	// Watch claims in the namespace. This is useful for monitoring a test
+	// and enables additional sanity checks.
+	claimInformer := resourceapiinformer.NewResourceClaimInformer(f.ClientSet, f.Namespace.Name, 100*time.Hour /* resync */, nil)
+	cancelCtx, cancel := context.WithCancelCause(context.Background())
+	var wg sync.WaitGroup
+	ginkgo.DeferCleanup(func() {
+		cancel(errors.New("test has completed"))
+		wg.Wait()
+	})
+	_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj any) {
+			defer ginkgo.GinkgoRecover()
+			claim := obj.(*resourceapi.ResourceClaim)
+			framework.Logf("New claim:\n%s", format.Object(claim, 1))
+			validateClaim(claim)
+		},
+		UpdateFunc: func(oldObj, newObj any) {
+			defer ginkgo.GinkgoRecover()
+			oldClaim := oldObj.(*resourceapi.ResourceClaim)
+			newClaim := newObj.(*resourceapi.ResourceClaim)
+			framework.Logf("Updated claim:\n%s\nDiff:\n%s", format.Object(newClaim, 1), cmp.Diff(oldClaim, newClaim))
+			validateClaim(newClaim)
+		},
+		DeleteFunc: func(obj any) {
+			defer ginkgo.GinkgoRecover()
+			claim := obj.(*resourceapi.ResourceClaim)
+			framework.Logf("Deleted claim:\n%s", format.Object(claim, 1))
+		},
+	})
+	framework.ExpectNoError(err, "AddEventHandler")
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		claimInformer.Run(cancelCtx.Done())
+	}()
+}
+
 func validateClaim(claim *resourceapi.ResourceClaim) {
 	// The apiserver doesn't enforce that a claim always has a finalizer
 	// while being allocated. This is a convention that whoever allocates a
@@ -153,28 +165,43 @@ func validateClaim(claim *resourceapi.ResourceClaim) {
 // NewDriver sets up controller (as client of the cluster) and
 // kubelet plugin (via proxy) before the test runs. It cleans
 // up after the test.
+//
+// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
 func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) *Driver {
-	d := &Driver{
-		f:            f,
-		fail:         map[MethodInstance]bool{},
-		callCounts:   map[MethodInstance]int64{},
-		NodeV1alpha3: true,
-	}
+	d := NewDriverInstance(f)
 
 	ginkgo.BeforeEach(func() {
-		resources := configureResources()
-		if len(resources.Nodes) == 0 {
-			// This always has to be set because the driver might
-			// not run on all nodes.
-			resources.Nodes = nodes.NodeNames
-		}
-		ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
-		d.SetUp(nodes, resources, devicesPerNode...)
-		ginkgo.DeferCleanup(d.TearDown)
+		d.Run(nodes, configureResources, devicesPerNode...)
 	})
 	return d
 }
 
+// NewDriverInstance is a variant of NewDriver where the driver is inactive and must
+// be started explicitly with Run. May be used inside ginkgo.It.
+func NewDriverInstance(f *framework.Framework) *Driver {
+	d := &Driver{
+		f:             f,
+		fail:          map[MethodInstance]bool{},
+		callCounts:    map[MethodInstance]int64{},
+		NodeV1alpha3:  true,
+		parameterMode: parameterModeStructured,
+	}
+	d.initName()
+	return d
+}
+
+func (d *Driver) Run(nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
+	resources := configureResources()
+	if len(resources.Nodes) == 0 {
+		// This always has to be set because the driver might
+		// not run on all nodes.
+		resources.Nodes = nodes.NodeNames
+	}
+	ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
+	d.SetUp(nodes, resources, devicesPerNode...)
+	ginkgo.DeferCleanup(d.TearDown)
+}
+
 type MethodInstance struct {
 	Nodename   string
 	FullMethod string
@@ -215,25 +242,23 @@ const (
 	parameterModeStructured parameterMode = "structured" // allocation through scheduler
 )
 
+func (d *Driver) initName() {
+	d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
+}
+
 func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
-	ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
+	d.initName()
+	ginkgo.By(fmt.Sprintf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames))
 	d.Nodes = make(map[string]KubeletPlugin)
-	d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
 	resources.DriverName = d.Name
 
 	ctx, cancel := context.WithCancel(context.Background())
-	if d.NameSuffix != "" {
-		logger := klog.FromContext(ctx)
-		logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix)
-		ctx = klog.NewContext(ctx, logger)
-	}
+	logger := klog.FromContext(ctx)
+	logger = klog.LoggerWithValues(logger, "driverName", d.Name)
+	ctx = klog.NewContext(ctx, logger)
 	d.ctx = ctx
 	d.cleanup = append(d.cleanup, cancel)
 
-	if d.parameterMode == "" {
-		d.parameterMode = parameterModeStructured
-	}
-
 	switch d.parameterMode {
 	case parameterModeClassicDRA:
 		// The controller is easy: we simply connect to the API server.
@@ -387,7 +412,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...
 	// Here we merely use impersonation, which is faster.
 	driverClient := d.impersonateKubeletPlugin(&pod)
 
-	logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
+	logger := klog.LoggerWithValues(klog.LoggerWithName(logger, "kubelet-plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
 	loggerCtx := klog.NewContext(ctx, logger)
 	fileOps := app.FileOperations{
 		Create: func(name string, content []byte) error {
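
The commit message describes the new E2E ordering that exercises this path: create the pod first, wait for it to be reported as unschedulable, then start the driver. The test file itself is not part of the excerpt above; what follows is only a rough sketch of how such a test could use the NewNodesNow, NewDriverInstance, and Run helpers added in deploy.go, assuming it lives alongside them in the dra e2e package. The import paths, spec names, and the claim/pod setup are assumptions for illustration, not code from this commit.

package dra

import (
	"context"

	"github.com/onsi/ginkgo/v2"

	"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
	"k8s.io/kubernetes/test/e2e/framework"
)

var _ = ginkgo.Describe("DRA driver started after the pod (sketch)", func() {
	f := framework.NewDefaultFramework("dra")

	ginkgo.It("retries scheduling once ResourceSlices appear", func(ctx context.Context) {
		// Node selection can happen inside the It because NewNodesNow does not
		// depend on a ginkgo.BeforeEach.
		nodes := NewNodesNow(ctx, f, 1, 1)

		// Placeholder: create a ResourceClaim and a pod referencing it, then
		// wait until the scheduler marks the pod as unschedulable.

		// Only now start the driver. The ResourceSlices it publishes fire the
		// new ResourceSlice cluster event, which re-queues the pending pod.
		driver := NewDriverInstance(f)
		driver.Run(nodes, func() app.Resources {
			// Run copies nodes.NodeNames into resources.Nodes when it is left empty.
			return app.Resources{}
		})

		// Placeholder: wait for the pod to be scheduled and running.
	})
})

The ordering is the point of the scenario: before this fix, a pod created ahead of the driver stayed unschedulable because no ResourceSlice event re-queued it.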
