@@ -50,10 +50,10 @@ type csiSnapshotCommonController struct {
5050 clientset clientset.Interface
5151 client kubernetes.Interface
5252 eventRecorder record.EventRecorder
53- snapshotQueue workqueue.RateLimitingInterface
54- contentQueue workqueue.RateLimitingInterface
55- groupSnapshotQueue workqueue.RateLimitingInterface
56- groupSnapshotContentQueue workqueue.RateLimitingInterface
53+ snapshotQueue workqueue.TypedRateLimitingInterface [ string ]
54+ contentQueue workqueue.TypedRateLimitingInterface [ string ]
55+ groupSnapshotQueue workqueue.TypedRateLimitingInterface [ string ]
56+ groupSnapshotContentQueue workqueue.TypedRateLimitingInterface [ string ]
5757
5858 snapshotLister snapshotlisters.VolumeSnapshotLister
5959 snapshotListerSynced cache.InformerSynced
@@ -106,10 +106,10 @@ func NewCSISnapshotCommonController(
106106 nodeInformer coreinformers.NodeInformer ,
107107 metricsManager metrics.MetricsManager ,
108108 resyncPeriod time.Duration ,
109- snapshotRateLimiter workqueue.RateLimiter ,
110- contentRateLimiter workqueue.RateLimiter ,
111- groupSnapshotRateLimiter workqueue.RateLimiter ,
112- groupSnapshotContentRateLimiter workqueue.RateLimiter ,
109+ snapshotRateLimiter workqueue.TypedRateLimiter [ string ] ,
110+ contentRateLimiter workqueue.TypedRateLimiter [ string ] ,
111+ groupSnapshotRateLimiter workqueue.TypedRateLimiter [ string ] ,
112+ groupSnapshotContentRateLimiter workqueue.TypedRateLimiter [ string ] ,
113113 enableDistributedSnapshotting bool ,
114114 preventVolumeModeConversion bool ,
115115 enableVolumeGroupSnapshots bool ,
@@ -121,14 +121,18 @@ func NewCSISnapshotCommonController(
121121 eventRecorder = broadcaster .NewRecorder (scheme .Scheme , v1.EventSource {Component : fmt .Sprintf ("snapshot-controller" )})
122122
123123 ctrl := & csiSnapshotCommonController {
124- clientset : clientset ,
125- client : client ,
126- eventRecorder : eventRecorder ,
127- resyncPeriod : resyncPeriod ,
128- snapshotStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
129- contentStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
130- snapshotQueue : workqueue .NewNamedRateLimitingQueue (snapshotRateLimiter , "snapshot-controller-snapshot" ),
131- contentQueue : workqueue .NewNamedRateLimitingQueue (contentRateLimiter , "snapshot-controller-content" ),
124+ clientset : clientset ,
125+ client : client ,
126+ eventRecorder : eventRecorder ,
127+ resyncPeriod : resyncPeriod ,
128+ snapshotStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
129+ contentStore : cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc ),
130+ snapshotQueue : workqueue .NewTypedRateLimitingQueueWithConfig (snapshotRateLimiter ,
131+ workqueue.TypedRateLimitingQueueConfig [string ]{
132+ Name : "snapshot-controller-snapshot" }),
133+ contentQueue : workqueue .NewTypedRateLimitingQueueWithConfig (contentRateLimiter ,
134+ workqueue.TypedRateLimitingQueueConfig [string ]{
135+ Name : "snapshot-controller-content" }),
132136 metricsManager : metricsManager ,
133137 }
134138
@@ -203,8 +207,12 @@ func NewCSISnapshotCommonController(
203207 ctrl .groupSnapshotStore = cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc )
204208 ctrl .groupSnapshotContentStore = cache .NewStore (cache .DeletionHandlingMetaNamespaceKeyFunc )
205209
206- ctrl .groupSnapshotQueue = workqueue .NewNamedRateLimitingQueue (groupSnapshotRateLimiter , "snapshot-controller-group-snapshot" )
207- ctrl .groupSnapshotContentQueue = workqueue .NewNamedRateLimitingQueue (groupSnapshotContentRateLimiter , "snapshot-controller-group-content" )
210+ ctrl .groupSnapshotQueue = workqueue .NewTypedRateLimitingQueueWithConfig (
211+ groupSnapshotRateLimiter , workqueue.TypedRateLimitingQueueConfig [string ]{
212+ Name : "snapshot-controller-group-snapshot" })
213+ ctrl .groupSnapshotContentQueue = workqueue .NewTypedRateLimitingQueueWithConfig (
214+ groupSnapshotContentRateLimiter , workqueue.TypedRateLimitingQueueConfig [string ]{
215+ Name : "snapshot-controller-group-content" })
208216
209217 volumeGroupSnapshotInformer .Informer ().AddEventHandlerWithResyncPeriod (
210218 cache.ResourceEventHandlerFuncs {
@@ -316,21 +324,21 @@ func (ctrl *csiSnapshotCommonController) enqueueContentWork(obj interface{}) {
316324
317325// snapshotWorker is the main worker for VolumeSnapshots.
318326func (ctrl * csiSnapshotCommonController ) snapshotWorker () {
319- keyObj , quit := ctrl .snapshotQueue .Get ()
327+ key , quit := ctrl .snapshotQueue .Get ()
320328 if quit {
321329 return
322330 }
323- defer ctrl .snapshotQueue .Done (keyObj )
331+ defer ctrl .snapshotQueue .Done (key )
324332
325- if err := ctrl .syncSnapshotByKey (keyObj .( string ) ); err != nil {
333+ if err := ctrl .syncSnapshotByKey (key ); err != nil {
326334 // Rather than wait for a full resync, re-add the key to the
327335 // queue to be processed.
328- ctrl .snapshotQueue .AddRateLimited (keyObj )
329- klog .V (4 ).Infof ("Failed to sync snapshot %q, will retry again: %v" , keyObj .( string ) , err )
336+ ctrl .snapshotQueue .AddRateLimited (key )
337+ klog .V (4 ).Infof ("Failed to sync snapshot %q, will retry again: %v" , key , err )
330338 } else {
331339 // Finally, if no error occurs we Forget this item so it does not
332340 // get queued again until another change happens.
333- ctrl .snapshotQueue .Forget (keyObj )
341+ ctrl .snapshotQueue .Forget (key )
334342 }
335343}
336344
@@ -392,21 +400,21 @@ func (ctrl *csiSnapshotCommonController) syncSnapshotByKey(key string) error {
392400
393401// contentWorker is the main worker for VolumeSnapshotContent.
394402func (ctrl * csiSnapshotCommonController ) contentWorker () {
395- keyObj , quit := ctrl .contentQueue .Get ()
403+ key , quit := ctrl .contentQueue .Get ()
396404 if quit {
397405 return
398406 }
399- defer ctrl .contentQueue .Done (keyObj )
407+ defer ctrl .contentQueue .Done (key )
400408
401- if err := ctrl .syncContentByKey (keyObj .( string ) ); err != nil {
409+ if err := ctrl .syncContentByKey (key ); err != nil {
402410 // Rather than wait for a full resync, re-add the key to the
403411 // queue to be processed.
404- ctrl .contentQueue .AddRateLimited (keyObj )
405- klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , keyObj .( string ) , err )
412+ ctrl .contentQueue .AddRateLimited (key )
413+ klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , key , err )
406414 } else {
407415 // Finally, if no error occurs we Forget this item so it does not
408416 // get queued again until another change happens.
409- ctrl .contentQueue .Forget (keyObj )
417+ ctrl .contentQueue .Forget (key )
410418 }
411419}
412420
@@ -680,41 +688,41 @@ func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj int
680688
681689// groupSnapshotWorker is the main worker for VolumeGroupSnapshots.
682690func (ctrl * csiSnapshotCommonController ) groupSnapshotWorker () {
683- keyObj , quit := ctrl .groupSnapshotQueue .Get ()
691+ key , quit := ctrl .groupSnapshotQueue .Get ()
684692 if quit {
685693 return
686694 }
687- defer ctrl .groupSnapshotQueue .Done (keyObj )
695+ defer ctrl .groupSnapshotQueue .Done (key )
688696
689- if err := ctrl .syncGroupSnapshotByKey (context .Background (), keyObj .( string ) ); err != nil {
697+ if err := ctrl .syncGroupSnapshotByKey (context .Background (), key ); err != nil {
690698 // Rather than wait for a full resync, re-add the key to the
691699 // queue to be processed.
692- ctrl .groupSnapshotQueue .AddRateLimited (keyObj )
693- klog .V (4 ).Infof ("Failed to sync group snapshot %q, will retry again: %v" , keyObj .( string ) , err )
700+ ctrl .groupSnapshotQueue .AddRateLimited (key )
701+ klog .V (4 ).Infof ("Failed to sync group snapshot %q, will retry again: %v" , key , err )
694702 } else {
695703 // Finally, if no error occurs we forget this item so it does not
696704 // get queued again until another change happens.
697- ctrl .groupSnapshotQueue .Forget (keyObj )
705+ ctrl .groupSnapshotQueue .Forget (key )
698706 }
699707}
700708
701709// groupSnapshotContentWorker is the main worker for VolumeGroupSnapshotContent.
702710func (ctrl * csiSnapshotCommonController ) groupSnapshotContentWorker () {
703- keyObj , quit := ctrl .groupSnapshotContentQueue .Get ()
711+ key , quit := ctrl .groupSnapshotContentQueue .Get ()
704712 if quit {
705713 return
706714 }
707- defer ctrl .groupSnapshotContentQueue .Done (keyObj )
715+ defer ctrl .groupSnapshotContentQueue .Done (key )
708716
709- if err := ctrl .syncGroupSnapshotContentByKey (keyObj .( string ) ); err != nil {
717+ if err := ctrl .syncGroupSnapshotContentByKey (key ); err != nil {
710718 // Rather than wait for a full resync, re-add the key to the
711719 // queue to be processed.
712- ctrl .groupSnapshotContentQueue .AddRateLimited (keyObj )
713- klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , keyObj .( string ) , err )
720+ ctrl .groupSnapshotContentQueue .AddRateLimited (key )
721+ klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , key , err )
714722 } else {
715723 // Finally, if no error occurs we Forget this item so it does not
716724 // get queued again until another change happens.
717- ctrl .groupSnapshotContentQueue .Forget (keyObj )
725+ ctrl .groupSnapshotContentQueue .Forget (key )
718726 }
719727}
720728
0 commit comments