@@ -51,6 +51,13 @@ const (
 	// poolNameIndex is the name for the ResourceSlice store's index function,
 	// which is to index by ResourceSlice.Spec.Pool.Name
 	poolNameIndex = "poolName"
+
+	// Including adds in the mutation cache is not safe: We could add a slice, store it,
+	// and then the slice gets deleted without the informer hearing anything about that.
+	// Then the obsolete slice remains in the mutation cache.
+	//
+	// To mitigate this, we use a TTL and check a pool again once added slices expire.
+	defaultMutationCacheTTL = time.Minute
 )
 
 // Controller synchronizes information about resources of one driver with
@@ -64,8 +71,9 @@ type Controller struct {
 	kubeClient kubernetes.Interface
 	wg         sync.WaitGroup
 	// The queue is keyed with the pool name that needs work.
-	queue      workqueue.TypedRateLimitingInterface[string]
-	sliceStore cache.Indexer
+	queue            workqueue.TypedRateLimitingInterface[string]
+	sliceStore       cache.MutationCache
+	mutationCacheTTL time.Duration
 
 	// Must use atomic access...
 	numCreates int64
@@ -182,6 +190,10 @@ type Options struct {
 
 	// Queue can be used to override the default work queue implementation.
 	Queue workqueue.TypedRateLimitingInterface[string]
+
+	// MutationCacheTTL can be used to change the default TTL of one minute.
+	// See source code for details.
+	MutationCacheTTL *time.Duration
 }
 
 // Stop cancels all background activity and blocks until the controller has stopped.
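For callers, the new field is a plain override; leaving it nil keeps the one-minute default set in newController below. A minimal caller-side sketch, assuming the enclosing package is the resourceslice helper in k8s.io/dynamic-resource-allocation (the import path and the types of the other Options fields are not shown in this diff and are assumptions here):

package main

import (
	"time"

	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// buildOptions returns Options with a shorter mutation cache TTL, e.g. for tests.
func buildOptions() resourceslice.Options {
	ttl := 10 * time.Second
	return resourceslice.Options{
		// KubeClient, DriverName, Owner, Resources, Queue would be filled in
		// as before; they are omitted because their types are not part of this diff.
		MutationCacheTTL: &ttl,
	}
}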
@@ -249,20 +261,23 @@ func newController(ctx context.Context, options Options) (*Controller, error) {
 	ctx, cancel := context.WithCancelCause(ctx)
 
 	c := &Controller{
-		cancel:     cancel,
-		kubeClient: options.KubeClient,
-		driverName: options.DriverName,
-		owner:      options.Owner.DeepCopy(),
-		queue:      options.Queue,
-		resources:  options.Resources,
+		cancel:           cancel,
+		kubeClient:       options.KubeClient,
+		driverName:       options.DriverName,
+		owner:            options.Owner.DeepCopy(),
+		queue:            options.Queue,
+		resources:        options.Resources,
+		mutationCacheTTL: defaultMutationCacheTTL,
 	}
 	if c.queue == nil {
 		c.queue = workqueue.NewTypedRateLimitingQueueWithConfig(
 			workqueue.DefaultTypedControllerRateLimiter[string](),
 			workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
 		)
 	}
-
+	if options.MutationCacheTTL != nil {
+		c.mutationCacheTTL = *options.MutationCacheTTL
+	}
 	if err := c.initInformer(ctx); err != nil {
 		return nil, err
 	}
@@ -298,7 +313,7 @@ func (c *Controller) initInformer(ctx context.Context) error {
 	}, func(options *metav1.ListOptions) {
 		options.FieldSelector = selector.String()
 	})
-	c.sliceStore = informer.GetIndexer()
+	c.sliceStore = cache.NewIntegerResourceVersionMutationCache(informer.GetStore(), informer.GetIndexer(), c.mutationCacheTTL, true /* includeAdds */)
 	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			slice, ok := obj.(*resourceapi.ResourceSlice)
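To make the wiring above concrete, here is a small self-contained sketch of the same cache.NewIntegerResourceVersionMutationCache call (the four-argument form used in this diff), with a plain indexer standing in for the informer's GetStore()/GetIndexer(). The "poolName" index mirrors poolNameIndex; the example slice and pool names are made up:

package main

import (
	"fmt"
	"time"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Backing store/indexer, standing in for the informer caches.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"poolName": func(obj interface{}) ([]string, error) {
			slice, ok := obj.(*resourceapi.ResourceSlice)
			if !ok {
				return nil, nil
			}
			return []string{slice.Spec.Pool.Name}, nil
		},
	})

	// Same wrapping as in initInformer: objects recorded via Mutation stay
	// visible for one TTL even though no informer event has arrived yet.
	sliceStore := cache.NewIntegerResourceVersionMutationCache(indexer, indexer, time.Minute, true /* includeAdds */)

	// A freshly created slice as returned by the apiserver; the mutation
	// cache compares integer resource versions, hence the name.
	created := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "gpu-pool-abc123", ResourceVersion: "1"},
		Spec:       resourceapi.ResourceSliceSpec{Pool: resourceapi.ResourcePool{Name: "gpu-pool"}},
	}
	sliceStore.Mutation(created)

	// The slice shows up in pool lookups before the corresponding Add event.
	slices, err := sliceStore.ByIndex("poolName", "gpu-pool")
	if err != nil {
		panic(err)
	}
	fmt.Println("slices in pool:", len(slices)) // 1
}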
@@ -562,13 +577,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 			slice.Spec.Devices = pool.Slices[i].Devices
 
 			logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
-			if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
+			slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{})
+			if err != nil {
 				return fmt.Errorf("update resource slice: %w", err)
 			}
 			atomic.AddInt64(&c.numUpdates, 1)
+			c.sliceStore.Mutation(slice)
 		}
 
 		// Create new slices.
+		added := false
 		for i := 0; i < len(pool.Slices); i++ {
 			if _, ok := currentSliceForDesiredSlice[i]; ok {
 				// Was handled above through an update.
@@ -608,21 +626,25 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 			// It can happen that we create a missing slice, some
 			// other change than the create causes another sync of
 			// the pool, and then a second slice for the same set
-			// of devices gets created because the controller has
+			// of devices would get created because the controller has
 			// no copy of the first slice instance in its informer
 			// cache yet.
 			//
-			// This is not a problem: a client will either see one
-			// of the two slices and use it or see both and do
-			// nothing because of the duplicated device IDs.
-			//
-			// To avoid creating a second slice, we would have to use a
-			// https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache.
+			// Using a https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache
+			// avoids that.
 			logger.V(5).Info("Creating new resource slice")
-			if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
+			slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{})
+			if err != nil {
 				return fmt.Errorf("create resource slice: %w", err)
 			}
 			atomic.AddInt64(&c.numCreates, 1)
+			c.sliceStore.Mutation(slice)
+			added = true
+		}
+		if added {
+			// Check that the recently added slice(s) really exist even
+			// after they expired from the mutation cache.
+			c.queue.AddAfter(poolName, c.mutationCacheTTL)
 		}
 	} else if len(slices) > 0 {
 		// All are obsolete, pool does not exist anymore.
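The delayed re-queue at the end of syncPool is the second half of the TTL mitigation described at the top of the diff: once added slices have expired from the mutation cache, the pool is synced once more against the real informer state. In isolation the pattern looks like this (hypothetical pool name, same queue construction as in newController):

package main

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
	)
	defer queue.ShutDown()

	// Schedule one more sync of the pool after the mutation cache TTL, so the
	// controller verifies that the recently created slices really exist.
	queue.AddAfter("gpu-pool", time.Minute)
}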