@@ -31,8 +31,8 @@ import (
31
31
32
32
type updaterPool struct {
33
33
started bool
34
- queue workqueue.RateLimitingInterface
35
- nfgQueue workqueue.RateLimitingInterface
34
+ queue workqueue.TypedRateLimitingInterface [ string ]
35
+ nfgQueue workqueue.TypedRateLimitingInterface [ string ]
36
36
sync.RWMutex
37
37
38
38
wg sync.WaitGroup
@@ -48,11 +48,10 @@ func newUpdaterPool(nfdMaster *nfdMaster) *updaterPool {
48
48
}
49
49
50
50
func (u * updaterPool ) processNodeUpdateRequest (cli k8sclient.Interface ) bool {
51
- n , quit := u .queue .Get ()
51
+ nodeName , quit := u .queue .Get ()
52
52
if quit {
53
53
return false
54
54
}
55
- nodeName := n .(string )
56
55
57
56
defer u .queue .Done (nodeName )
58
57
@@ -102,7 +101,7 @@ func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Inte
102
101
// Check if NodeFeatureGroup exists
103
102
var nfg * nfdv1alpha1.NodeFeatureGroup
104
103
var err error
105
- if nfg , err = getNodeFeatureGroup (cli , u .nfdMaster .namespace , nfgName .( string ) ); apierrors .IsNotFound (err ) {
104
+ if nfg , err = getNodeFeatureGroup (cli , u .nfdMaster .namespace , nfgName ); apierrors .IsNotFound (err ) {
106
105
klog .InfoS ("NodeFeatureGroup not found, skip update" , "NodeFeatureGroupName" , nfgName )
107
106
} else if err := u .nfdMaster .nfdAPIUpdateNodeFeatureGroup (u .nfdMaster .nfdClient , nfg ); err != nil {
108
107
if n := u .nfgQueue .NumRequeues (nfgName ); n < 15 {
@@ -145,12 +144,12 @@ func (u *updaterPool) start(parallelism int) {
145
144
146
145
// Create ratelimiter. Mimic workqueue.DefaultControllerRateLimiter() but
147
146
// with modified per-item (node) rate limiting parameters.
148
- rl := workqueue .NewMaxOfRateLimiter (
149
- workqueue .NewItemExponentialFailureRateLimiter (50 * time .Millisecond , 100 * time .Second ),
150
- & workqueue.BucketRateLimiter {Limiter : rate .NewLimiter (rate .Limit (10 ), 100 )},
147
+ rl := workqueue .NewTypedMaxOfRateLimiter [ string ] (
148
+ workqueue .NewTypedItemExponentialFailureRateLimiter [ string ] (50 * time .Millisecond , 100 * time .Second ),
149
+ & workqueue.TypedBucketRateLimiter [ string ] {Limiter : rate .NewLimiter (rate .Limit (10 ), 100 )},
151
150
)
152
- u .queue = workqueue .NewRateLimitingQueue (rl )
153
- u .nfgQueue = workqueue .NewRateLimitingQueue (rl )
151
+ u .queue = workqueue.NewTypedRateLimitingQueue [ string ] (rl )
152
+ u .nfgQueue = workqueue.NewTypedRateLimitingQueue [ string ] (rl )
154
153
155
154
for i := 0 ; i < parallelism ; i ++ {
156
155
u .wg .Add (1 )
0 commit comments