Commit db827e6

Add a new workqueue to endpointslice controller for updating topology
cache and checking node topology distribution.
1 parent 548d50d

2 files changed: +78 −44 lines
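
The core change: instead of running checkNodeTopologyDistribution inline from the node event handlers, the handlers now enqueue a single fixed key onto a dedicated workqueue, so bursts of node events coalesce into at most one pending work item and the expensive check runs asynchronously. A minimal sketch of that coalescing pattern, using the same typed client-go workqueue API the diff uses (only the sentinel key name is taken from the commit; the rest is illustrative):

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/workqueue"
    )

    // topologyQueueItemKey mirrors the constant the commit adds: every node
    // event enqueues this same key.
    const topologyQueueItemKey = "topologyQueueItemKey"

    func main() {
        q := workqueue.NewTypedRateLimitingQueue[string](
            workqueue.DefaultTypedControllerRateLimiter[string](),
        )
        // The workqueue drops duplicates of a key that is already pending,
        // so 100 events collapse into a single work item.
        for i := 0; i < 100; i++ {
            q.Add(topologyQueueItemKey)
        }
        fmt.Println(q.Len()) // prints 1
    }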

pkg/controller/endpointslice/endpointslice_controller.go (+63 −35)

@@ -75,6 +75,9 @@ const (
 	// controllerName is a unique value used with LabelManagedBy to indicated
 	// the component managing an EndpointSlice.
 	controllerName = "endpointslice-controller.k8s.io"
+
+	// topologyQueueItemKey is the key for all items in the topologyQueue.
+	topologyQueueItemKey = "topologyQueueItemKey"
 )
 
 // NewController creates and initializes a new Controller
@@ -99,7 +102,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 		// such as an update to a Service or Deployment. A more significant
 		// rate limit back off here helps ensure that the Controller does not
 		// overwhelm the API Server.
-		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+		serviceQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
 			workqueue.NewTypedMaxOfRateLimiter(
 				workqueue.NewTypedItemExponentialFailureRateLimiter[string](defaultSyncBackOff, maxSyncBackOff),
 				// 10 qps, 100 bucket size. This is only for retry speed and its
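
The elided tail of this constructor pairs the per-item exponential backoff above with the 10 qps / 100-burst bucket limiter the comment describes. A sketch of how such a combined limiter is typically built with client-go (the backoff constants and the exact elided lines are assumptions, not shown in this hunk):

    package sketch

    import (
        "time"

        "golang.org/x/time/rate"

        "k8s.io/client-go/util/workqueue"
    )

    // Assumed values; the real defaultSyncBackOff/maxSyncBackOff are defined
    // elsewhere in endpointslice_controller.go and are not part of this hunk.
    const (
        defaultSyncBackOff = 1 * time.Second
        maxSyncBackOff     = 1000 * time.Second
    )

    func newServiceRateLimiter() workqueue.TypedRateLimiter[string] {
        // MaxOfRateLimiter returns the longest delay suggested by any child:
        // per-item exponential backoff for repeated failures, plus an overall
        // token bucket that caps retry throughput at 10 qps with burst 100.
        return workqueue.NewTypedMaxOfRateLimiter(
            workqueue.NewTypedItemExponentialFailureRateLimiter[string](defaultSyncBackOff, maxSyncBackOff),
            &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
        )
    }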
@@ -110,6 +113,9 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 				Name: "endpoint_slice",
 			},
 		),
+		topologyQueue: workqueue.NewTypedRateLimitingQueue[string](
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+		),
 		workerLoopPeriod: time.Second,
 	}
 
@@ -158,14 +164,14 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 
 	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				c.addNode(logger, obj)
+			AddFunc: func(_ interface{}) {
+				c.addNode()
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
-				c.updateNode(logger, oldObj, newObj)
+				c.updateNode(oldObj, newObj)
 			},
-			DeleteFunc: func(obj interface{}) {
-				c.deleteNode(logger, obj)
+			DeleteFunc: func(_ interface{}) {
+				c.deleteNode()
 			},
 		})
 
@@ -236,7 +242,11 @@ type Controller struct {
 	// more often than services with few pods; it also would cause a
 	// service that's inserted multiple times to be processed more than
 	// necessary.
-	queue workqueue.TypedRateLimitingInterface[string]
+	serviceQueue workqueue.TypedRateLimitingInterface[string]
+
+	// topologyQueue is used to trigger a topology cache update and checking node
+	// topology distribution.
+	topologyQueue workqueue.TypedRateLimitingInterface[string]
 
 	// maxEndpointsPerSlice references the maximum number of endpoints that
 	// should be added to an EndpointSlice
@@ -264,7 +274,8 @@ func (c *Controller) Run(ctx context.Context, workers int) {
 	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
 	defer c.eventBroadcaster.Shutdown()
 
-	defer c.queue.ShutDown()
+	defer c.serviceQueue.ShutDown()
+	defer c.topologyQueue.ShutDown()
 
 	logger := klog.FromContext(ctx)
 	logger.Info("Starting endpoint slice controller")
@@ -274,52 +285,69 @@ func (c *Controller) Run(ctx context.Context, workers int) {
 		return
 	}
 
-	logger.V(2).Info("Starting worker threads", "total", workers)
+	logger.V(2).Info("Starting service queue worker threads", "total", workers)
 	for i := 0; i < workers; i++ {
-		go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
+		go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
 	}
+	logger.V(2).Info("Starting topology queue worker threads", "total", 1)
+	go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
 
 	<-ctx.Done()
 }
 
-// worker runs a worker thread that just dequeues items, processes them, and
-// marks them done. You may run as many of these in parallel as you wish; the
-// workqueue guarantees that they will not end up processing the same service
-// at the same time
-func (c *Controller) worker(logger klog.Logger) {
-	for c.processNextWorkItem(logger) {
+// serviceQueueWorker runs a worker thread that just dequeues items, processes
+// them, and marks them done. You may run as many of these in parallel as you
+// wish; the workqueue guarantees that they will not end up processing the same
+// service at the same time
+func (c *Controller) serviceQueueWorker(logger klog.Logger) {
+	for c.processNextServiceWorkItem(logger) {
 	}
 }
 
-func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
-	cKey, quit := c.queue.Get()
+func (c *Controller) processNextServiceWorkItem(logger klog.Logger) bool {
+	cKey, quit := c.serviceQueue.Get()
 	if quit {
 		return false
 	}
-	defer c.queue.Done(cKey)
+	defer c.serviceQueue.Done(cKey)
 
 	err := c.syncService(logger, cKey)
 	c.handleErr(logger, err, cKey)
 
 	return true
 }
 
+func (c *Controller) topologyQueueWorker(logger klog.Logger) {
+	for c.processNextTopologyWorkItem(logger) {
+	}
+}
+
+func (c *Controller) processNextTopologyWorkItem(logger klog.Logger) bool {
+	key, quit := c.topologyQueue.Get()
+	if quit {
+		return false
+	}
+	defer c.topologyQueue.Done(key)
+	c.checkNodeTopologyDistribution(logger)
+	return true
+}
+
 func (c *Controller) handleErr(logger klog.Logger, err error, key string) {
 	trackSync(err)
 
 	if err == nil {
-		c.queue.Forget(key)
+		c.serviceQueue.Forget(key)
 		return
 	}
 
-	if c.queue.NumRequeues(key) < maxRetries {
+	if c.serviceQueue.NumRequeues(key) < maxRetries {
 		logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
-		c.queue.AddRateLimited(key)
+		c.serviceQueue.AddRateLimited(key)
 		return
 	}
 
 	logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
-	c.queue.Forget(key)
+	c.serviceQueue.Forget(key)
 	utilruntime.HandleError(err)
 }
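
The Run changes keep the standard controller worker shape: each worker is a wait.Until loop that drains its queue until shutdown, and a single goroutine suffices for the topology queue because all of its events share one key. A self-contained sketch of that loop (illustrative names, not the controller's actual wiring):

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        q := workqueue.NewTypedRateLimitingQueue[string](
            workqueue.DefaultTypedControllerRateLimiter[string](),
        )
        stop := make(chan struct{})

        // wait.Until re-invokes the worker every period until stop closes;
        // the inner loop exits only when the queue shuts down.
        go wait.Until(func() {
            for {
                key, quit := q.Get()
                if quit {
                    return
                }
                fmt.Println("processing", key) // stand-in for checkNodeTopologyDistribution
                q.Done(key)                    // allow the key to be enqueued again
            }
        }, time.Second, stop)

        q.Add("topologyQueueItemKey")
        time.Sleep(100 * time.Millisecond)
        q.ShutDown()
        close(stop)
    }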

@@ -416,7 +444,7 @@ func (c *Controller) onServiceUpdate(obj interface{}) {
 		return
 	}
 
-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
 }
 
 // onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
@@ -427,7 +455,7 @@ func (c *Controller) onServiceDelete(obj interface{}) {
 		return
 	}
 
-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
 }
 
 // onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
@@ -500,7 +528,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo
 	if c.endpointUpdatesBatchPeriod > delay {
 		delay = c.endpointUpdatesBatchPeriod
 	}
-	c.queue.AddAfter(key, delay)
+	c.serviceQueue.AddAfter(key, delay)
 }
 
 func (c *Controller) addPod(obj interface{}) {
@@ -511,14 +539,14 @@ func (c *Controller) addPod(obj interface{}) {
 		return
 	}
 	for key := range services {
-		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
 	}
 }
 
 func (c *Controller) updatePod(old, cur interface{}) {
 	services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
 	for key := range services {
-		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
 	}
 }
 
@@ -531,24 +559,24 @@ func (c *Controller) deletePod(obj interface{}) {
 	}
 }
 
-func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
-	c.checkNodeTopologyDistribution(logger)
+func (c *Controller) addNode() {
+	c.topologyQueue.Add(topologyQueueItemKey)
 }
 
-func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
+func (c *Controller) updateNode(old, cur interface{}) {
 	oldNode := old.(*v1.Node)
 	curNode := cur.(*v1.Node)
 
 	// LabelTopologyZone may be added by cloud provider asynchronously after the Node is created.
 	// The topology cache should be updated in this case.
 	if isNodeReady(oldNode) != isNodeReady(curNode) ||
 		oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
-		c.checkNodeTopologyDistribution(logger)
+		c.topologyQueue.Add(topologyQueueItemKey)
 	}
 }
 
-func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
-	c.checkNodeTopologyDistribution(logger)
+func (c *Controller) deleteNode() {
+	c.topologyQueue.Add(topologyQueueItemKey)
 }
 
 // checkNodeTopologyDistribution updates Nodes in the topology cache and then
@@ -566,7 +594,7 @@ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
 	serviceKeys := c.topologyCache.GetOverloadedServices()
 	for _, serviceKey := range serviceKeys {
 		logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
-		c.queue.Add(serviceKey)
+		c.serviceQueue.Add(serviceKey)
 	}
 }
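
Note the handoff this hunk completes: node events feed the topology queue, and draining its single item can push overloaded service keys back onto the service queue for a full resync. A simplified, hypothetical model of that flow (the service keys are stand-ins for topologyCache.GetOverloadedServices output):

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        newQueue := func() workqueue.TypedRateLimitingInterface[string] {
            return workqueue.NewTypedRateLimitingQueue[string](
                workqueue.DefaultTypedControllerRateLimiter[string](),
            )
        }
        topologyQueue, serviceQueue := newQueue(), newQueue()

        topologyQueue.Add("topologyQueueItemKey") // any node add/update/delete

        key, _ := topologyQueue.Get()
        for _, serviceKey := range []string{"default/svc-a", "default/svc-b"} {
            serviceQueue.Add(serviceKey) // each overloaded service gets resynced
        }
        topologyQueue.Done(key)

        fmt.Println(serviceQueue.Len()) // prints 2
    }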

pkg/controller/endpointslice/endpointslice_controller_test.go (+15 −9)

@@ -539,18 +539,18 @@ func TestOnEndpointSliceUpdate(t *testing.T) {
 	epSlice2 := epSlice1.DeepCopy()
 	epSlice2.Labels[discovery.LabelManagedBy] = "something else"
 
-	assert.Equal(t, 0, esController.queue.Len())
+	assert.Equal(t, 0, esController.serviceQueue.Len())
 	esController.onEndpointSliceUpdate(logger, epSlice1, epSlice2)
 	err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
-		if esController.queue.Len() > 0 {
+		if esController.serviceQueue.Len() > 0 {
 			return true, nil
 		}
 		return false, nil
 	})
 	if err != nil {
 		t.Fatalf("unexpected error waiting for add to queue")
 	}
-	assert.Equal(t, 1, esController.queue.Len())
+	assert.Equal(t, 1, esController.serviceQueue.Len())
 }
 
 func TestSyncService(t *testing.T) {
@@ -1947,8 +1947,8 @@ func Test_checkNodeTopologyDistribution(t *testing.T) {
 			logger, _ := ktesting.NewTestContext(t)
 			esController.checkNodeTopologyDistribution(logger)
 
-			if esController.queue.Len() != tc.expectedQueueLen {
-				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
+			if esController.serviceQueue.Len() != tc.expectedQueueLen {
+				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.serviceQueue.Len())
 			}
 		})
 	}
@@ -2007,8 +2007,11 @@ func TestUpdateNode(t *testing.T) {
 	logger, _ := ktesting.NewTestContext(t)
 	esController.nodeStore.Add(node1)
 	esController.nodeStore.Add(node2)
-	esController.addNode(logger, node1)
-	esController.addNode(logger, node2)
+	esController.addNode()
+	esController.addNode()
+	assert.Equal(t, 1, esController.topologyQueue.Len())
+	esController.processNextTopologyWorkItem(logger)
+	assert.Equal(t, 0, esController.topologyQueue.Len())
 	// The Nodes don't have the zone label, AddHints should fail.
 	_, _, eventsBuilders := esController.topologyCache.AddHints(logger, sliceInfo)
 	require.Len(t, eventsBuilders, 1)
@@ -2022,8 +2025,11 @@
 	// After adding the zone label to the Nodes and calling the event handler updateNode, AddHints should succeed.
 	esController.nodeStore.Update(updateNode1)
 	esController.nodeStore.Update(updateNode2)
-	esController.updateNode(logger, node1, updateNode1)
-	esController.updateNode(logger, node2, updateNode2)
+	esController.updateNode(node1, updateNode1)
+	esController.updateNode(node2, updateNode2)
+	assert.Equal(t, 1, esController.topologyQueue.Len())
+	esController.processNextTopologyWorkItem(logger)
+	assert.Equal(t, 0, esController.topologyQueue.Len())
 	_, _, eventsBuilders = esController.topologyCache.AddHints(logger, sliceInfo)
 	require.Len(t, eventsBuilders, 1)
 	assert.Contains(t, eventsBuilders[0].Message, topologycache.TopologyAwareHintsEnabled)
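
The new assertions pin down the coalescing behavior: two addNode()/updateNode() calls leave exactly one item on topologyQueue, and a single processNextTopologyWorkItem call drains it. A standalone, hypothetical test capturing the same workqueue property:

    package sketch

    import (
        "testing"

        "k8s.io/client-go/util/workqueue"
    )

    func TestTopologyQueueCoalescing(t *testing.T) {
        q := workqueue.NewTypedRateLimitingQueue[string](
            workqueue.DefaultTypedControllerRateLimiter[string](),
        )
        q.Add("topologyQueueItemKey")
        q.Add("topologyQueueItemKey") // deduped while the first is still pending
        if q.Len() != 1 {
            t.Fatalf("expected 1 queued item, got %d", q.Len())
        }
        key, _ := q.Get()
        q.Done(key) // mirrors processNextTopologyWorkItem's Get/Done pairing
        if q.Len() != 0 {
            t.Fatalf("expected drained queue, got %d", q.Len())
        }
    }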
