Skip to content

Commit 3cc3131

Browse files
authored
Merge pull request #1229 from Madhu-1/replace-RateLimitingInterface
replace deprecated Queue and RateLimiter
2 parents a0cef53 + 7b6db9d commit 3cc3131

File tree

7 files changed

+91
-79
lines changed

7 files changed

+91
-79
lines changed

cmd/csi-snapshotter/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,11 @@ func main() {
274274
*groupSnapshotNamePrefix,
275275
*groupSnapshotNameUUIDLength,
276276
*extraCreateMetadata,
277-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
277+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
278278
utilfeature.DefaultFeatureGate.Enabled(features.VolumeGroupSnapshot),
279279
snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(),
280280
snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(),
281-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
281+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
282282
)
283283

284284
run := func(context.Context) {

cmd/snapshot-controller/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,10 @@ func main() {
233233
nodeInformer,
234234
metricsManager,
235235
*resyncPeriod,
236-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
237-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
238-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
239-
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
236+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
237+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
238+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
239+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
240240
*enableDistributedSnapshotting,
241241
*preventVolumeModeConversion,
242242
utilfeature.DefaultFeatureGate.Enabled(features.VolumeGroupSnapshot),

pkg/common-controller/framework_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,10 +1205,10 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
12051205
nil,
12061206
metricsManager,
12071207
60*time.Second,
1208-
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
1209-
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
1210-
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
1211-
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
1208+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
1209+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
1210+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
1211+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
12121212
false,
12131213
false,
12141214
true,

pkg/common-controller/snapshot_controller_base.go

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ type csiSnapshotCommonController struct {
5050
clientset clientset.Interface
5151
client kubernetes.Interface
5252
eventRecorder record.EventRecorder
53-
snapshotQueue workqueue.RateLimitingInterface
54-
contentQueue workqueue.RateLimitingInterface
55-
groupSnapshotQueue workqueue.RateLimitingInterface
56-
groupSnapshotContentQueue workqueue.RateLimitingInterface
53+
snapshotQueue workqueue.TypedRateLimitingInterface[string]
54+
contentQueue workqueue.TypedRateLimitingInterface[string]
55+
groupSnapshotQueue workqueue.TypedRateLimitingInterface[string]
56+
groupSnapshotContentQueue workqueue.TypedRateLimitingInterface[string]
5757

5858
snapshotLister snapshotlisters.VolumeSnapshotLister
5959
snapshotListerSynced cache.InformerSynced
@@ -106,10 +106,10 @@ func NewCSISnapshotCommonController(
106106
nodeInformer coreinformers.NodeInformer,
107107
metricsManager metrics.MetricsManager,
108108
resyncPeriod time.Duration,
109-
snapshotRateLimiter workqueue.RateLimiter,
110-
contentRateLimiter workqueue.RateLimiter,
111-
groupSnapshotRateLimiter workqueue.RateLimiter,
112-
groupSnapshotContentRateLimiter workqueue.RateLimiter,
109+
snapshotRateLimiter workqueue.TypedRateLimiter[string],
110+
contentRateLimiter workqueue.TypedRateLimiter[string],
111+
groupSnapshotRateLimiter workqueue.TypedRateLimiter[string],
112+
groupSnapshotContentRateLimiter workqueue.TypedRateLimiter[string],
113113
enableDistributedSnapshotting bool,
114114
preventVolumeModeConversion bool,
115115
enableVolumeGroupSnapshots bool,
@@ -121,14 +121,18 @@ func NewCSISnapshotCommonController(
121121
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("snapshot-controller")})
122122

123123
ctrl := &csiSnapshotCommonController{
124-
clientset: clientset,
125-
client: client,
126-
eventRecorder: eventRecorder,
127-
resyncPeriod: resyncPeriod,
128-
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
129-
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
130-
snapshotQueue: workqueue.NewNamedRateLimitingQueue(snapshotRateLimiter, "snapshot-controller-snapshot"),
131-
contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "snapshot-controller-content"),
124+
clientset: clientset,
125+
client: client,
126+
eventRecorder: eventRecorder,
127+
resyncPeriod: resyncPeriod,
128+
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
129+
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
130+
snapshotQueue: workqueue.NewTypedRateLimitingQueueWithConfig(snapshotRateLimiter,
131+
workqueue.TypedRateLimitingQueueConfig[string]{
132+
Name: "snapshot-controller-snapshot"}),
133+
contentQueue: workqueue.NewTypedRateLimitingQueueWithConfig(contentRateLimiter,
134+
workqueue.TypedRateLimitingQueueConfig[string]{
135+
Name: "snapshot-controller-content"}),
132136
metricsManager: metricsManager,
133137
}
134138

@@ -203,8 +207,12 @@ func NewCSISnapshotCommonController(
203207
ctrl.groupSnapshotStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
204208
ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
205209

206-
ctrl.groupSnapshotQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotRateLimiter, "snapshot-controller-group-snapshot")
207-
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "snapshot-controller-group-content")
210+
ctrl.groupSnapshotQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
211+
groupSnapshotRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
212+
Name: "snapshot-controller-group-snapshot"})
213+
ctrl.groupSnapshotContentQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
214+
groupSnapshotContentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
215+
Name: "snapshot-controller-group-content"})
208216

209217
volumeGroupSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
210218
cache.ResourceEventHandlerFuncs{
@@ -316,21 +324,21 @@ func (ctrl *csiSnapshotCommonController) enqueueContentWork(obj interface{}) {
316324

317325
// snapshotWorker is the main worker for VolumeSnapshots.
318326
func (ctrl *csiSnapshotCommonController) snapshotWorker() {
319-
keyObj, quit := ctrl.snapshotQueue.Get()
327+
key, quit := ctrl.snapshotQueue.Get()
320328
if quit {
321329
return
322330
}
323-
defer ctrl.snapshotQueue.Done(keyObj)
331+
defer ctrl.snapshotQueue.Done(key)
324332

325-
if err := ctrl.syncSnapshotByKey(keyObj.(string)); err != nil {
333+
if err := ctrl.syncSnapshotByKey(key); err != nil {
326334
// Rather than wait for a full resync, re-add the key to the
327335
// queue to be processed.
328-
ctrl.snapshotQueue.AddRateLimited(keyObj)
329-
klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", keyObj.(string), err)
336+
ctrl.snapshotQueue.AddRateLimited(key)
337+
klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", key, err)
330338
} else {
331339
// Finally, if no error occurs we Forget this item so it does not
332340
// get queued again until another change happens.
333-
ctrl.snapshotQueue.Forget(keyObj)
341+
ctrl.snapshotQueue.Forget(key)
334342
}
335343
}
336344

@@ -392,21 +400,21 @@ func (ctrl *csiSnapshotCommonController) syncSnapshotByKey(key string) error {
392400

393401
// contentWorker is the main worker for VolumeSnapshotContent.
394402
func (ctrl *csiSnapshotCommonController) contentWorker() {
395-
keyObj, quit := ctrl.contentQueue.Get()
403+
key, quit := ctrl.contentQueue.Get()
396404
if quit {
397405
return
398406
}
399-
defer ctrl.contentQueue.Done(keyObj)
407+
defer ctrl.contentQueue.Done(key)
400408

401-
if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
409+
if err := ctrl.syncContentByKey(key); err != nil {
402410
// Rather than wait for a full resync, re-add the key to the
403411
// queue to be processed.
404-
ctrl.contentQueue.AddRateLimited(keyObj)
405-
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
412+
ctrl.contentQueue.AddRateLimited(key)
413+
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err)
406414
} else {
407415
// Finally, if no error occurs we Forget this item so it does not
408416
// get queued again until another change happens.
409-
ctrl.contentQueue.Forget(keyObj)
417+
ctrl.contentQueue.Forget(key)
410418
}
411419
}
412420

@@ -680,41 +688,41 @@ func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj int
680688

681689
// groupSnapshotWorker is the main worker for VolumeGroupSnapshots.
682690
func (ctrl *csiSnapshotCommonController) groupSnapshotWorker() {
683-
keyObj, quit := ctrl.groupSnapshotQueue.Get()
691+
key, quit := ctrl.groupSnapshotQueue.Get()
684692
if quit {
685693
return
686694
}
687-
defer ctrl.groupSnapshotQueue.Done(keyObj)
695+
defer ctrl.groupSnapshotQueue.Done(key)
688696

689-
if err := ctrl.syncGroupSnapshotByKey(context.Background(), keyObj.(string)); err != nil {
697+
if err := ctrl.syncGroupSnapshotByKey(context.Background(), key); err != nil {
690698
// Rather than wait for a full resync, re-add the key to the
691699
// queue to be processed.
692-
ctrl.groupSnapshotQueue.AddRateLimited(keyObj)
693-
klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", keyObj.(string), err)
700+
ctrl.groupSnapshotQueue.AddRateLimited(key)
701+
klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", key, err)
694702
} else {
695703
// Finally, if no error occurs we forget this item so it does not
696704
// get queued again until another change happens.
697-
ctrl.groupSnapshotQueue.Forget(keyObj)
705+
ctrl.groupSnapshotQueue.Forget(key)
698706
}
699707
}
700708

701709
// groupSnapshotContentWorker is the main worker for VolumeGroupSnapshotContent.
702710
func (ctrl *csiSnapshotCommonController) groupSnapshotContentWorker() {
703-
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
711+
key, quit := ctrl.groupSnapshotContentQueue.Get()
704712
if quit {
705713
return
706714
}
707-
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
715+
defer ctrl.groupSnapshotContentQueue.Done(key)
708716

709-
if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
717+
if err := ctrl.syncGroupSnapshotContentByKey(key); err != nil {
710718
// Rather than wait for a full resync, re-add the key to the
711719
// queue to be processed.
712-
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
713-
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
720+
ctrl.groupSnapshotContentQueue.AddRateLimited(key)
721+
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err)
714722
} else {
715723
// Finally, if no error occurs we Forget this item so it does not
716724
// get queued again until another change happens.
717-
ctrl.groupSnapshotContentQueue.Forget(keyObj)
725+
ctrl.groupSnapshotContentQueue.Forget(key)
718726
}
719727
}
720728

pkg/sidecar-controller/framework_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,11 +575,11 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
575575
"groupsnapshot",
576576
-1,
577577
true,
578-
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
578+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
579579
false,
580580
informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(),
581581
informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(),
582-
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
582+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute),
583583
)
584584

585585
ctrl.eventRecorder = record.NewFakeRecorder(1000)

pkg/sidecar-controller/groupsnapshot_helper.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,23 @@ func (ctrl *csiSnapshotSideCarController) enqueueGroupSnapshotContentWork(obj in
6666
// groupSnapshotContentWorker processes items from groupSnapshotContentQueue.
6767
// It must run only once, syncGroupSnapshotContent is not assured to be reentrant.
6868
func (ctrl *csiSnapshotSideCarController) groupSnapshotContentWorker() {
69-
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
69+
key, quit := ctrl.groupSnapshotContentQueue.Get()
7070
if quit {
7171
return
7272
}
73-
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
73+
defer ctrl.groupSnapshotContentQueue.Done(key)
7474

75-
if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
75+
if err := ctrl.syncGroupSnapshotContentByKey(key); err != nil {
7676
// Rather than wait for a full resync, re-add the key to the
7777
// queue to be processed.
78-
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
79-
klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", keyObj.(string), err)
78+
ctrl.groupSnapshotContentQueue.AddRateLimited(key)
79+
klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", key, err)
8080
return
8181
}
8282

8383
// Finally, if no error occurs we forget this item so it does not
8484
// get queued again until another change happens.
85-
ctrl.groupSnapshotContentQueue.Forget(keyObj)
85+
ctrl.groupSnapshotContentQueue.Forget(key)
8686
return
8787
}
8888

pkg/sidecar-controller/snapshot_controller_base.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type csiSnapshotSideCarController struct {
5050
client kubernetes.Interface
5151
driverName string
5252
eventRecorder record.EventRecorder
53-
contentQueue workqueue.RateLimitingInterface
53+
contentQueue workqueue.TypedRateLimitingInterface[string]
5454
extraCreateMetadata bool
5555

5656
contentLister snapshotlisters.VolumeSnapshotContentLister
@@ -65,7 +65,7 @@ type csiSnapshotSideCarController struct {
6565
resyncPeriod time.Duration
6666

6767
enableVolumeGroupSnapshots bool
68-
groupSnapshotContentQueue workqueue.RateLimitingInterface
68+
groupSnapshotContentQueue workqueue.TypedRateLimitingInterface[string]
6969
groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister
7070
groupSnapshotContentListerSynced cache.InformerSynced
7171
groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister
@@ -89,11 +89,11 @@ func NewCSISnapshotSideCarController(
8989
groupSnapshotNamePrefix string,
9090
groupSnapshotNameUUIDLength int,
9191
extraCreateMetadata bool,
92-
contentRateLimiter workqueue.RateLimiter,
92+
contentRateLimiter workqueue.TypedRateLimiter[string],
9393
enableVolumeGroupSnapshots bool,
9494
volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer,
9595
volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer,
96-
groupSnapshotContentRateLimiter workqueue.RateLimiter,
96+
groupSnapshotContentRateLimiter workqueue.TypedRateLimiter[string],
9797
) *csiSnapshotSideCarController {
9898
broadcaster := record.NewBroadcaster()
9999
broadcaster.StartLogging(klog.Infof)
@@ -102,14 +102,16 @@ func NewCSISnapshotSideCarController(
102102
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)})
103103

104104
ctrl := &csiSnapshotSideCarController{
105-
clientset: clientset,
106-
client: client,
107-
driverName: driverName,
108-
eventRecorder: eventRecorder,
109-
handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength),
110-
resyncPeriod: resyncPeriod,
111-
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
112-
contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-snapshotter-content"),
105+
clientset: clientset,
106+
client: client,
107+
driverName: driverName,
108+
eventRecorder: eventRecorder,
109+
handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength),
110+
resyncPeriod: resyncPeriod,
111+
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
112+
contentQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
113+
contentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
114+
Name: "csi-snapshotter-content"}),
113115
extraCreateMetadata: extraCreateMetadata,
114116
}
115117

@@ -136,7 +138,9 @@ func NewCSISnapshotSideCarController(
136138
ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots
137139
if enableVolumeGroupSnapshots {
138140
ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
139-
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "csi-snapshotter-groupsnapshotcontent")
141+
ctrl.groupSnapshotContentQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
142+
groupSnapshotContentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{
143+
Name: "csi-snapshotter-groupsnapshotcontent"})
140144

141145
volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
142146
cache.ResourceEventHandlerFuncs{
@@ -217,27 +221,27 @@ func (ctrl *csiSnapshotSideCarController) contentWorker() {
217221
}
218222

219223
func (ctrl *csiSnapshotSideCarController) processNextItem() bool {
220-
keyObj, quit := ctrl.contentQueue.Get()
224+
key, quit := ctrl.contentQueue.Get()
221225
if quit {
222226
return false
223227
}
224-
defer ctrl.contentQueue.Done(keyObj)
228+
defer ctrl.contentQueue.Done(key)
225229

226-
requeue, err := ctrl.syncContentByKey(keyObj.(string))
230+
requeue, err := ctrl.syncContentByKey(key)
227231
if err != nil {
228-
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
232+
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err)
229233
// Always requeue on error to be able to call functions like "return false, doSomething()" where doSomething
230234
// does not need to worry about re-queueing.
231235
requeue = true
232236
}
233237
if requeue {
234-
ctrl.contentQueue.AddRateLimited(keyObj)
238+
ctrl.contentQueue.AddRateLimited(key)
235239
return true
236240
}
237241

238242
// Finally, if no error occurs we Forget this item so it does not
239243
// get queued again until another change happens.
240-
ctrl.contentQueue.Forget(keyObj)
244+
ctrl.contentQueue.Forget(key)
241245
return true
242246
}
243247

0 commit comments

Comments
 (0)