Commit 913340a

dra(resourceslice): use index lookup instead of using sliceStore.List()

1 parent 7c599d3 · commit 913340a

4 files changed: +344 −36 lines changed
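The change replaces per-sync filtering of sliceStore.List() with a client-go index keyed by pool name. As a minimal, illustrative sketch (not part of this commit), the same cache.Indexers / ByIndex pattern looks like this in isolation, assuming the resource.k8s.io/v1alpha3 types used elsewhere in this diff:

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

const poolNameIndex = "poolName"

func main() {
	// An Indexer with one custom index over ResourceSlice.Spec.Pool.Name,
	// mirroring the index the controller registers on its informer.
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		poolNameIndex: func(obj interface{}) ([]string, error) {
			slice, ok := obj.(*resourceapi.ResourceSlice)
			if !ok {
				return nil, nil
			}
			return []string{slice.Spec.Pool.Name}, nil
		},
	})

	_ = store.Add(&resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "slice-a"},
		Spec:       resourceapi.ResourceSliceSpec{Pool: resourceapi.ResourcePool{Name: "pool-1"}},
	})
	_ = store.Add(&resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "slice-b"},
		Spec:       resourceapi.ResourceSliceSpec{Pool: resourceapi.ResourcePool{Name: "pool-2"}},
	})

	// ByIndex returns only the slices bucketed under "pool-1",
	// instead of filtering the full result of List() by hand.
	objs, err := store.ByIndex(poolNameIndex, "pool-1")
	fmt.Println(len(objs), err) // 1 <nil>
}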

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go

Lines changed: 4 additions & 1 deletion
@@ -406,7 +406,10 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
 	controllerLogger := klog.FromContext(controllerCtx)
 	controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
 	controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
-	d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources)
+	var err error
+	if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil {
+		return fmt.Errorf("start ResourceSlice controller: %w", err)
+	}
 	return nil
 }
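Because StartController in the staging library now returns (*Controller, error), out-of-tree callers have to adapt the same way draplugin.go does above. A minimal, hypothetical caller sketch (setupDriver and the driver name are illustrative, not from this commit):

package driverexample

import (
	"context"
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// setupDriver shows the new error handling a caller needs; it is not part of this commit.
func setupDriver(ctx context.Context, kubeClient kubernetes.Interface, owner resourceslice.Owner, res *resourceslice.DriverResources) (*resourceslice.Controller, error) {
	// StartController now returns (*Controller, error) instead of silently
	// returning nil (e.g. for a nil kubeClient), so wrap and propagate.
	c, err := resourceslice.StartController(ctx, kubeClient, "example.com/driver", owner, res)
	if err != nil {
		return nil, fmt.Errorf("start ResourceSlice controller: %w", err)
	}
	return c, nil
}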

staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go

Lines changed: 69 additions & 34 deletions
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sort"
 	"sync"
 	"time"
@@ -44,6 +45,10 @@ const (
 	// resyncPeriod for informer
 	// TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable?
 	resyncPeriod = time.Duration(10 * time.Minute)
+
+	// poolNameIndex is the name for the ResourceSlice store's index function,
+	// which is to index by ResourceSlice.Spec.Pool.Name
+	poolNameIndex = "poolName"
 )
 
 // Controller synchronizes information about resources of one driver with
@@ -58,7 +63,7 @@ type Controller struct {
 	wg sync.WaitGroup
 	// The queue is keyed with the pool name that needs work.
 	queue workqueue.TypedRateLimitingInterface[string]
-	sliceStore cache.Store
+	sliceStore cache.Indexer
 
 	mutex sync.RWMutex
@@ -109,24 +114,11 @@ type Owner struct {
 // the controller is inactive. This can happen when kubelet is run stand-alone
 // without an apiserver. In that case we can't and don't need to publish
 // ResourceSlices.
-func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) *Controller {
-	if kubeClient == nil {
-		return nil
-	}
-
+func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
 	logger := klog.FromContext(ctx)
-	ctx, cancel := context.WithCancelCause(ctx)
-
-	c := &Controller{
-		cancel:     cancel,
-		kubeClient: kubeClient,
-		driver:     driver,
-		owner:      owner,
-		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
-			workqueue.DefaultTypedControllerRateLimiter[string](),
-			workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
-		),
-		resources: resources,
+	c, err := newController(ctx, kubeClient, driver, owner, resources)
+	if err != nil {
+		return nil, fmt.Errorf("create controller: %w", err)
 	}
 
 	logger.V(3).Info("Starting")
@@ -142,7 +134,7 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive
 		c.queue.Add(poolName)
 	}
 
-	return c
+	return c, nil
 }
 
 // Stop cancels all background activity and blocks until the controller has stopped.
@@ -175,20 +167,53 @@ func (c *Controller) Update(resources *DriverResources) {
 	}
 }
 
-// run is running in the background. It handles blocking initialization (like
-// syncing the informer) and then syncs the actual with the desired state.
-func (c *Controller) run(ctx context.Context) {
+// newController creates a new controller.
+func newController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
+	if kubeClient == nil {
+		return nil, fmt.Errorf("kubeClient is nil")
+	}
+
+	ctx, cancel := context.WithCancelCause(ctx)
+
+	c := &Controller{
+		cancel:     cancel,
+		kubeClient: kubeClient,
+		driver:     driver,
+		owner:      owner,
+		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+			workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
+		),
+		resources: resources,
+	}
+
+	if err := c.initInformer(ctx); err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+// initInformer initializes the informer used to watch for changes to the resources slice.
+func (c *Controller) initInformer(ctx context.Context) error {
 	logger := klog.FromContext(ctx)
 
 	// We always filter by driver name, by node name only for node-local resources.
 	selector := fields.Set{resourceapi.ResourceSliceSelectorDriver: c.driver}
 	if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
 		selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name
 	}
-	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
+	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, cache.Indexers{
+		poolNameIndex: func(obj interface{}) ([]string, error) {
+			slice, ok := obj.(*resourceapi.ResourceSlice)
+			if !ok {
+				return []string{}, nil
+			}
+			return []string{slice.Spec.Pool.Name}, nil
+		},
+	}, func(options *metav1.ListOptions) {
 		options.FieldSelector = selector.String()
 	})
-	c.sliceStore = informer.GetStore()
+	c.sliceStore = informer.GetIndexer()
 	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			slice, ok := obj.(*resourceapi.ResourceSlice)
@@ -228,10 +253,8 @@ func (c *Controller) run(ctx context.Context) {
 		},
 	})
 	if err != nil {
-		logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring")
-		return
+		return fmt.Errorf("registering event handler on the ResourceSlice informer: %w", err)
 	}
-
 	// Start informer and wait for our cache to be populated.
 	logger.V(3).Info("Starting ResourceSlice informer and waiting for it to sync")
 	c.wg.Add(1)
@@ -245,13 +268,15 @@ func (c *Controller) run(ctx context.Context) {
 		select {
 		case <-time.After(time.Second):
 		case <-ctx.Done():
-			return
+			return fmt.Errorf("sync ResourceSlice informer: %w", context.Cause(ctx))
 		}
 	}
 	logger.V(3).Info("ResourceSlice informer has synced")
+	return nil
+}
 
-	// Seed the
-
+// run is running in the background.
+func (c *Controller) run(ctx context.Context) {
 	for c.processNextWorkItem(ctx) {
 	}
 }
@@ -295,10 +320,13 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 	logger := klog.FromContext(ctx)
 
 	// Gather information about the actual and desired state.
-	// TODO: index by pool name.
 	var slices []*resourceapi.ResourceSlice
-	for _, obj := range c.sliceStore.List() {
-		if slice, ok := obj.(*resourceapi.ResourceSlice); ok && slice.Spec.Pool.Name == poolName {
+	objs, err := c.sliceStore.ByIndex(poolNameIndex, poolName)
+	if err != nil {
+		return fmt.Errorf("retrieve ResourceSlice objects: %w", err)
+	}
+	for _, obj := range objs {
+		if slice, ok := obj.(*resourceapi.ResourceSlice); ok {
 			slices = append(slices, slice)
 		}
 	}
@@ -346,6 +374,11 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 	}
 	slices = currentSlices
 
+	// Sort by name to ensure that keeping only the first slice is deterministic.
+	sort.Slice(slices, func(i, j int) bool {
+		return slices[i].Name < slices[j].Name
+	})
+
 	if pool, ok := resources.Pools[poolName]; ok {
 		if pool.Generation > generation {
 			generation = pool.Generation
@@ -397,6 +430,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 
 		logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices))
 		obsoleteSlices = append(obsoleteSlices, slices...)
+		// No need to create or update the slices.
+		slices = nil
 	}
 
 	// Remove stale slices.
@@ -420,7 +455,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 		// TODO: switch to SSA once unit testing supports it.
 		logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
 		if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
-			return fmt.Errorf("delete resource slice: %w", err)
+			return fmt.Errorf("update resource slice: %w", err)
 		}
 	}