@@ -24,6 +24,7 @@ import (
 
 	"github.com/golang/glog"
 
+	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -33,7 +34,6 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 
-	"k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -155,16 +155,21 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
 				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
 				return
 			}
-			if err := ca.updateCIDRAllocation(workItem); err != nil {
-				if ca.canRetry(workItem) {
-					time.AfterFunc(updateRetryTimeout, func() {
+			if err := ca.updateCIDRAllocation(workItem); err == nil {
+				glog.V(3).Infof("Updated CIDR for %q", workItem)
+				ca.removeNodeFromProcessing(workItem)
+			} else {
+				glog.Errorf("Error updating CIDR for %q: %v", workItem, err)
+				if canRetry, timeout := ca.retryParams(workItem); canRetry {
+					glog.V(2).Infof("Retrying update for %q after %v", workItem, timeout)
+					time.AfterFunc(timeout, func() {
 						// Requeue the failed node for update again.
 						ca.nodeUpdateChannel <- workItem
 					})
 					continue
 				}
+				glog.Errorf("Exceeded retry count for %q, dropping from queue", workItem)
 			}
-			ca.removeNodeFromProcessing(workItem)
 		case <-stopChan:
 			return
 		}
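
For reference, a minimal, runnable sketch of the requeue pattern the new worker branch relies on: a failed item is pushed back onto the work channel after a delay via time.AfterFunc instead of being retried inline. Everything below (the item type, the simulated failures, the 50ms delay) is made up for illustration and is not part of the allocator.

// Hypothetical sketch of requeue-with-delay; not the allocator itself.
package main

import (
	"fmt"
	"time"
)

func main() {
	work := make(chan string, 8)
	done := make(chan struct{})
	attempts := map[string]int{}

	// Simulated update that fails twice before succeeding.
	update := func(node string) error {
		attempts[node]++
		if attempts[node] <= 2 {
			return fmt.Errorf("transient cloud error")
		}
		return nil
	}

	work <- "node-a"
	go func() {
		for item := range work {
			if err := update(item); err == nil {
				fmt.Printf("updated %q after %d attempt(s)\n", item, attempts[item])
				close(done)
				return
			}
			fmt.Printf("error updating %q, retrying shortly\n", item)
			item := item // capture the loop variable for the closure
			time.AfterFunc(50*time.Millisecond, func() {
				// Requeue the failed item, as the worker does with workItem.
				work <- item
			})
		}
	}()

	<-done
}
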
@@ -181,15 +186,34 @@ func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
 	return true
 }
 
-func (ca *cloudCIDRAllocator) canRetry(nodeName string) bool {
+func (ca *cloudCIDRAllocator) retryParams(nodeName string) (bool, time.Duration) {
 	ca.lock.Lock()
 	defer ca.lock.Unlock()
-	count := ca.nodesInProcessing[nodeName].retries + 1
+
+	entry, ok := ca.nodesInProcessing[nodeName]
+	if !ok {
+		glog.Errorf("Cannot get retryParams for %q as entry does not exist", nodeName)
+		return false, 0
+	}
+
+	count := entry.retries + 1
 	if count > updateMaxRetries {
-		return false
+		return false, 0
 	}
 	ca.nodesInProcessing[nodeName].retries = count
-	return true
+
+	return true, nodeUpdateRetryTimeout(count)
+}
+
+func nodeUpdateRetryTimeout(count int) time.Duration {
+	timeout := updateRetryTimeout
+	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
+		timeout *= 2
+	}
+	if timeout > maxUpdateRetryTimeout {
+		return maxUpdateRetryTimeout
+	}
+	return timeout
 }
 
 func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
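
For reference, a small standalone sketch of the backoff shape nodeUpdateRetryTimeout produces: the base delay doubles once per retry, capped at a maximum. The constant values below are assumptions for illustration only; the real updateRetryTimeout, maxUpdateRetryTimeout and updateMaxRetries are defined elsewhere in this file and may differ.

// Hypothetical sketch; constant values are assumed, not taken from the source.
package main

import (
	"fmt"
	"time"
)

const (
	updateRetryTimeout    = 100 * time.Millisecond // assumed base delay
	maxUpdateRetryTimeout = 5 * time.Second        // assumed cap
	updateMaxRetries      = 10                     // assumed retry limit
)

// Same shape as the function added in the diff: double per retry, then cap.
func nodeUpdateRetryTimeout(count int) time.Duration {
	timeout := updateRetryTimeout
	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
		timeout *= 2
	}
	if timeout > maxUpdateRetryTimeout {
		return maxUpdateRetryTimeout
	}
	return timeout
}

func main() {
	// Print the delay used before each retry attempt.
	for count := 1; count <= updateMaxRetries; count++ {
		fmt.Printf("retry %2d -> wait %v\n", count, nodeUpdateRetryTimeout(count))
	}
}
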