
Commit c69f150

Merge pull request kubernetes#127277 from pohly/dra-structured-performance
kube-scheduler: enhance performance for DRA structured parameters
2 parents bc79d3b + 7863d9a commit c69f150

13 files changed: +1085 −275 lines changed
Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
	"sync"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/tools/cache"
	"k8s.io/dynamic-resource-allocation/structured"
	"k8s.io/klog/v2"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
	"k8s.io/utils/ptr"
)

// foreachAllocatedDevice invokes the provided callback for each
// device in the claim's allocation result which was allocated
// exclusively for the claim.
//
// Devices allocated with admin access can be shared with other
// claims and are skipped without invoking the callback.
//
// foreachAllocatedDevice does nothing if the claim is not allocated.
func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID structured.DeviceID)) {
	if claim.Status.Allocation == nil {
		return
	}
	for _, result := range claim.Status.Allocation.Devices.Results {
		// Kubernetes 1.31 did not set this, 1.32 always does.
		// Supporting 1.31 is not worth the additional code that
		// would have to be written (= looking up in request) because
		// it is extremely unlikely that there really is a result
		// that still exists in a cluster from 1.31 where this matters.
		if ptr.Deref(result.AdminAccess, false) {
			// Is not considered as allocated.
			continue
		}
		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)

		// None of the users of this helper need to abort iterating,
		// therefore it's not supported as it only would add overhead.
		cb(deviceID)
	}
}

// allocatedDevices reacts to events in a cache and maintains a set of all allocated devices.
// This is cheaper than repeatedly calling List, making strings unique, and building the set
// each time PreFilter is called.
//
// All methods are thread-safe. Get returns a cloned set.
type allocatedDevices struct {
	logger klog.Logger

	mutex sync.RWMutex
	ids   sets.Set[structured.DeviceID]
}

func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
	return &allocatedDevices{
		logger: logger,
		ids:    sets.New[structured.DeviceID](),
	}
}

func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
	a.mutex.RLock()
	defer a.mutex.RUnlock()

	return a.ids.Clone()
}

func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
	return cache.ResourceEventHandlerFuncs{
		AddFunc:    a.onAdd,
		UpdateFunc: a.onUpdate,
		DeleteFunc: a.onDelete,
	}
}

func (a *allocatedDevices) onAdd(obj any) {
	claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
	if err != nil {
		// Shouldn't happen.
		a.logger.Error(err, "unexpected object in allocatedDevices.onAdd")
		return
	}

	if claim.Status.Allocation != nil {
		a.addDevices(claim)
	}
}

func (a *allocatedDevices) onUpdate(oldObj, newObj any) {
	originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj)
	if err != nil {
		// Shouldn't happen.
		a.logger.Error(err, "unexpected object in allocatedDevices.onUpdate")
		return
	}

	switch {
	case originalClaim.Status.Allocation == nil && modifiedClaim.Status.Allocation != nil:
		a.addDevices(modifiedClaim)
	case originalClaim.Status.Allocation != nil && modifiedClaim.Status.Allocation == nil:
		a.removeDevices(originalClaim)
	default:
		// Nothing to do. Either both nil or both non-nil, in which case the content
		// also must be the same (immutable!).
	}
}

func (a *allocatedDevices) onDelete(obj any) {
	claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
	if err != nil {
		// Shouldn't happen.
		a.logger.Error(err, "unexpected object in allocatedDevices.onDelete")
		return
	}

	a.removeDevices(claim)
}

func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
	if claim.Status.Allocation == nil {
		return
	}
	// Locking of the mutex gets minimized by pre-computing what needs to be done
	// without holding the lock.
	deviceIDs := make([]structured.DeviceID, 0, 20)
	foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
		a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
		deviceIDs = append(deviceIDs, deviceID)
	})

	a.mutex.Lock()
	defer a.mutex.Unlock()
	for _, deviceID := range deviceIDs {
		a.ids.Insert(deviceID)
	}
}

func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
	if claim.Status.Allocation == nil {
		return
	}

	// Locking of the mutex gets minimized by pre-computing what needs to be done
	// without holding the lock.
	deviceIDs := make([]structured.DeviceID, 0, 20)
	foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
		a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
		deviceIDs = append(deviceIDs, deviceID)
	})

	a.mutex.Lock()
	defer a.mutex.Unlock()
	for _, deviceID := range deviceIDs {
		a.ids.Delete(deviceID)
	}
}
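Not part of the commit: a minimal sketch of how the helper above is meant to be used, assuming it stays inside the dynamicresources package. The informer variable and the device ID values are placeholders; the commit itself attaches the handlers to the scheduler's claim assume cache (see the New() change below).

package dynamicresources

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/dynamic-resource-allocation/structured"
	"k8s.io/klog/v2"
)

// exampleAllocatedDevicesUsage is a hypothetical helper, not part of the commit.
func exampleAllocatedDevicesUsage(claimInformer cache.SharedIndexInformer) {
	allocated := newAllocatedDevices(klog.Background())

	// Keep the set current by reacting to ResourceClaim events instead of
	// re-listing claims on every scheduling cycle.
	claimInformer.AddEventHandler(allocated.handlers())

	// Per scheduling cycle (e.g. in PreFilter), Get returns a cloned
	// snapshot, so callers can read and extend it without holding a lock.
	inUse := allocated.Get()
	_ = inUse.Has(structured.MakeDeviceID("driver.example.com", "pool-0", "device-0"))
}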

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 48 additions & 30 deletions
@@ -30,13 +30,15 @@ import (
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/kubernetes"
 	resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
+	"k8s.io/dynamic-resource-allocation/cel"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/dynamic-resource-allocation/structured"
 	"k8s.io/klog/v2"

@@ -85,7 +87,7 @@ type stateData struct {
 	informationsForClaim []informationForClaim

 	// nodeAllocations caches the result of Filter for the nodes.
-	nodeAllocations map[string][]*resourceapi.AllocationResult
+	nodeAllocations map[string][]resourceapi.AllocationResult
 }

 func (d *stateData) Clone() framework.StateData {

@@ -106,10 +108,12 @@ type DynamicResources struct {
 	enableAdminAccess         bool
 	enableSchedulingQueueHint bool

-	fh          framework.Handle
-	clientset   kubernetes.Interface
-	classLister resourcelisters.DeviceClassLister
-	sliceLister resourcelisters.ResourceSliceLister
+	fh               framework.Handle
+	clientset        kubernetes.Interface
+	classLister      resourcelisters.DeviceClassLister
+	sliceLister      resourcelisters.ResourceSliceLister
+	celCache         *cel.Cache
+	allocatedDevices *allocatedDevices

 	// claimAssumeCache enables temporarily storing a newer claim object
 	// while the scheduler has allocated it and the corresponding object

@@ -174,6 +178,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		return &DynamicResources{}, nil
 	}

+	logger := klog.FromContext(ctx)
 	pl := &DynamicResources{
 		enabled:           true,
 		enableAdminAccess: fts.EnableDRAAdminAccess,

@@ -184,8 +189,19 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		classLister:      fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
 		sliceLister:      fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
 		claimAssumeCache: fh.ResourceClaimCache(),
+
+		// This is a LRU cache for compiled CEL expressions. The most
+		// recent 10 of them get reused across different scheduling
+		// cycles.
+		celCache: cel.NewCache(10),
+
+		allocatedDevices: newAllocatedDevices(logger),
 	}

+	// Reacting to events is more efficient than iterating over the list
+	// repeatedly in PreFilter.
+	pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers())
+
 	return pl, nil
 }

@@ -527,39 +543,41 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 		// expensive, we may have to maintain and update state more
 		// persistently.
 		//
-		// Claims are treated as "allocated" if they are in the assume cache
-		// or currently their allocation is in-flight.
-		allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations}, pl.classLister, pl.sliceLister)
+		// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
+		// or currently their allocation is in-flight. This does not change
+		// during filtering, so we can determine that once.
+		allAllocatedDevices := pl.listAllAllocatedDevices(logger)
+		slices, err := pl.sliceLister.List(labels.Everything())
+		if err != nil {
+			return nil, statusError(logger, err)
+		}
+		allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.classLister, slices, pl.celCache)
 		if err != nil {
 			return nil, statusError(logger, err)
 		}
 		s.allocator = allocator
-		s.nodeAllocations = make(map[string][]*resourceapi.AllocationResult)
+		s.nodeAllocations = make(map[string][]resourceapi.AllocationResult)
 	}

 	s.claims = claims
 	return nil, nil
 }

-type claimListerForAssumeCache struct {
-	assumeCache         *assumecache.AssumeCache
-	inFlightAllocations *sync.Map
-}
-
-func (cl *claimListerForAssumeCache) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
-	// Probably not worth adding an index for?
-	objs := cl.assumeCache.List(nil)
-	allocated := make([]*resourceapi.ResourceClaim, 0, len(objs))
-	for _, obj := range objs {
-		claim := obj.(*resourceapi.ResourceClaim)
-		if obj, ok := cl.inFlightAllocations.Load(claim.UID); ok {
-			claim = obj.(*resourceapi.ResourceClaim)
-		}
-		if claim.Status.Allocation != nil {
-			allocated = append(allocated, claim)
-		}
-	}
-	return allocated, nil
+func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] {
+	// Start with a fresh set that matches the current known state of the
+	// world according to the informers.
+	allocated := pl.allocatedDevices.Get()
+
+	// Whatever is in flight also has to be checked.
+	pl.inFlightAllocations.Range(func(key, value any) bool {
+		claim := value.(*resourceapi.ResourceClaim)
+		foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
+			logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
+			allocated.Insert(deviceID)
+		})
+		return true
+	})
+	return allocated
 }

 // PreFilterExtensions returns prefilter extensions, pod add and remove.

@@ -615,7 +633,7 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs *framework.CycleState
 	}

 	// Use allocator to check the node and cache the result in case that the node is picked.
-	var allocations []*resourceapi.AllocationResult
+	var allocations []resourceapi.AllocationResult
 	if state.allocator != nil {
 		allocCtx := ctx
 		if loggerV := logger.V(5); loggerV.Enabled() {

@@ -763,7 +781,7 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 		if index < 0 {
 			return statusError(logger, fmt.Errorf("internal error, claim %s with allocation not found", claim.Name))
 		}
-		allocation := allocations[i]
+		allocation := &allocations[i]
 		state.informationsForClaim[index].allocation = allocation

 		// Strictly speaking, we don't need to store the full modified object.
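Not from the commit: a small, self-contained illustration of why switching from []*resourceapi.AllocationResult to []resourceapi.AllocationResult (with Reserve then taking &allocations[i]) saves allocations. The struct below is a stand-in for the real API type.

package main

import "fmt"

// allocationResult is a stand-in for resourceapi.AllocationResult.
type allocationResult struct {
	device string
}

func main() {
	// One backing array holds all results, instead of one separate heap
	// object per result as with a slice of pointers.
	allocations := make([]allocationResult, 0, 2)
	allocations = append(allocations,
		allocationResult{device: "device-0"},
		allocationResult{device: "device-1"},
	)

	// Once the slice is fully built, &allocations[i] is a pointer into that
	// backing array, so callers that want a *allocationResult still get one
	// without an extra allocation.
	allocation := &allocations[1]
	fmt.Println(allocation.device)
}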
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
	"unique"

	conversion "k8s.io/apimachinery/pkg/conversion"
	"k8s.io/apimachinery/pkg/runtime"
)

var (
	localSchemeBuilder runtime.SchemeBuilder
	AddToScheme        = localSchemeBuilder.AddToScheme
)

func Convert_api_UniqueString_To_string(in *UniqueString, out *string, s conversion.Scope) error {
	if *in == NullUniqueString {
		*out = ""
		return nil
	}
	*out = in.String()
	return nil
}

func Convert_string_To_api_UniqueString(in *string, out *UniqueString, s conversion.Scope) error {
	if *in == "" {
		*out = NullUniqueString
		return nil
	}
	*out = UniqueString(unique.Make(*in))
	return nil
}
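A hypothetical round trip through the two helpers above (not part of the commit). It shows the two directions of the conversion; passing nil for the conversion.Scope is enough here because these helpers ignore it.

package api

import "testing"

// TestUniqueStringRoundTrip is an illustrative test, not part of the commit.
func TestUniqueStringRoundTrip(t *testing.T) {
	in := "example.com/driver"
	var us UniqueString
	if err := Convert_string_To_api_UniqueString(&in, &us, nil); err != nil {
		t.Fatal(err)
	}
	var out string
	if err := Convert_api_UniqueString_To_string(&us, &out, nil); err != nil {
		t.Fatal(err)
	}
	if out != in {
		t.Fatalf("round trip changed the value: got %q, want %q", out, in)
	}
}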
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package api contains a variant of the API where strings are unique. These
// unique strings are faster to compare and more efficient when used as key in
// a map.
//
// +k8s:conversion-gen=k8s.io/api/resource/v1alpha3
package api
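Not part of the commit: a tiny standard-library example of what "unique" strings buy. unique.Make interns the value, so handles made from equal strings are identical and compare in constant time, which is also what makes them compact map keys.

package main

import (
	"fmt"
	"unique"
)

func main() {
	// Handles made from equal strings are the same handle, so == is a
	// single word comparison instead of a byte-by-byte string compare.
	h1 := unique.Make("resource.example.com/pool-a")
	h2 := unique.Make("resource.example.com/pool-a")
	fmt.Println(h1 == h2)   // true
	fmt.Println(h1.Value()) // "resource.example.com/pool-a"
}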
