@@ -20,6 +20,8 @@ import (
20
20
"fmt"
21
21
"time"
22
22
23
+ "golang.org/x/time/rate"
24
+
23
25
v1 "k8s.io/api/core/v1"
24
26
discovery "k8s.io/api/discovery/v1beta1"
25
27
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -47,13 +49,24 @@ const (
47
49
// maxRetries is the number of times a service will be retried before it is
48
50
// dropped out of the queue. Any sync error, such as a failure to create or
49
51
// update an EndpointSlice could trigger a retry. With the current
50
- // rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers
51
- // represent the sequence of delays between successive queuings of a
52
- // service.
52
+ // rate-limiter in use (1s*2^(numRetries-1)) the following numbers represent
53
+ // the sequence of delays between successive queuings of a service.
53
54
//
54
- // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s,
55
- // 10.2s, 20.4s, 41s, 82s
55
+ // 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 512s, 1000s (max)
56
56
maxRetries = 15
57
+
58
+ // endpointSliceChangeMinSyncDelay indicates the mininum delay before
59
+ // queuing a syncService call after an EndpointSlice changes. If
60
+ // endpointUpdatesBatchPeriod is greater than this value, it will be used
61
+ // instead. This helps batch processing of changes to multiple
62
+ // EndpointSlices.
63
+ endpointSliceChangeMinSyncDelay = 1 * time .Second
64
+
65
+ // defaultSyncBackOff is the default backoff period for syncService calls.
66
+ defaultSyncBackOff = 1 * time .Second
67
+ // maxSyncBackOff is the max backoff period for syncService calls.
68
+ maxSyncBackOff = 100 * time .Second
69
+
57
70
// controllerName is a unique value used with LabelManagedBy to indicated
58
71
// the component managing an EndpointSlice.
59
72
controllerName = "endpointslice-controller.k8s.io"
@@ -80,8 +93,19 @@ func NewController(podInformer coreinformers.PodInformer,
80
93
endpointslicemetrics .RegisterMetrics ()
81
94
82
95
c := & Controller {
83
- client : client ,
84
- queue : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "endpoint_slice" ),
96
+ client : client ,
97
+ // This is similar to the DefaultControllerRateLimiter, just with a
98
+ // significantly higher default backoff (1s vs 5ms). This controller
99
+ // processes events that can require significant EndpointSlice changes,
100
+ // such as an update to a Service or Deployment. A more significant
101
+ // rate limit back off here helps ensure that the Controller does not
102
+ // overwhelm the API Server.
103
+ queue : workqueue .NewNamedRateLimitingQueue (workqueue .NewMaxOfRateLimiter (
104
+ workqueue .NewItemExponentialFailureRateLimiter (defaultSyncBackOff , maxSyncBackOff ),
105
+ // 10 qps, 100 bucket size. This is only for retry speed and its
106
+ // only the overall factor (not per item).
107
+ & workqueue.BucketRateLimiter {Limiter : rate .NewLimiter (rate .Limit (10 ), 100 )},
108
+ ), "endpoint_slice" ),
85
109
workerLoopPeriod : time .Second ,
86
110
}
87
111
@@ -409,7 +433,14 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo
409
433
utilruntime .HandleError (fmt .Errorf ("Couldn't get key for EndpointSlice %+v: %v" , endpointSlice , err ))
410
434
return
411
435
}
412
- c .queue .Add (key )
436
+
437
+ // queue after the max of endpointSliceChangeMinSyncDelay and
438
+ // endpointUpdatesBatchPeriod.
439
+ delay := endpointSliceChangeMinSyncDelay
440
+ if c .endpointUpdatesBatchPeriod > delay {
441
+ delay = c .endpointUpdatesBatchPeriod
442
+ }
443
+ c .queue .AddAfter (key , delay )
413
444
}
414
445
415
446
func (c * Controller ) addPod (obj interface {}) {
0 commit comments