@@ -48,36 +48,36 @@ func newUpdaterPool(nfdMaster *nfdMaster) *updaterPool {
48
48
}
49
49
}
50
50
51
- func (u * updaterPool ) processNodeUpdateRequest (cli k8sclient.Interface , queue workqueue. RateLimitingInterface ) bool {
52
- n , quit := queue .Get ()
51
+ func (u * updaterPool ) processNodeUpdateRequest (cli k8sclient.Interface ) bool {
52
+ n , quit := u . queue .Get ()
53
53
if quit {
54
54
return false
55
55
}
56
56
nodeName := n .(string )
57
57
58
- defer queue .Done (nodeName )
58
+ defer u . queue .Done (nodeName )
59
59
60
60
nodeUpdateRequests .Inc ()
61
61
62
62
// Check if node exists
63
63
if node , err := getNode (cli , nodeName ); apierrors .IsNotFound (err ) {
64
64
klog .InfoS ("node not found, skip update" , "nodeName" , nodeName )
65
65
} else if err := u .nfdMaster .nfdAPIUpdateOneNode (cli , node ); err != nil {
66
- if n := queue .NumRequeues (nodeName ); n < 15 {
66
+ if n := u . queue .NumRequeues (nodeName ); n < 15 {
67
67
klog .InfoS ("retrying node update" , "nodeName" , nodeName , "lastError" , err , "numRetries" , n )
68
68
} else {
69
69
klog .ErrorS (err , "node update failed, queuing for retry " , "nodeName" , nodeName , "numRetries" , n )
70
70
// Count only long-failing attempts
71
71
nodeUpdateFailures .Inc ()
72
72
}
73
- queue .AddRateLimited (nodeName )
73
+ u . queue .AddRateLimited (nodeName )
74
74
return true
75
75
}
76
- queue .Forget (nodeName )
76
+ u . queue .Forget (nodeName )
77
77
return true
78
78
}
79
79
80
- func (u * updaterPool ) runNodeUpdater (queue workqueue. RateLimitingInterface ) {
80
+ func (u * updaterPool ) runNodeUpdater () {
81
81
var cli k8sclient.Interface
82
82
if u .nfdMaster .kubeconfig != nil {
83
83
// For normal execution, initialize a separate api client for each updater
@@ -86,17 +86,17 @@ func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
86
86
// For tests, re-use the api client from nfd-master
87
87
cli = u .nfdMaster .k8sClient
88
88
}
89
- for u .processNodeUpdateRequest (cli , queue ) {
89
+ for u .processNodeUpdateRequest (cli ) {
90
90
}
91
91
u .wg .Done ()
92
92
}
93
93
94
- func (u * updaterPool ) processNodeFeatureGroupUpdateRequest (cli nfdclientset.Interface , ngfQueue workqueue. RateLimitingInterface ) bool {
95
- nfgName , quit := ngfQueue .Get ()
94
+ func (u * updaterPool ) processNodeFeatureGroupUpdateRequest (cli nfdclientset.Interface ) bool {
95
+ nfgName , quit := u . nfgQueue .Get ()
96
96
if quit {
97
97
return false
98
98
}
99
- defer ngfQueue .Done (nfgName )
99
+ defer u . nfgQueue .Done (nfgName )
100
100
101
101
nodeFeatureGroupUpdateRequests .Inc ()
102
102
@@ -106,22 +106,22 @@ func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Inte
106
106
if nfg , err = getNodeFeatureGroup (cli , u .nfdMaster .namespace , nfgName .(string )); apierrors .IsNotFound (err ) {
107
107
klog .InfoS ("NodeFeatureGroup not found, skip update" , "NodeFeatureGroupName" , nfgName )
108
108
} else if err := u .nfdMaster .nfdAPIUpdateNodeFeatureGroup (u .nfdMaster .nfdClient , nfg ); err != nil {
109
- if n := ngfQueue .NumRequeues (nfgName ); n < 15 {
109
+ if n := u . nfgQueue .NumRequeues (nfgName ); n < 15 {
110
110
klog .InfoS ("retrying NodeFeatureGroup update" , "nodeFeatureGroup" , klog .KObj (nfg ), "lastError" , err )
111
111
} else {
112
112
klog .ErrorS (err , "failed to update NodeFeatureGroup, queueing for retry" , "nodeFeatureGroup" , klog .KObj (nfg ), "lastError" , err , "numRetries" , n )
113
113
}
114
- ngfQueue .AddRateLimited (nfgName )
114
+ u . nfgQueue .AddRateLimited (nfgName )
115
115
return true
116
116
}
117
117
118
- ngfQueue .Forget (nfgName )
118
+ u . nfgQueue .Forget (nfgName )
119
119
return true
120
120
}
121
121
122
- func (u * updaterPool ) runNodeFeatureGroupUpdater (ngfQueue workqueue. RateLimitingInterface ) {
122
+ func (u * updaterPool ) runNodeFeatureGroupUpdater () {
123
123
cli := nfdclientset .NewForConfigOrDie (u .nfdMaster .kubeconfig )
124
- for u .processNodeFeatureGroupUpdateRequest (cli , ngfQueue ) {
124
+ for u .processNodeFeatureGroupUpdateRequest (cli ) {
125
125
}
126
126
u .nfgWg .Done ()
127
127
}
@@ -148,10 +148,10 @@ func (u *updaterPool) start(parallelism int) {
148
148
149
149
for i := 0 ; i < parallelism ; i ++ {
150
150
u .wg .Add (1 )
151
- go u .runNodeUpdater (u . queue )
151
+ go u .runNodeUpdater ()
152
152
if features .NFDFeatureGate .Enabled (features .NodeFeatureGroupAPI ) {
153
153
u .nfgWg .Add (1 )
154
- go u .runNodeFeatureGroupUpdater (u . nfgQueue )
154
+ go u .runNodeFeatureGroupUpdater ()
155
155
}
156
156
}
157
157
u .started = true
0 commit comments