@@ -50,10 +50,10 @@ type csiSnapshotCommonController struct {
50
50
clientset clientset.Interface
51
51
client kubernetes.Interface
52
52
eventRecorder record.EventRecorder
53
- snapshotQueue workqueue.RateLimitingInterface
54
- contentQueue workqueue.RateLimitingInterface
55
- groupSnapshotQueue workqueue.RateLimitingInterface
56
- groupSnapshotContentQueue workqueue.RateLimitingInterface
53
+ snapshotQueue workqueue.TypedRateLimitingInterface [ string ]
54
+ contentQueue workqueue.TypedRateLimitingInterface [ string ]
55
+ groupSnapshotQueue workqueue.TypedRateLimitingInterface [ string ]
56
+ groupSnapshotContentQueue workqueue.TypedRateLimitingInterface [ string ]
57
57
58
58
snapshotLister snapshotlisters.VolumeSnapshotLister
59
59
snapshotListerSynced cache.InformerSynced
@@ -106,10 +106,10 @@ func NewCSISnapshotCommonController(
106
106
nodeInformer coreinformers.NodeInformer ,
107
107
metricsManager metrics.MetricsManager ,
108
108
resyncPeriod time.Duration ,
109
- snapshotRateLimiter workqueue.RateLimiter ,
110
- contentRateLimiter workqueue.RateLimiter ,
111
- groupSnapshotRateLimiter workqueue.RateLimiter ,
112
- groupSnapshotContentRateLimiter workqueue.RateLimiter ,
109
+ snapshotRateLimiter workqueue.TypedRateLimiter [ string ] ,
110
+ contentRateLimiter workqueue.TypedRateLimiter [ string ] ,
111
+ groupSnapshotRateLimiter workqueue.TypedRateLimiter [ string ] ,
112
+ groupSnapshotContentRateLimiter workqueue.TypedRateLimiter [ string ] ,
113
113
enableDistributedSnapshotting bool ,
114
114
preventVolumeModeConversion bool ,
115
115
enableVolumeGroupSnapshots bool ,
@@ -121,14 +121,18 @@ func NewCSISnapshotCommonController(
121
121
eventRecorder = broadcaster .NewRecorder (scheme .Scheme , v1.EventSource {Component : fmt .Sprintf ("snapshot-controller" )})
122
122
123
123
ctrl := & csiSnapshotCommonController {
124
- clientset : clientset ,
125
- client : client ,
126
- eventRecorder : eventRecorder ,
127
- resyncPeriod : resyncPeriod ,
128
- snapshotStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
129
- contentStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
130
- snapshotQueue : workqueue .NewNamedRateLimitingQueue (snapshotRateLimiter , "snapshot-controller-snapshot" ),
131
- contentQueue : workqueue .NewNamedRateLimitingQueue (contentRateLimiter , "snapshot-controller-content" ),
124
+ clientset : clientset ,
125
+ client : client ,
126
+ eventRecorder : eventRecorder ,
127
+ resyncPeriod : resyncPeriod ,
128
+ snapshotStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
129
+ contentStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
130
+ snapshotQueue : workqueue .NewTypedRateLimitingQueueWithConfig (snapshotRateLimiter ,
131
+ workqueue.TypedRateLimitingQueueConfig [string ]{
132
+ Name : "snapshot-controller-snapshot" }),
133
+ contentQueue : workqueue .NewTypedRateLimitingQueueWithConfig (contentRateLimiter ,
134
+ workqueue.TypedRateLimitingQueueConfig [string ]{
135
+ Name : "snapshot-controller-content" }),
132
136
metricsManager : metricsManager ,
133
137
}
134
138
@@ -203,8 +207,12 @@ func NewCSISnapshotCommonController(
203
207
ctrl .groupSnapshotStore = cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc )
204
208
ctrl .groupSnapshotContentStore = cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc )
205
209
206
- ctrl .groupSnapshotQueue = workqueue .NewNamedRateLimitingQueue (groupSnapshotRateLimiter , "snapshot-controller-group-snapshot" )
207
- ctrl .groupSnapshotContentQueue = workqueue .NewNamedRateLimitingQueue (groupSnapshotContentRateLimiter , "snapshot-controller-group-content" )
210
+ ctrl .groupSnapshotQueue = workqueue .NewTypedRateLimitingQueueWithConfig (
211
+ groupSnapshotRateLimiter , workqueue.TypedRateLimitingQueueConfig [string ]{
212
+ Name : "snapshot-controller-group-snapshot" })
213
+ ctrl .groupSnapshotContentQueue = workqueue .NewTypedRateLimitingQueueWithConfig (
214
+ groupSnapshotContentRateLimiter , workqueue.TypedRateLimitingQueueConfig [string ]{
215
+ Name : "snapshot-controller-group-content" })
208
216
209
217
volumeGroupSnapshotInformer .Informer ().AddEventHandlerWithResyncPeriod (
210
218
cache.ResourceEventHandlerFuncs {
@@ -316,21 +324,21 @@ func (ctrl *csiSnapshotCommonController) enqueueContentWork(obj interface{}) {
316
324
317
325
// snapshotWorker is the main worker for VolumeSnapshots.
318
326
func (ctrl * csiSnapshotCommonController ) snapshotWorker () {
319
- keyObj , quit := ctrl .snapshotQueue .Get ()
327
+ key , quit := ctrl .snapshotQueue .Get ()
320
328
if quit {
321
329
return
322
330
}
323
- defer ctrl .snapshotQueue .Done (keyObj )
331
+ defer ctrl .snapshotQueue .Done (key )
324
332
325
- if err := ctrl .syncSnapshotByKey (keyObj .( string ) ); err != nil {
333
+ if err := ctrl .syncSnapshotByKey (key ); err != nil {
326
334
// Rather than wait for a full resync, re-add the key to the
327
335
// queue to be processed.
328
- ctrl .snapshotQueue .AddRateLimited (keyObj )
329
- klog .V (4 ).Infof ("Failed to sync snapshot %q, will retry again: %v" , keyObj .( string ) , err )
336
+ ctrl .snapshotQueue .AddRateLimited (key )
337
+ klog .V (4 ).Infof ("Failed to sync snapshot %q, will retry again: %v" , key , err )
330
338
} else {
331
339
// Finally, if no error occurs we Forget this item so it does not
332
340
// get queued again until another change happens.
333
- ctrl .snapshotQueue .Forget (keyObj )
341
+ ctrl .snapshotQueue .Forget (key )
334
342
}
335
343
}
336
344
@@ -392,21 +400,21 @@ func (ctrl *csiSnapshotCommonController) syncSnapshotByKey(key string) error {
392
400
393
401
// contentWorker is the main worker for VolumeSnapshotContent.
394
402
func (ctrl * csiSnapshotCommonController ) contentWorker () {
395
- keyObj , quit := ctrl .contentQueue .Get ()
403
+ key , quit := ctrl .contentQueue .Get ()
396
404
if quit {
397
405
return
398
406
}
399
- defer ctrl .contentQueue .Done (keyObj )
407
+ defer ctrl .contentQueue .Done (key )
400
408
401
- if err := ctrl .syncContentByKey (keyObj .( string ) ); err != nil {
409
+ if err := ctrl .syncContentByKey (key ); err != nil {
402
410
// Rather than wait for a full resync, re-add the key to the
403
411
// queue to be processed.
404
- ctrl .contentQueue .AddRateLimited (keyObj )
405
- klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , keyObj .( string ) , err )
412
+ ctrl .contentQueue .AddRateLimited (key )
413
+ klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , key , err )
406
414
} else {
407
415
// Finally, if no error occurs we Forget this item so it does not
408
416
// get queued again until another change happens.
409
- ctrl .contentQueue .Forget (keyObj )
417
+ ctrl .contentQueue .Forget (key )
410
418
}
411
419
}
412
420
@@ -680,41 +688,41 @@ func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj int
680
688
681
689
// groupSnapshotWorker is the main worker for VolumeGroupSnapshots.
682
690
func (ctrl * csiSnapshotCommonController ) groupSnapshotWorker () {
683
- keyObj , quit := ctrl .groupSnapshotQueue .Get ()
691
+ key , quit := ctrl .groupSnapshotQueue .Get ()
684
692
if quit {
685
693
return
686
694
}
687
- defer ctrl .groupSnapshotQueue .Done (keyObj )
695
+ defer ctrl .groupSnapshotQueue .Done (key )
688
696
689
- if err := ctrl .syncGroupSnapshotByKey (context .Background (), keyObj .( string ) ); err != nil {
697
+ if err := ctrl .syncGroupSnapshotByKey (context .Background (), key ); err != nil {
690
698
// Rather than wait for a full resync, re-add the key to the
691
699
// queue to be processed.
692
- ctrl .groupSnapshotQueue .AddRateLimited (keyObj )
693
- klog .V (4 ).Infof ("Failed to sync group snapshot %q, will retry again: %v" , keyObj .( string ) , err )
700
+ ctrl .groupSnapshotQueue .AddRateLimited (key )
701
+ klog .V (4 ).Infof ("Failed to sync group snapshot %q, will retry again: %v" , key , err )
694
702
} else {
695
703
// Finally, if no error occurs we forget this item so it does not
696
704
// get queued again until another change happens.
697
- ctrl .groupSnapshotQueue .Forget (keyObj )
705
+ ctrl .groupSnapshotQueue .Forget (key )
698
706
}
699
707
}
700
708
701
709
// groupSnapshotContentWorker is the main worker for VolumeGroupSnapshotContent.
702
710
func (ctrl * csiSnapshotCommonController ) groupSnapshotContentWorker () {
703
- keyObj , quit := ctrl .groupSnapshotContentQueue .Get ()
711
+ key , quit := ctrl .groupSnapshotContentQueue .Get ()
704
712
if quit {
705
713
return
706
714
}
707
- defer ctrl .groupSnapshotContentQueue .Done (keyObj )
715
+ defer ctrl .groupSnapshotContentQueue .Done (key )
708
716
709
- if err := ctrl .syncGroupSnapshotContentByKey (keyObj .( string ) ); err != nil {
717
+ if err := ctrl .syncGroupSnapshotContentByKey (key ); err != nil {
710
718
// Rather than wait for a full resync, re-add the key to the
711
719
// queue to be processed.
712
- ctrl .groupSnapshotContentQueue .AddRateLimited (keyObj )
713
- klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , keyObj .( string ) , err )
720
+ ctrl .groupSnapshotContentQueue .AddRateLimited (key )
721
+ klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , key , err )
714
722
} else {
715
723
// Finally, if no error occurs we Forget this item so it does not
716
724
// get queued again until another change happens.
717
- ctrl .groupSnapshotContentQueue .Forget (keyObj )
725
+ ctrl .groupSnapshotContentQueue .Forget (key )
718
726
}
719
727
}
720
728
0 commit comments