Skip to content

Commit 87cd496

Browse files
committed
scheduler/framework: introduce pluggable SharedDRAManager
SharedDRAManager will be used by the DRA plugin to obtain DRA objects, and to track modifications to them in-memory. The current DRA plugin behavior will be the default implementation of SharedDRAManager. Plugging a different implementation will allow Cluster Autoscaler to provide a simulated state of DRA objects to the DRA plugin when making scheduling simulations, as well as obtain the modifications to DRA objects from the plugin.
1 parent 2bb886c commit 87cd496

File tree

4 files changed

+90
-21
lines changed

4 files changed

+90
-21
lines changed

pkg/scheduler/framework/interface.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"k8s.io/klog/v2"
4040
"k8s.io/kubernetes/pkg/scheduler/apis/config"
4141
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
42-
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
4342
)
4443

4544
// NodeScoreList declares a list of nodes and their scores.
@@ -820,10 +819,9 @@ type Handle interface {
820819

821820
SharedInformerFactory() informers.SharedInformerFactory
822821

823-
// ResourceClaimCache returns an assume cache of ResourceClaim objects
824-
// which gets populated by the shared informer factory and the dynamic resources
825-
// plugin.
826-
ResourceClaimCache() *assumecache.AssumeCache
822+
// SharedDRAManager can be used to obtain DRA objects, and track modifications to them in-memory - mainly by the DRA plugin.
823+
// A non-default implementation can be plugged into the framework to simulate the state of DRA objects.
824+
SharedDRAManager() SharedDRAManager
827825

828826
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
829827
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status

pkg/scheduler/framework/listers.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ limitations under the License.
1616

1717
package framework
1818

19+
import (
20+
resourceapi "k8s.io/api/resource/v1alpha3"
21+
"k8s.io/apimachinery/pkg/types"
22+
"k8s.io/apimachinery/pkg/util/sets"
23+
"k8s.io/dynamic-resource-allocation/structured"
24+
)
25+
1926
// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name.
2027
type NodeInfoLister interface {
2128
// List returns the list of NodeInfos.
@@ -40,3 +47,65 @@ type SharedLister interface {
4047
NodeInfos() NodeInfoLister
4148
StorageInfos() StorageInfoLister
4249
}
50+
51+
// ResourceSliceLister can be used to obtain ResourceSlices.
52+
type ResourceSliceLister interface {
53+
// List returns a list of all ResourceSlices.
54+
List() ([]*resourceapi.ResourceSlice, error)
55+
}
56+
57+
// DeviceClassLister can be used to obtain DeviceClasses.
58+
type DeviceClassLister interface {
59+
// List returns a list of all DeviceClasses.
60+
List() ([]*resourceapi.DeviceClass, error)
61+
// Get returns the DeviceClass with the given className.
62+
Get(className string) (*resourceapi.DeviceClass, error)
63+
}
64+
65+
// ResourceClaimTracker can be used to obtain ResourceClaims, and track changes to ResourceClaims in-memory.
66+
//
67+
// If the claims are meant to be allocated in the API during the binding phase (when used by scheduler), the tracker helps avoid
68+
// race conditions between scheduling and binding phases (as well as between the binding phase and the informer cache update).
69+
//
70+
// If the binding phase is not run (e.g. when used by Cluster Autoscaler which only runs the scheduling phase, and simulates binding in-memory),
71+
// the tracker allows the framework user to obtain the claim allocations produced by the DRA plugin, and persist them outside of the API (e.g. in-memory).
72+
type ResourceClaimTracker interface {
73+
// List lists ResourceClaims. The result is guaranteed to immediately include any changes made via AssumeClaimAfterAPICall(),
74+
// and SignalClaimPendingAllocation().
75+
List() ([]*resourceapi.ResourceClaim, error)
76+
// Get works like List(), but for a single claim.
77+
Get(namespace, claimName string) (*resourceapi.ResourceClaim, error)
78+
// ListAllAllocatedDevices lists all allocated Devices from allocated ResourceClaims. The result is guaranteed to immediately include
79+
// any changes made via AssumeClaimAfterAPICall(), and SignalClaimPendingAllocation().
80+
ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error)
81+
82+
// SignalClaimPendingAllocation signals to the tracker that the given ResourceClaim will be allocated via an API call in the
83+
// binding phase. This change is immediately reflected in the result of List() and the other accessors.
84+
SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
85+
// ClaimHasPendingAllocation answers whether a given claim has a pending allocation during the binding phase. It can be used to avoid
86+
// race conditions in subsequent scheduling phases.
87+
ClaimHasPendingAllocation(claimUID types.UID) bool
88+
// RemoveClaimPendingAllocation removes the pending allocation for the given ResourceClaim from the tracker if any was signaled via
89+
// SignalClaimPendingAllocation(). Returns whether there was a pending allocation to remove. List() and the other accessors immediately
90+
// stop reflecting the pending allocation in the results.
91+
RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool)
92+
93+
// AssumeClaimAfterAPICall signals to the tracker that an API call modifying the given ResourceClaim was made in the binding phase, and the
94+
// changes should be reflected in informers very soon. This change is immediately reflected in the result of List() and the other accessors.
95+
// This mechanism can be used to avoid race conditions between the informer update and subsequent scheduling phases.
96+
AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error
97+
// AssumedClaimRestore signals to the tracker that something went wrong with the API call modifying the given ResourceClaim, and
98+
// the changes won't be reflected in informers after all. List() and the other accessors immediately stop reflecting the assumed change,
99+
// and go back to the informer version.
100+
AssumedClaimRestore(namespace, claimName string)
101+
}
102+
103+
// SharedDRAManager can be used to obtain DRA objects, and track modifications to them in-memory - mainly by the DRA plugin.
104+
// The plugin's default implementation obtains the objects from the API. A different implementation can be
105+
// plugged into the framework in order to simulate the state of DRA objects. For example, Cluster Autoscaler
106+
// can use this to provide the correct DRA object state to the DRA plugin when simulating scheduling changes in-memory.
107+
type SharedDRAManager interface {
108+
ResourceClaims() ResourceClaimTracker
109+
ResourceSlices() ResourceSliceLister
110+
DeviceClasses() DeviceClassLister
111+
}

pkg/scheduler/framework/runtime/framework.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"k8s.io/kubernetes/pkg/scheduler/framework"
4040
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
4141
"k8s.io/kubernetes/pkg/scheduler/metrics"
42-
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
4342
"k8s.io/kubernetes/pkg/util/slice"
4443
)
4544

@@ -72,12 +71,12 @@ type frameworkImpl struct {
7271
// pluginsMap contains all plugins, by name.
7372
pluginsMap map[string]framework.Plugin
7473

75-
clientSet clientset.Interface
76-
kubeConfig *restclient.Config
77-
eventRecorder events.EventRecorder
78-
informerFactory informers.SharedInformerFactory
79-
resourceClaimCache *assumecache.AssumeCache
80-
logger klog.Logger
74+
clientSet clientset.Interface
75+
kubeConfig *restclient.Config
76+
eventRecorder events.EventRecorder
77+
informerFactory informers.SharedInformerFactory
78+
sharedDRAManager framework.SharedDRAManager
79+
logger klog.Logger
8180

8281
metricsRecorder *metrics.MetricAsyncRecorder
8382
profileName string
@@ -128,7 +127,7 @@ type frameworkOptions struct {
128127
kubeConfig *restclient.Config
129128
eventRecorder events.EventRecorder
130129
informerFactory informers.SharedInformerFactory
131-
resourceClaimCache *assumecache.AssumeCache
130+
sharedDRAManager framework.SharedDRAManager
132131
snapshotSharedLister framework.SharedLister
133132
metricsRecorder *metrics.MetricAsyncRecorder
134133
podNominator framework.PodNominator
@@ -180,10 +179,10 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
180179
}
181180
}
182181

183-
// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
184-
func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
182+
// WithSharedDRAManager sets SharedDRAManager for the framework.
183+
func WithSharedDRAManager(sharedDRAManager framework.SharedDRAManager) Option {
185184
return func(o *frameworkOptions) {
186-
o.resourceClaimCache = resourceClaimCache
185+
o.sharedDRAManager = sharedDRAManager
187186
}
188187
}
189188

@@ -267,7 +266,6 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
267266
if options.logger != nil {
268267
logger = *options.logger
269268
}
270-
271269
f := &frameworkImpl{
272270
registry: r,
273271
snapshotSharedLister: options.snapshotSharedLister,
@@ -277,7 +275,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
277275
kubeConfig: options.kubeConfig,
278276
eventRecorder: options.eventRecorder,
279277
informerFactory: options.informerFactory,
280-
resourceClaimCache: options.resourceClaimCache,
278+
sharedDRAManager: options.sharedDRAManager,
281279
metricsRecorder: options.metricsRecorder,
282280
extenders: options.extenders,
283281
PodNominator: options.podNominator,
@@ -1617,8 +1615,9 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
16171615
return f.informerFactory
16181616
}
16191617

1620-
func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
1621-
return f.resourceClaimCache
1618+
// SharedDRAManager returns the SharedDRAManager of the framework.
1619+
func (f *frameworkImpl) SharedDRAManager() framework.SharedDRAManager {
1620+
return f.sharedDRAManager
16221621
}
16231622

16241623
func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {

pkg/scheduler/scheduler.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"k8s.io/kubernetes/pkg/scheduler/framework"
4545
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
4646
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
47+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
4748
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
4849
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
4950
"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -296,17 +297,19 @@ func New(ctx context.Context,
296297
waitingPods := frameworkruntime.NewWaitingPodsMap()
297298

298299
var resourceClaimCache *assumecache.AssumeCache
300+
var draManager framework.SharedDRAManager
299301
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
300302
resourceClaimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
301303
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
304+
draManager = dynamicresources.NewDRAManager(ctx, resourceClaimCache, informerFactory)
302305
}
303306

304307
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
305308
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
306309
frameworkruntime.WithClientSet(client),
307310
frameworkruntime.WithKubeConfig(options.kubeConfig),
308311
frameworkruntime.WithInformerFactory(informerFactory),
309-
frameworkruntime.WithResourceClaimCache(resourceClaimCache),
312+
frameworkruntime.WithSharedDRAManager(draManager),
310313
frameworkruntime.WithSnapshotSharedLister(snapshot),
311314
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
312315
frameworkruntime.WithParallelism(int(options.parallelism)),

0 commit comments

Comments
 (0)