Skip to content

Commit 5760a4f

Browse files
nojnhuh authored and pohly committed
DRA scheduler: device taints and tolerations
Thanks to the tracker, the plugin sees all taints directly in the device definition and can compare them against the tolerations of a request while trying to find a device for the request. When the feature is turned off, taints are ignored during scheduling.
1 parent a027b43 commit 5760a4f

File tree

18 files changed

+577
-51
lines changed

18 files changed

+577
-51
lines changed

pkg/scheduler/eventhandlers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"k8s.io/client-go/tools/cache"
3434
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
3535
corev1nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
36+
resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
3637
"k8s.io/klog/v2"
3738
"k8s.io/kubernetes/pkg/features"
3839
"k8s.io/kubernetes/pkg/scheduler/backend/queue"
@@ -366,6 +367,7 @@ func addAllEventHandlers(
366367
informerFactory informers.SharedInformerFactory,
367368
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
368369
resourceClaimCache *assumecache.AssumeCache,
370+
resourceSliceTracker *resourceslicetracker.Tracker,
369371
gvkMap map[framework.EventResource]framework.ActionType,
370372
) error {
371373
var (
@@ -555,7 +557,7 @@ func addAllEventHandlers(
555557
}
556558
case framework.ResourceSlice:
557559
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
558-
if handlerRegistration, err = informerFactory.Resource().V1beta1().ResourceSlices().Informer().AddEventHandler(
560+
if handlerRegistration, err = resourceSliceTracker.AddEventHandler(
559561
buildEvtResHandler(at, framework.ResourceSlice),
560562
); err != nil {
561563
return err

pkg/scheduler/eventhandlers_test.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ import (
2828
appsv1 "k8s.io/api/apps/v1"
2929
batchv1 "k8s.io/api/batch/v1"
3030
v1 "k8s.io/api/core/v1"
31+
resourcealphaapi "k8s.io/api/resource/v1alpha3"
3132
resourceapi "k8s.io/api/resource/v1beta1"
3233
storagev1 "k8s.io/api/storage/v1"
3334
"k8s.io/apimachinery/pkg/api/resource"
3435
"k8s.io/apimachinery/pkg/util/sets"
3536
utilfeature "k8s.io/apiserver/pkg/util/feature"
3637
featuregatetesting "k8s.io/component-base/featuregate/testing"
38+
resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
3739
"k8s.io/klog/v2"
3840
"k8s.io/klog/v2/ktesting"
3941

@@ -395,6 +397,7 @@ func TestAddAllEventHandlers(t *testing.T) {
395397
name string
396398
gvkMap map[framework.EventResource]framework.ActionType
397399
enableDRA bool
400+
enableDRADeviceTaints bool
398401
expectStaticInformers map[reflect.Type]bool
399402
expectDynamicInformers map[schema.GroupVersionResource]bool
400403
}{
@@ -423,7 +426,7 @@ func TestAddAllEventHandlers(t *testing.T) {
423426
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
424427
},
425428
{
426-
name: "all DRA events enabled",
429+
name: "core DRA events enabled",
427430
gvkMap: map[framework.EventResource]framework.ActionType{
428431
framework.ResourceClaim: framework.Add,
429432
framework.ResourceSlice: framework.Add,
@@ -440,6 +443,26 @@ func TestAddAllEventHandlers(t *testing.T) {
440443
},
441444
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
442445
},
446+
{
447+
name: "all DRA events enabled",
448+
gvkMap: map[framework.EventResource]framework.ActionType{
449+
framework.ResourceClaim: framework.Add,
450+
framework.ResourceSlice: framework.Add,
451+
framework.DeviceClass: framework.Add,
452+
},
453+
enableDRA: true,
454+
enableDRADeviceTaints: true,
455+
expectStaticInformers: map[reflect.Type]bool{
456+
reflect.TypeOf(&v1.Pod{}): true,
457+
reflect.TypeOf(&v1.Node{}): true,
458+
reflect.TypeOf(&v1.Namespace{}): true,
459+
reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
460+
reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
461+
reflect.TypeOf(&resourcealphaapi.DeviceTaintRule{}): true,
462+
reflect.TypeOf(&resourceapi.DeviceClass{}): true,
463+
},
464+
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
465+
},
443466
{
444467
name: "add GVKs handlers defined in framework dynamically",
445468
gvkMap: map[framework.EventResource]framework.ActionType{
@@ -499,6 +522,7 @@ func TestAddAllEventHandlers(t *testing.T) {
499522
for _, tt := range tests {
500523
t.Run(tt.name, func(t *testing.T) {
501524
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
525+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRADeviceTaints, tt.enableDRADeviceTaints)
502526

503527
logger, ctx := ktesting.NewTestContext(t)
504528
ctx, cancel := context.WithCancel(ctx)
@@ -515,12 +539,27 @@ func TestAddAllEventHandlers(t *testing.T) {
515539
dynclient := dyfake.NewSimpleDynamicClient(scheme)
516540
dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
517541
var resourceClaimCache *assumecache.AssumeCache
542+
var resourceSliceTracker *resourceslicetracker.Tracker
518543
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
519544
resourceClaimInformer := informerFactory.Resource().V1beta1().ResourceClaims().Informer()
520545
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
546+
var err error
547+
opts := resourceslicetracker.Options{
548+
EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
549+
SliceInformer: informerFactory.Resource().V1beta1().ResourceSlices(),
550+
}
551+
if opts.EnableDeviceTaints {
552+
opts.TaintInformer = informerFactory.Resource().V1alpha3().DeviceTaintRules()
553+
opts.ClassInformer = informerFactory.Resource().V1beta1().DeviceClasses()
554+
555+
}
556+
resourceSliceTracker, err = resourceslicetracker.StartTracker(ctx, opts)
557+
if err != nil {
558+
t.Fatalf("couldn't start resource slice tracker: %v", err)
559+
}
521560
}
522561

523-
if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil {
562+
if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, resourceSliceTracker, tt.gvkMap); err != nil {
524563
t.Fatalf("Add event handlers failed, error = %v", err)
525564
}
526565

pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (c *shareListerContract) StorageInfos() framework.StorageInfoLister {
7272

7373
type resourceSliceListerContract struct{}
7474

75-
func (c *resourceSliceListerContract) List() ([]*resourceapi.ResourceSlice, error) {
75+
func (c *resourceSliceListerContract) ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error) {
7676
return nil, nil
7777
}
7878

pkg/scheduler/framework/listers.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,13 @@ type SharedLister interface {
5050

5151
// ResourceSliceLister can be used to obtain ResourceSlices.
5252
type ResourceSliceLister interface {
53-
// List returns a list of all ResourceSlices.
54-
List() ([]*resourceapi.ResourceSlice, error)
53+
// ListWithDeviceTaintRules returns a list of all ResourceSlices with DeviceTaintRules applied
54+
// if the DRADeviceTaints feature is enabled, otherwise without them.
55+
//
56+
// k8s.io/dynamic-resource-allocation/resourceslice/tracker provides an implementation
57+
// of the necessary logic. That tracker can be instantiated as a replacement for
58+
// a normal ResourceSlice informer and provides a ListPatchedResourceSlices method.
59+
ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error)
5560
}
5661

5762
// DeviceClassLister can be used to obtain DeviceClasses.

pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/apimachinery/pkg/util/sets"
2828
"k8s.io/client-go/informers"
2929
resourcelisters "k8s.io/client-go/listers/resource/v1beta1"
30+
resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
3031
"k8s.io/dynamic-resource-allocation/structured"
3132
"k8s.io/klog/v2"
3233
"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -44,16 +45,17 @@ type DefaultDRAManager struct {
4445
deviceClassLister *deviceClassLister
4546
}
4647

47-
func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, informerFactory informers.SharedInformerFactory) *DefaultDRAManager {
48+
func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, resourceSliceTracker *resourceslicetracker.Tracker, informerFactory informers.SharedInformerFactory) *DefaultDRAManager {
4849
logger := klog.FromContext(ctx)
50+
4951
manager := &DefaultDRAManager{
5052
resourceClaimTracker: &claimTracker{
5153
cache: claimsCache,
5254
inFlightAllocations: &sync.Map{},
5355
allocatedDevices: newAllocatedDevices(logger),
5456
logger: logger,
5557
},
56-
resourceSliceLister: &resourceSliceLister{sliceLister: informerFactory.Resource().V1beta1().ResourceSlices().Lister()},
58+
resourceSliceLister: &resourceSliceLister{tracker: resourceSliceTracker},
5759
deviceClassLister: &deviceClassLister{classLister: informerFactory.Resource().V1beta1().DeviceClasses().Lister()},
5860
}
5961

@@ -79,11 +81,11 @@ func (s *DefaultDRAManager) DeviceClasses() framework.DeviceClassLister {
7981
var _ framework.ResourceSliceLister = &resourceSliceLister{}
8082

8183
type resourceSliceLister struct {
82-
sliceLister resourcelisters.ResourceSliceLister
84+
tracker *resourceslicetracker.Tracker
8385
}
8486

85-
func (l *resourceSliceLister) List() ([]*resourceapi.ResourceSlice, error) {
86-
return l.sliceLister.List(labels.Everything())
87+
func (l *resourceSliceLister) ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error) {
88+
return l.tracker.ListPatchedResourceSlices()
8789
}
8890

8991
var _ framework.DeviceClassLister = &deviceClassLister{}

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type DynamicResources struct {
106106
enableAdminAccess bool
107107
enablePrioritizedList bool
108108
enableSchedulingQueueHint bool
109+
enableDeviceTaints bool
109110

110111
fh framework.Handle
111112
clientset kubernetes.Interface
@@ -123,6 +124,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
123124
pl := &DynamicResources{
124125
enabled: true,
125126
enableAdminAccess: fts.EnableDRAAdminAccess,
127+
enableDeviceTaints: fts.EnableDRADeviceTaints,
126128
enablePrioritizedList: fts.EnableDRAPrioritizedList,
127129
enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
128130

@@ -448,11 +450,11 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
448450
if err != nil {
449451
return nil, statusError(logger, err)
450452
}
451-
slices, err := pl.draManager.ResourceSlices().List()
453+
slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
452454
if err != nil {
453455
return nil, statusError(logger, err)
454456
}
455-
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, pl.enablePrioritizedList, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
457+
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, pl.enablePrioritizedList, pl.enableDeviceTaints, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
456458
if err != nil {
457459
return nil, statusError(logger, err)
458460
}

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"k8s.io/client-go/informers"
3939
"k8s.io/client-go/kubernetes/fake"
4040
cgotesting "k8s.io/client-go/testing"
41+
resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
4142
"k8s.io/kubernetes/pkg/scheduler/framework"
4243
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
4344
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -183,8 +184,22 @@ var (
183184
otherAllocatedClaim = st.FromResourceClaim(otherClaim).
184185
Allocation(allocationResult).
185186
Obj()
187+
188+
deviceTaint = resourceapi.DeviceTaint{
189+
Key: "taint-key",
190+
Value: "taint-value",
191+
Effect: resourceapi.DeviceTaintEffectNoSchedule,
192+
}
186193
)
187194

195+
func taintDevices(slice *resourceapi.ResourceSlice) *resourceapi.ResourceSlice {
196+
slice = slice.DeepCopy()
197+
for i := range slice.Spec.Devices {
198+
slice.Spec.Devices[i].Basic.Taints = append(slice.Spec.Devices[i].Basic.Taints, deviceTaint)
199+
}
200+
return slice
201+
}
202+
188203
func reserve(claim *resourceapi.ResourceClaim, pod *v1.Pod) *resourceapi.ResourceClaim {
189204
return st.FromResourceClaim(claim).
190205
ReservedForPod(pod.Name, types.UID(pod.UID)).
@@ -343,6 +358,7 @@ func TestPlugin(t *testing.T) {
343358
disableDRA bool
344359

345360
enableDRAPrioritizedList bool
361+
enableDRADeviceTaints bool
346362
}{
347363
"empty": {
348364
pod: st.MakePod().Name("foo").Namespace("default").Obj(),
@@ -604,6 +620,56 @@ func TestPlugin(t *testing.T) {
604620
},
605621
},
606622

623+
// The two test cases for device tainting only need to cover
624+
// whether the feature gate is passed through to the allocator
625+
// correctly. The actual logic around device taints and allocation
626+
// is in the allocator.
627+
"tainted-device-disabled": {
628+
enableDRADeviceTaints: false,
629+
pod: podWithClaimName,
630+
claims: []*resourceapi.ResourceClaim{pendingClaim},
631+
classes: []*resourceapi.DeviceClass{deviceClass},
632+
objs: []apiruntime.Object{taintDevices(workerNodeSlice)},
633+
want: want{
634+
reserve: result{
635+
inFlightClaim: allocatedClaim,
636+
},
637+
prebind: result{
638+
assumedClaim: reserve(allocatedClaim, podWithClaimName),
639+
changes: change{
640+
claim: func(claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
641+
if claim.Name == claimName {
642+
claim = claim.DeepCopy()
643+
claim.Finalizers = allocatedClaim.Finalizers
644+
claim.Status = inUseClaim.Status
645+
}
646+
return claim
647+
},
648+
},
649+
},
650+
postbind: result{
651+
assumedClaim: reserve(allocatedClaim, podWithClaimName),
652+
},
653+
},
654+
},
655+
"tainted-device-enabled": {
656+
enableDRADeviceTaints: true,
657+
pod: podWithClaimName,
658+
claims: []*resourceapi.ResourceClaim{pendingClaim},
659+
classes: []*resourceapi.DeviceClass{deviceClass},
660+
objs: []apiruntime.Object{taintDevices(workerNodeSlice)},
661+
want: want{
662+
filter: perNodeResult{
663+
workerNode.Name: {
664+
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `cannot allocate all claims`),
665+
},
666+
},
667+
postfilter: result{
668+
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
669+
},
670+
},
671+
},
672+
607673
"request-admin-access-with-DRAAdminAccess-featuregate": {
608674
// When the DRAAdminAccess feature gate is enabled,
609675
// Because the pending claim asks for admin access,
@@ -920,6 +986,7 @@ func TestPlugin(t *testing.T) {
920986
}
921987
features := feature.Features{
922988
EnableDRAAdminAccess: tc.enableDRAAdminAccess,
989+
EnableDRADeviceTaints: tc.enableDRADeviceTaints,
923990
EnableDynamicResourceAllocation: !tc.disableDRA,
924991
EnableDRAPrioritizedList: tc.enableDRAPrioritizedList,
925992
}
@@ -1189,7 +1256,16 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourceapi.ResourceClaim,
11891256
tc.client.PrependReactor("*", "*", reactor)
11901257

11911258
tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
1192-
tc.draManager = NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1beta1().ResourceClaims().Informer(), "resource claim", "", nil), tc.informerFactory)
1259+
resourceSliceTrackerOpts := resourceslicetracker.Options{
1260+
EnableDeviceTaints: true,
1261+
SliceInformer: tc.informerFactory.Resource().V1beta1().ResourceSlices(),
1262+
TaintInformer: tc.informerFactory.Resource().V1alpha3().DeviceTaintRules(),
1263+
ClassInformer: tc.informerFactory.Resource().V1beta1().DeviceClasses(),
1264+
KubeClient: tc.client,
1265+
}
1266+
resourceSliceTracker, err := resourceslicetracker.StartTracker(tCtx, resourceSliceTrackerOpts)
1267+
require.NoError(t, err, "couldn't start resource slice tracker")
1268+
tc.draManager = NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1beta1().ResourceClaims().Informer(), "resource claim", "", nil), resourceSliceTracker, tc.informerFactory)
11931269
opts := []runtime.Option{
11941270
runtime.WithClientSet(tc.client),
11951271
runtime.WithInformerFactory(tc.informerFactory),

pkg/scheduler/framework/plugins/feature/feature.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package feature
2222
type Features struct {
2323
EnableDRAPrioritizedList bool
2424
EnableDRAAdminAccess bool
25+
EnableDRADeviceTaints bool
2526
EnableDynamicResourceAllocation bool
2627
EnableVolumeCapacityPriority bool
2728
EnableVolumeAttributesClass bool

pkg/scheduler/framework/plugins/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func NewInTreeRegistry() runtime.Registry {
4848
fts := plfeature.Features{
4949
EnableDRAPrioritizedList: feature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
5050
EnableDRAAdminAccess: feature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
51+
EnableDRADeviceTaints: feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
5152
EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
5253
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
5354
EnableVolumeAttributesClass: feature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass),

0 commit comments

Comments
 (0)