Commit 8d48942

scheduler/dynamicresources: extract obtaining and tracking in-memory modifications of DRA objects
All logic related to obtaining DRA objects and tracking modifications to ResourceClaims in-memory is extracted to DefaultDRAManager, which implements framework.SharedDRAManager. This is intended to be a no-op in terms of the DRA plugin behavior.
1 parent 87cd496 commit 8d48942
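
For orientation, here is a rough sketch (not part of this commit) of the framework.SharedDRAManager contract that DefaultDRAManager now satisfies. It is inferred from the compile-time assertions and method signatures in the new file below; the authoritative definitions live in the scheduler framework package and may differ in detail.

// Inferred sketch only; see k8s.io/kubernetes/pkg/scheduler/framework for the real definitions.
package framework

import (
    resourceapi "k8s.io/api/resource/v1alpha3"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/dynamic-resource-allocation/structured"
)

// SharedDRAManager bundles in-memory access to the DRA objects used during scheduling.
type SharedDRAManager interface {
    ResourceClaims() ResourceClaimTracker
    ResourceSlices() ResourceSliceLister
    DeviceClasses() DeviceClassLister
}

// ResourceClaimTracker mirrors the methods implemented by claimTracker below.
type ResourceClaimTracker interface {
    Get(namespace, claimName string) (*resourceapi.ResourceClaim, error)
    List() ([]*resourceapi.ResourceClaim, error)
    ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error)
    ClaimHasPendingAllocation(claimUID types.UID) bool
    SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
    RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool)
    AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error
    AssumedClaimRestore(namespace, claimName string)
}

// ResourceSliceLister and DeviceClassLister mirror the listers below.
type ResourceSliceLister interface {
    List() ([]*resourceapi.ResourceSlice, error)
}

type DeviceClassLister interface {
    Get(className string) (*resourceapi.DeviceClass, error)
    List() ([]*resourceapi.DeviceClass, error)
}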

6 files changed, +285 −148 lines changed

Lines changed: 226 additions & 0 deletions
@@ -0,0 +1,226 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
    "context"
    "fmt"
    "sync"

    resourceapi "k8s.io/api/resource/v1alpha3"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/client-go/informers"
    resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
    "k8s.io/dynamic-resource-allocation/structured"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

var _ framework.SharedDRAManager = &DefaultDRAManager{}

// DefaultDRAManager is the default implementation of SharedDRAManager. It obtains the DRA objects
// from API informers, and uses an AssumeCache and a map of in-flight allocations in order
// to avoid race conditions when modifying ResourceClaims.
type DefaultDRAManager struct {
    resourceClaimTracker *claimTracker
    resourceSliceLister  *resourceSliceLister
    deviceClassLister    *deviceClassLister
}

func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, informerFactory informers.SharedInformerFactory) *DefaultDRAManager {
    logger := klog.FromContext(ctx)
    manager := &DefaultDRAManager{
        resourceClaimTracker: &claimTracker{
            cache:               claimsCache,
            inFlightAllocations: &sync.Map{},
            allocatedDevices:    newAllocatedDevices(logger),
            logger:              logger,
        },
        resourceSliceLister: &resourceSliceLister{sliceLister: informerFactory.Resource().V1alpha3().ResourceSlices().Lister()},
        deviceClassLister:   &deviceClassLister{classLister: informerFactory.Resource().V1alpha3().DeviceClasses().Lister()},
    }

    // Reacting to events is more efficient than iterating over the list
    // repeatedly in PreFilter.
    manager.resourceClaimTracker.cache.AddEventHandler(manager.resourceClaimTracker.allocatedDevices.handlers())

    return manager
}

func (s *DefaultDRAManager) ResourceClaims() framework.ResourceClaimTracker {
    return s.resourceClaimTracker
}

func (s *DefaultDRAManager) ResourceSlices() framework.ResourceSliceLister {
    return s.resourceSliceLister
}

func (s *DefaultDRAManager) DeviceClasses() framework.DeviceClassLister {
    return s.deviceClassLister
}

var _ framework.ResourceSliceLister = &resourceSliceLister{}

type resourceSliceLister struct {
    sliceLister resourcelisters.ResourceSliceLister
}

func (l *resourceSliceLister) List() ([]*resourceapi.ResourceSlice, error) {
    return l.sliceLister.List(labels.Everything())
}

var _ framework.DeviceClassLister = &deviceClassLister{}

type deviceClassLister struct {
    classLister resourcelisters.DeviceClassLister
}

func (l *deviceClassLister) Get(className string) (*resourceapi.DeviceClass, error) {
    return l.classLister.Get(className)
}

func (l *deviceClassLister) List() ([]*resourceapi.DeviceClass, error) {
    return l.classLister.List(labels.Everything())
}

var _ framework.ResourceClaimTracker = &claimTracker{}

type claimTracker struct {
    // cache enables temporarily storing a newer claim object
    // while the scheduler has allocated it and the corresponding object
    // update from the apiserver has not been processed by the claim
    // informer callbacks. Claims get added here in PreBind and removed by
    // the informer callback (based on the "newer than" comparison in the
    // assume cache).
    //
    // It uses cache.MetaNamespaceKeyFunc to generate object names, which
    // therefore are "<namespace>/<name>".
    //
    // This is necessary to ensure that reconstructing the resource usage
    // at the start of a pod scheduling cycle doesn't reuse the resources
    // assigned to such a claim. Alternatively, claim allocation state
    // could also get tracked across pod scheduling cycles, but that
    // - adds complexity (need to carefully sync state with informer events
    //   for claims and ResourceSlices)
    // - would make integration with cluster autoscaler harder because it would need
    //   to trigger informer callbacks.
    cache *assumecache.AssumeCache
    // inFlightAllocations is a map from claim UUIDs to claim objects for those claims
    // for which allocation was triggered during a scheduling cycle and the
    // corresponding claim status update call in PreBind has not been done
    // yet. If another pod needs the claim, the pod is treated as "not
    // schedulable yet". The cluster event for the claim status update will
    // make it schedulable.
    //
    // This mechanism avoids the following problem:
    // - Pod A triggers allocation for claim X.
    // - Pod B shares access to that claim and gets scheduled because
    //   the claim is assumed to be allocated.
    // - PreBind for pod B is called first, tries to update reservedFor and
    //   fails because the claim is not really allocated yet.
    //
    // We could avoid the ordering problem by allowing either pod A or pod B
    // to set the allocation. But that is more complicated and leads to another
    // problem:
    // - Pod A and B get scheduled as above.
    // - PreBind for pod A gets called first, then fails with a temporary API error.
    //   It removes the updated claim from the assume cache because of that.
    // - PreBind for pod B gets called next and succeeds with adding the
    //   allocation and its own reservedFor entry.
    // - The assume cache is now not reflecting that the claim is allocated,
    //   which could lead to reusing the same resource for some other claim.
    //
    // A sync.Map is used because in practice sharing of a claim between
    // pods is expected to be rare compared to per-pod claim, so we end up
    // hitting the "multiple goroutines read, write, and overwrite entries
    // for disjoint sets of keys" case that sync.Map is optimized for.
    inFlightAllocations *sync.Map
    allocatedDevices    *allocatedDevices
    logger              klog.Logger
}

func (c *claimTracker) ClaimHasPendingAllocation(claimUID types.UID) bool {
    _, found := c.inFlightAllocations.Load(claimUID)
    return found
}

func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
    c.inFlightAllocations.Store(claimUID, allocatedClaim)
    // There's no reason to return an error in this implementation, but the error is helpful for other implementations.
    // For example, implementations that have to deal with fake claims might want to return an error if the allocation
    // is for an invalid claim.
    return nil
}

func (c *claimTracker) RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool) {
    _, found := c.inFlightAllocations.LoadAndDelete(claimUID)
    return found
}

func (c *claimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
    obj, err := c.cache.Get(namespace + "/" + claimName)
    if err != nil {
        return nil, err
    }
    claim, ok := obj.(*resourceapi.ResourceClaim)
    if !ok {
        return nil, fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, namespace, claimName)
    }
    return claim, nil
}

func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) {
    var result []*resourceapi.ResourceClaim
    // Probably not worth adding an index for?
    objs := c.cache.List(nil)
    for _, obj := range objs {
        claim, ok := obj.(*resourceapi.ResourceClaim)
        if ok {
            result = append(result, claim)
        }
    }
    return result, nil
}

func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) {
    // Start with a fresh set that matches the current known state of the
    // world according to the informers.
    allocated := c.allocatedDevices.Get()

    // Whatever is in flight also has to be checked.
    c.inFlightAllocations.Range(func(key, value any) bool {
        claim := value.(*resourceapi.ResourceClaim)
        foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
            c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
            allocated.Insert(deviceID)
        })
        return true
    })
    // There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
    return allocated, nil
}

func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
    return c.cache.Assume(claim)
}

func (c *claimTracker) AssumedClaimRestore(namespace, claimName string) {
    c.cache.Restore(namespace + "/" + claimName)
}
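
To illustrate how the tracker is meant to be driven around PreBind, here is a hypothetical sketch based only on the comments in the new file, not on the actual plugin code; finishClaimAllocation and updateClaimStatus are placeholder names, not functions from this commit.

package dynamicresources

import (
    "context"

    resourceapi "k8s.io/api/resource/v1alpha3"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// finishClaimAllocation is a hypothetical helper showing the expected PreBind-time
// flow: write the allocation to the API server, assume the newer claim object so
// later scheduling cycles see it before the informer catches up, and drop the
// in-flight entry that was added earlier in the scheduling cycle.
func finishClaimAllocation(
    ctx context.Context,
    draManager framework.SharedDRAManager,
    allocatedClaim *resourceapi.ResourceClaim,
    // updateClaimStatus stands in for the real claim status update call.
    updateClaimStatus func(context.Context, *resourceapi.ResourceClaim) (*resourceapi.ResourceClaim, error),
) error {
    claims := draManager.ResourceClaims()

    // Only claims whose allocation was triggered in this scheduling cycle are in flight.
    if !claims.ClaimHasPendingAllocation(allocatedClaim.UID) {
        return nil
    }

    // Persist the allocation in the claim status via the API server.
    updated, err := updateClaimStatus(ctx, allocatedClaim)
    if err != nil {
        return err
    }

    // Temporarily store the newer claim object until the informer callback
    // observes the update (the "newer than" comparison in the assume cache).
    if err := claims.AssumeClaimAfterAPICall(updated); err != nil {
        return err
    }

    // The allocation is no longer in flight; other pods sharing the claim
    // become schedulable again once the cluster event for the update arrives.
    claims.RemoveClaimPendingAllocation(allocatedClaim.UID)
    return nil
}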
