@@ -79,7 +79,7 @@ type Controller struct {
79
79
csiController CSICapacityClient
80
80
driverName string
81
81
clientFactory CSIStorageCapacityFactory
82
- queue workqueue.RateLimitingInterface
82
+ queue workqueue.TypedRateLimitingInterface [ QueueKey ]
83
83
owner * metav1.OwnerReference
84
84
managedByID string
85
85
ownerNamespace string
@@ -95,29 +95,28 @@ type Controller struct {
95
95
// have a non-nil pointer. Those get added and updated
96
96
// exclusively through the informer event handler to avoid
97
97
// races.
98
- capacities map [workItem ]* storagev1.CSIStorageCapacity
98
+ capacities map [WorkItem ]* storagev1.CSIStorageCapacity
99
99
capacitiesLock sync.Mutex
100
100
}
101
101
102
- type workItem struct {
102
+ type WorkItem struct {
103
103
segment * topology.Segment
104
104
storageClassName string
105
105
}
106
106
107
- func (w workItem ) equals (capacity * storagev1.CSIStorageCapacity ) bool {
107
+ func (w WorkItem ) equals (capacity * storagev1.CSIStorageCapacity ) bool {
108
108
return w .storageClassName == capacity .StorageClassName &&
109
109
reflect .DeepEqual (w .segment .GetLabelSelector (), capacity .NodeTopology )
110
110
}
111
111
112
- var (
113
- // Defines parameters for ExponentialBackoff used while starting up
114
- // and listing CSIStorageCapacity objects.
115
- listCSIStorageCapacityBackoff = wait.Backoff {
116
- Duration : time .Second * 5 ,
117
- Factor : 1.1 ,
118
- Steps : 10 ,
119
- }
112
+ type QueueKey struct {
113
+ // item is for sync/create/update of a CSIStorageCapacity object.
114
+ item * WorkItem
115
+ // capacity is for deleting an obsolete CSIStorageCapacity object.
116
+ capacity * storagev1.CSIStorageCapacity
117
+ }
120
118
119
+ var (
121
120
objectsGoalDesc = metrics .NewDesc (
122
121
"csistoragecapacities_desired_goal" ,
123
122
"Number of CSIStorageCapacity objects that are supposed to be managed automatically." ,
@@ -165,7 +164,7 @@ func NewCentralCapacityController(
165
164
csiController CSICapacityClient ,
166
165
driverName string ,
167
166
clientFactory CSIStorageCapacityFactory ,
168
- queue workqueue.RateLimitingInterface ,
167
+ queue workqueue.TypedRateLimitingInterface [ QueueKey ] ,
169
168
owner * metav1.OwnerReference ,
170
169
managedByID string ,
171
170
ownerNamespace string ,
@@ -190,29 +189,29 @@ func NewCentralCapacityController(
190
189
pollPeriod : pollPeriod ,
191
190
immediateBinding : immediateBinding ,
192
191
timeout : timeout ,
193
- capacities : map [workItem ]* storagev1.CSIStorageCapacity {},
192
+ capacities : map [WorkItem ]* storagev1.CSIStorageCapacity {},
194
193
}
195
194
196
195
// Now register for changes. Depending on the implementation of the informers,
197
196
// this may already invoke callbacks.
198
197
handler := cache.ResourceEventHandlerFuncs {
199
- AddFunc : func (obj interface {} ) {
198
+ AddFunc : func (obj any ) {
200
199
sc , ok := obj .(* storagev1.StorageClass )
201
200
if ! ok {
202
201
klog .Errorf ("added object: expected StorageClass, got %T -> ignoring it" , obj )
203
202
return
204
203
}
205
204
c .onSCAddOrUpdate (sc )
206
205
},
207
- UpdateFunc : func (_ interface {} , newObj interface {} ) {
206
+ UpdateFunc : func (_ any , newObj any ) {
208
207
sc , ok := newObj .(* storagev1.StorageClass )
209
208
if ! ok {
210
209
klog .Errorf ("updated object: expected StorageClass, got %T -> ignoring it" , newObj )
211
210
return
212
211
}
213
212
c .onSCAddOrUpdate (sc )
214
213
},
215
- DeleteFunc : func (obj interface {} ) {
214
+ DeleteFunc : func (obj any ) {
216
215
// Beware of "xxx deleted" events
217
216
if unknown , ok := obj .(cache.DeletedFinalStateUnknown ); ok && unknown .Obj != nil {
218
217
obj = unknown .Obj
@@ -244,7 +243,7 @@ func (c *Controller) Run(ctx context.Context, threadiness int) {
244
243
defer c .queue .ShutDown ()
245
244
246
245
c .prepare (ctx )
247
- for i := 0 ; i < threadiness ; i ++ {
246
+ for range threadiness {
248
247
go wait .UntilWithContext (ctx , func (ctx context.Context ) {
249
248
c .runWorker (ctx )
250
249
}, time .Second )
@@ -289,23 +288,23 @@ func (c *Controller) prepare(ctx context.Context) {
289
288
// for all objects immediately when adding it.
290
289
klog .V (3 ).Info ("Checking for existing CSIStorageCapacity objects" )
291
290
handler := cache.ResourceEventHandlerFuncs {
292
- AddFunc : func (obj interface {} ) {
291
+ AddFunc : func (obj any ) {
293
292
csc , ok := obj .(* storagev1.CSIStorageCapacity )
294
293
if ! ok {
295
294
klog .Errorf ("added object: expected CSIStorageCapacity, got %T -> ignoring it" , obj )
296
295
return
297
296
}
298
297
c .onCAddOrUpdate (ctx , csc )
299
298
},
300
- UpdateFunc : func (_ interface {} , newObj interface {} ) {
299
+ UpdateFunc : func (_ any , newObj any ) {
301
300
csc , ok := newObj .(* storagev1.CSIStorageCapacity )
302
301
if ! ok {
303
302
klog .Errorf ("updated object: expected CSIStorageCapacity, got %T -> ignoring it" , newObj )
304
303
return
305
304
}
306
305
c .onCAddOrUpdate (ctx , csc )
307
306
},
308
- DeleteFunc : func (obj interface {} ) {
307
+ DeleteFunc : func (obj any ) {
309
308
// Beware of "xxx deleted" events
310
309
if unknown , ok := obj .(cache.DeletedFinalStateUnknown ); ok && unknown .Obj != nil {
311
310
obj = unknown .Obj
@@ -422,7 +421,7 @@ func (c *Controller) refreshTopology(nodeAffinity v1.VolumeNodeAffinity) {
422
421
for item := range c .capacities {
423
422
if item .segment .Compare (segment ) == 0 {
424
423
klog .V (5 ).Infof ("Capacity Controller: skipping refresh: enqueuing %+v because of the topology" , item )
425
- c .queue .Add (item )
424
+ c .queue .Add (QueueKey { item : & item } )
426
425
}
427
426
}
428
427
}
@@ -457,15 +456,15 @@ func (c *Controller) refreshSC(storageClassName string) {
457
456
for item := range c .capacities {
458
457
if item .storageClassName == storageClassName {
459
458
klog .V (5 ).Infof ("Capacity Controller: enqueuing %+v because of the storage class" , item )
460
- c .queue .Add (item )
459
+ c .queue .Add (QueueKey { item : & item } )
461
460
}
462
461
}
463
462
}
464
463
465
464
// addWorkItem ensures that there is an item in c.capacities. It
466
465
// must be called while holding c.capacitiesLock!
467
466
func (c * Controller ) addWorkItem (segment * topology.Segment , sc * storagev1.StorageClass ) {
468
- item := workItem {
467
+ item := WorkItem {
469
468
segment : segment ,
470
469
storageClassName : sc .Name ,
471
470
}
@@ -478,13 +477,13 @@ func (c *Controller) addWorkItem(segment *topology.Segment, sc *storagev1.Storag
478
477
// ... and then tell our workers to update
479
478
// or create that capacity object.
480
479
klog .V (5 ).Infof ("Capacity Controller: enqueuing %+v" , item )
481
- c .queue .Add (item )
480
+ c .queue .Add (QueueKey { item : & item } )
482
481
}
483
482
484
483
// removeWorkItem ensures that the item gets removed from c.capacities. It
485
484
// must be called while holding c.capacitiesLock!
486
485
func (c * Controller ) removeWorkItem (segment * topology.Segment , sc * storagev1.StorageClass ) {
487
- item := workItem {
486
+ item := WorkItem {
488
487
segment : segment ,
489
488
storageClassName : sc .Name ,
490
489
}
@@ -506,7 +505,7 @@ func (c *Controller) removeWorkItem(segment *topology.Segment, sc *storagev1.Sto
506
505
507
506
// Any capacity object in the queue will be deleted.
508
507
klog .V (5 ).Infof ("Capacity Controller: enqueuing CSIStorageCapacity %s for removal" , capacity .Name )
509
- c .queue .Add (capacity )
508
+ c .queue .Add (QueueKey { capacity : capacity } )
510
509
}
511
510
512
511
// pollCapacities must be called periodically to detect when the underlying storage capacity has changed.
@@ -516,7 +515,7 @@ func (c *Controller) pollCapacities() {
516
515
517
516
for item := range c .capacities {
518
517
klog .V (5 ).Infof ("Capacity Controller: enqueuing %+v for periodic update" , item )
519
- c .queue .Add (item )
518
+ c .queue .Add (QueueKey { item : & item } )
520
519
}
521
520
}
522
521
@@ -535,11 +534,11 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
535
534
err := func () error {
536
535
defer c .queue .Done (obj )
537
536
538
- switch obj := obj .( type ) {
539
- case workItem :
540
- return c .syncCapacity (ctx , obj )
541
- case * storagev1. CSIStorageCapacity :
542
- return c .deleteCapacity (ctx , obj )
537
+ switch {
538
+ case obj . item != nil :
539
+ return c .syncCapacity (ctx , * obj . item )
540
+ case obj . capacity != nil :
541
+ return c .deleteCapacity (ctx , obj . capacity )
543
542
default :
544
543
klog .Warningf ("unexpected work item %#v" , obj )
545
544
}
@@ -559,7 +558,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
559
558
}
560
559
561
560
// syncCapacity gets the capacity and then updates or creates the object.
562
- func (c * Controller ) syncCapacity (ctx context.Context , item workItem ) error {
561
+ func (c * Controller ) syncCapacity (ctx context.Context , item WorkItem ) error {
563
562
// We lock only while accessing c.capacities, but not during
564
563
// the potentially long-running operations. That is okay
565
564
// because there is only a single worker per item. In the
@@ -682,7 +681,7 @@ func (c *Controller) deleteCapacity(ctx context.Context, capacity *storagev1.CSI
682
681
// and either remembers the pointer to it for future updates or
683
682
// ensures that it gets deleted if no longer needed. Foreign objects
684
683
// are ignored.
685
- func (c * Controller ) onCAddOrUpdate (ctx context.Context , capacity * storagev1.CSIStorageCapacity ) {
684
+ func (c * Controller ) onCAddOrUpdate (_ context.Context , capacity * storagev1.CSIStorageCapacity ) {
686
685
if ! c .isManaged (capacity ) {
687
686
// Not ours (anymore?). For the unlikely case that someone removed our owner reference,
688
687
// we also must remove our reference to the object.
@@ -692,7 +691,7 @@ func (c *Controller) onCAddOrUpdate(ctx context.Context, capacity *storagev1.CSI
692
691
if capacity2 != nil && capacity2 .UID == capacity .UID {
693
692
c .capacities [item ] = nil
694
693
klog .V (5 ).Infof ("Capacity Controller: CSIStorageCapacity %s owner was modified by someone, enqueueing %v for re-creation" , capacity .Name , item )
695
- c .queue .Add (item )
694
+ c .queue .Add (QueueKey { item : & item } )
696
695
}
697
696
}
698
697
return
@@ -720,10 +719,10 @@ func (c *Controller) onCAddOrUpdate(ctx context.Context, capacity *storagev1.CSI
720
719
}
721
720
// The CSIStorageCapacity object is obsolete, delete it.
722
721
klog .V (5 ).Infof ("Capacity Controller: CSIStorageCapacity %s with resource version %s is obsolete, enqueue for removal" , capacity .Name , capacity .ResourceVersion )
723
- c .queue .Add (capacity )
722
+ c .queue .Add (QueueKey { capacity : capacity } )
724
723
}
725
724
726
- func (c * Controller ) onCDelete (ctx context.Context , capacity * storagev1.CSIStorageCapacity ) {
725
+ func (c * Controller ) onCDelete (_ context.Context , capacity * storagev1.CSIStorageCapacity ) {
727
726
c .capacitiesLock .Lock ()
728
727
defer c .capacitiesLock .Unlock ()
729
728
for item , capacity2 := range c .capacities {
@@ -732,7 +731,7 @@ func (c *Controller) onCDelete(ctx context.Context, capacity *storagev1.CSIStora
732
731
// Re-create it...
733
732
klog .V (5 ).Infof ("Capacity Controller: CSIStorageCapacity %s was removed by someone, enqueue %v for re-creation" , capacity .Name , item )
734
733
c .capacities [item ] = nil
735
- c .queue .Add (item )
734
+ c .queue .Add (QueueKey { item : & item } )
736
735
return
737
736
}
738
737
}
@@ -806,7 +805,7 @@ func (c *Controller) getObjectsObsolete() int64 {
806
805
}
807
806
808
807
func (c * Controller ) isObsolete (capacity * storagev1.CSIStorageCapacity ) bool {
809
- for item , _ := range c .capacities {
808
+ for item := range c .capacities {
810
809
if item .equals (capacity ) {
811
810
return false
812
811
}
0 commit comments