Commit bc55e82
DRA scheduler: maintain a set of allocated device IDs
Reacting to events from the informer cache (indirectly, through the assume
cache) is more efficient than repeatedly listing its content and then
converting to IDs with unique strings.

goos: linux
goarch: amd64
pkg: k8s.io/kubernetes/test/integration/scheduler_perf
cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz

                                                                                     │    before    │     after     │
                                                                                     │ SchedulingThroughput/Average │ vs base
PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36       54.70 ± 6%     76.81 ± 6%    +40.42% (p=0.002 n=6)
PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36      106.4 ± 4%     105.6 ± 2%          ~ (p=0.413 n=6)
PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36      120.0 ± 4%     118.9 ± 7%          ~ (p=0.117 n=6)
PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36       112.5 ± 4%     105.9 ± 4%     -5.87% (p=0.002 n=6)
PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36       87.13 ± 4%    123.55 ± 4%    +41.80% (p=0.002 n=6)
PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36       113.4 ± 2%     103.3 ± 2%     -8.95% (p=0.002 n=6)
PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36       65.55 ± 3%    121.30 ± 3%    +85.05% (p=0.002 n=6)
geomean                                                                                 90.81          106.8         +17.57%
Parent: f0efb8a
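The commit's core idea can be sketched independently of the scheduler: instead of rebuilding the set of allocated device IDs from a full List every time PreFilter runs, event handlers apply deltas to a long-lived set, and readers take a cloned snapshot. A minimal, self-contained sketch of that pattern in plain Go (the trackedSet type and its method names are illustrative, not code from this commit):

package main

import (
    "fmt"
    "sync"
)

// trackedSet is a stand-in for the commit's allocatedDevices type:
// writers apply deltas as cache events arrive, readers get a snapshot.
type trackedSet struct {
    mutex sync.RWMutex
    ids   map[string]struct{}
}

func newTrackedSet() *trackedSet {
    return &trackedSet{ids: make(map[string]struct{})}
}

// onAllocate and onDeallocate play the role of informer event handlers.
func (t *trackedSet) onAllocate(id string) {
    t.mutex.Lock()
    defer t.mutex.Unlock()
    t.ids[id] = struct{}{}
}

func (t *trackedSet) onDeallocate(id string) {
    t.mutex.Lock()
    defer t.mutex.Unlock()
    delete(t.ids, id)
}

// snapshot clones the set so callers can extend it (for example with
// in-flight allocations) without racing against concurrent handlers.
func (t *trackedSet) snapshot() map[string]struct{} {
    t.mutex.RLock()
    defer t.mutex.RUnlock()
    clone := make(map[string]struct{}, len(t.ids))
    for id := range t.ids {
        clone[id] = struct{}{}
    }
    return clone
}

func main() {
    ts := newTrackedSet()
    ts.onAllocate("driver-a/pool-1/device-0")
    ts.onAllocate("driver-a/pool-1/device-1")
    ts.onDeallocate("driver-a/pool-1/device-0")
    fmt.Println(len(ts.snapshot())) // prints 1
}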

File tree

5 files changed: +195 -33 lines
pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go

Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
    "sync"

    resourceapi "k8s.io/api/resource/v1alpha3"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/client-go/tools/cache"
    "k8s.io/dynamic-resource-allocation/structured"
    "k8s.io/klog/v2"
    schedutil "k8s.io/kubernetes/pkg/scheduler/util"
    "k8s.io/utils/ptr"
)

// allocatedDevices reacts to events in a cache and maintains a set of all allocated devices.
// This is cheaper than repeatedly calling List, making strings unique, and building the set
// each time PreFilter is called.
//
// All methods are thread-safe. Get returns a cloned set.
type allocatedDevices struct {
    logger klog.Logger

    mutex sync.RWMutex
    ids   sets.Set[structured.DeviceID]
}

func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
    return &allocatedDevices{
        logger: logger,
        ids:    sets.New[structured.DeviceID](),
    }
}

func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
    a.mutex.RLock()
    defer a.mutex.RUnlock()

    return a.ids.Clone()
}

func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
    return cache.ResourceEventHandlerFuncs{
        AddFunc:    a.onAdd,
        UpdateFunc: a.onUpdate,
        DeleteFunc: a.onDelete,
    }
}

func (a *allocatedDevices) onAdd(obj any) {
    claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
    if err != nil {
        // Shouldn't happen.
        a.logger.Error(err, "unexpected object in allocatedDevices.onAdd")
        return
    }

    if claim.Status.Allocation != nil {
        a.addDevices(claim)
    }
}

func (a *allocatedDevices) onUpdate(oldObj, newObj any) {
    originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj)
    if err != nil {
        // Shouldn't happen.
        a.logger.Error(err, "unexpected object in allocatedDevices.onUpdate")
        return
    }

    switch {
    case originalClaim.Status.Allocation == nil && modifiedClaim.Status.Allocation != nil:
        a.addDevices(modifiedClaim)
    case originalClaim.Status.Allocation != nil && modifiedClaim.Status.Allocation == nil:
        a.removeDevices(originalClaim)
    default:
        // Nothing to do. Either both nil or both non-nil, in which case the content
        // also must be the same (immutable!).
    }
}

func (a *allocatedDevices) onDelete(obj any) {
    claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
    if err != nil {
        // Shouldn't happen.
        a.logger.Error(err, "unexpected object in allocatedDevices.onDelete")
        return
    }

    a.removeDevices(claim)
}

func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
    if claim.Status.Allocation == nil {
        return
    }
    // Locking of the mutex gets minimized by pre-computing what needs to be done
    // without holding the lock.
    deviceIDs := make([]structured.DeviceID, 0, 20)

    for _, result := range claim.Status.Allocation.Devices.Results {
        if ptr.Deref(result.AdminAccess, false) {
            // Is not considered as allocated.
            continue
        }
        deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
        a.logger.V(6).Info("Device was allocated", "device", deviceID, "claim", klog.KObj(claim))
        deviceIDs = append(deviceIDs, deviceID)
    }

    a.mutex.Lock()
    defer a.mutex.Unlock()
    for _, deviceID := range deviceIDs {
        a.ids.Insert(deviceID)
    }
}

func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
    if claim.Status.Allocation == nil {
        return
    }

    // Locking of the mutex gets minimized by pre-computing what needs to be done
    // without holding the lock.
    deviceIDs := make([]structured.DeviceID, 0, 20)

    for _, result := range claim.Status.Allocation.Devices.Results {
        if ptr.Deref(result.AdminAccess, false) {
            // Is not considered as allocated and thus does not need to be removed
            // because of this claim.
            continue
        }
        deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
        a.logger.V(6).Info("Device was deallocated", "device", deviceID, "claim", klog.KObj(claim))
        deviceIDs = append(deviceIDs, deviceID)
    }

    a.mutex.Lock()
    defer a.mutex.Unlock()
    for _, deviceID := range deviceIDs {
        a.ids.Delete(deviceID)
    }
}
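One design point worth calling out: Get returns a clone precisely so that a caller can extend the snapshot, as PreFilter does below with in-flight allocations, without mutating the shared set behind the event handlers' backs. A small standalone illustration of that contract using the same sets package (the device names here are made up):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/sets"
)

func main() {
    // The shared set, normally kept up to date by event handlers.
    shared := sets.New("dev-0", "dev-1")

    // A reader takes a snapshot and extends it locally.
    snapshot := shared.Clone()
    snapshot.Insert("dev-2")

    // The shared set is unaffected by the reader's additions.
    fmt.Println(shared.Len(), snapshot.Len()) // prints: 2 3
}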

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 26 additions & 23 deletions
@@ -108,11 +108,12 @@ type DynamicResources struct {
     enableAdminAccess         bool
     enableSchedulingQueueHint bool

-    fh          framework.Handle
-    clientset   kubernetes.Interface
-    classLister resourcelisters.DeviceClassLister
-    sliceLister resourcelisters.ResourceSliceLister
-    celCache    *structured.CELCache
+    fh               framework.Handle
+    clientset        kubernetes.Interface
+    classLister      resourcelisters.DeviceClassLister
+    sliceLister      resourcelisters.ResourceSliceLister
+    celCache         *structured.CELCache
+    allocatedDevices *allocatedDevices

     // claimAssumeCache enables temporarily storing a newer claim object
     // while the scheduler has allocated it and the corresponding object

@@ -177,6 +178,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
         return &DynamicResources{}, nil
     }

+    logger := klog.FromContext(ctx)
     pl := &DynamicResources{
         enabled:           true,
         enableAdminAccess: fts.EnableDRAAdminAccess,

@@ -192,8 +194,14 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
         // recent 10 of them get reused across different scheduling
         // cycles.
         celCache: structured.NewCELCache(10),
+
+        allocatedDevices: newAllocatedDevices(logger),
     }

+    // Reacting to events is more efficient than iterating over the list
+    // repeatedly in PreFilter.
+    pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers())
+
     return pl, nil
 }

@@ -538,10 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
     // Claims are treated as "allocated" if they are in the assume cache
     // or currently their allocation is in-flight. This does not change
     // during filtering, so we can determine that once.
-    allocatedDevices := pl.listAllAllocatedDevices()
-    if err != nil {
-        return nil, statusError(logger, err)
-    }
+    allocatedDevices := pl.listAllAllocatedDevices(logger)
     slices, err := pl.sliceLister.List(labels.Everything())
     if err != nil {
         return nil, statusError(logger, err)

@@ -558,18 +563,14 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
     return nil, nil
 }

-func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID {
-    // Probably not worth adding an index for?
-    objs := pl.claimAssumeCache.List(nil)
-    var allocated []structured.DeviceID
-    for _, obj := range objs {
-        claim := obj.(*resourceapi.ResourceClaim)
-        if obj, ok := pl.inFlightAllocations.Load(claim.UID); ok {
-            claim = obj.(*resourceapi.ResourceClaim)
-        }
-        if claim.Status.Allocation == nil {
-            continue
-        }
+func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] {
+    // Start with a fresh set that matches the current known state of the
+    // world according to the informers.
+    allocated := pl.allocatedDevices.Get()
+
+    // Whatever is in flight also has to be checked.
+    pl.inFlightAllocations.Range(func(key, value any) bool {
+        claim := value.(*resourceapi.ResourceClaim)
         for _, result := range claim.Status.Allocation.Devices.Results {
             // Kubernetes 1.31 did not set this, 1.32 always does.
             // Supporting 1.31 is not worth the additional code that

@@ -581,9 +582,11 @@ func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID {
                 continue
             }
             deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
-            allocated = append(allocated, deviceID)
+            logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
+            allocated.Insert(deviceID)
         }
-    }
+        return true
+    })
     return allocated
 }
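For readers not familiar with sync.Map: Range keeps iterating for as long as the callback returns true, which is why the rewritten helper ends its callback with return true. A tiny standalone illustration (the map contents are invented):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var inFlight sync.Map
    inFlight.Store("claim-a", "device-1")
    inFlight.Store("claim-b", "device-2")

    // Range visits entries until the callback returns false;
    // returning true means "keep going".
    count := 0
    inFlight.Range(func(key, value any) bool {
        count++
        return true
    })
    fmt.Println("visited", count, "entries") // visited 2 entries
}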

staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go

Lines changed: 6 additions & 7 deletions
@@ -58,20 +58,19 @@ type Allocator struct {
 func NewAllocator(ctx context.Context,
     adminAccessEnabled bool,
     claimsToAllocate []*resourceapi.ResourceClaim,
-    allocatedDevices []DeviceID,
+    allocatedDevices sets.Set[DeviceID],
     classLister resourcelisters.DeviceClassLister,
     slices []*resourceapi.ResourceSlice,
     celCache *CELCache,
 ) (*Allocator, error) {
     return &Allocator{
         adminAccessEnabled: adminAccessEnabled,
         claimsToAllocate:   claimsToAllocate,
-        // This won't change, so build this set only once.
-        allocatedDevices: sets.New(allocatedDevices...),
-        classLister:      classLister,
-        slices:           slices,
-        celCache:         celCache,
-        celMutex:         keymutex.NewHashed(0),
+        allocatedDevices:   allocatedDevices,
+        classLister:        classLister,
+        slices:             slices,
+        celCache:           celCache,
+        celMutex:           keymutex.NewHashed(0),
     }, nil
 }

staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go

Lines changed: 2 additions & 1 deletion
@@ -36,6 +36,7 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/klog/v2/ktesting"
     "k8s.io/utils/ptr"
 )

@@ -1375,7 +1376,7 @@ func TestAllocator(t *testing.T) {
             allocatedDevices := slices.Clone(tc.allocatedDevices)
             slices := slices.Clone(tc.slices)

-            allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1))
+            allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1))
             g.Expect(err).ToNot(gomega.HaveOccurred())

             results, err := allocator.Allocate(ctx, tc.node)

test/integration/scheduler_perf/dra.go

Lines changed: 3 additions & 2 deletions
@@ -30,6 +30,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/sets"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/util/workqueue"

@@ -309,14 +310,14 @@ claims:
     }

     objs := claimCache.List(nil)
-    allocatedDevices := make([]structured.DeviceID, 0, len(objs))
+    allocatedDevices := sets.New[structured.DeviceID]()
     for _, obj := range objs {
         claim := obj.(*resourceapi.ResourceClaim)
         if claim.Status.Allocation == nil {
             continue
         }
         for _, result := range claim.Status.Allocation.Devices.Results {
-            allocatedDevices = append(allocatedDevices, structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
+            allocatedDevices.Insert(structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
         }
     }
