Skip to content

Commit 0405fcd

Browse files
committed
Address review comments
1 parent f333539 commit 0405fcd

File tree

1 file changed

+72
-54
lines changed
  • components/node-labeler/cmd

1 file changed

+72
-54
lines changed

components/node-labeler/cmd/run.go

Lines changed: 72 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"net/http"
1313
"strconv"
1414
"strings"
15-
"sync"
1615
"time"
1716

1817
"github.com/bombsimon/logrusr/v2"
@@ -135,7 +134,9 @@ var runCmd = &cobra.Command{
135134
log.WithError(err).Fatal("unable to bind node scaledown annotation controller")
136135
}
137136

138-
err = mgr.Add(manager.RunnableFunc(func(context.Context) error {
137+
err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
138+
<-ctx.Done()
139+
log.Info("Received shutdown signal - stopping NodeScaledownAnnotationController")
139140
nsac.Stop()
140141
return nil
141142
}))
@@ -288,21 +289,22 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
288289

289290
type NodeScaledownAnnotationController struct {
290291
client.Client
291-
nodesToReconcile chan string
292-
stopChan chan struct{}
293-
nodeReconcileLock sync.Map
292+
nodesToReconcile chan string
293+
stopChan chan struct{}
294294
}
295295

296296
func NewNodeScaledownAnnotationController(client client.Client) (*NodeScaledownAnnotationController, error) {
297-
return &NodeScaledownAnnotationController{
297+
controller := &NodeScaledownAnnotationController{
298298
Client: client,
299299
nodesToReconcile: make(chan string, 1000),
300300
stopChan: make(chan struct{}),
301-
}, nil
301+
}
302+
303+
return controller, nil
302304
}
303305

304306
func (c *NodeScaledownAnnotationController) SetupWithManager(mgr ctrl.Manager) error {
305-
// Start the periodic reconciliation goroutine
307+
go c.reconciliationWorker()
306308
go c.periodicReconciliation()
307309

308310
return ctrl.NewControllerManagedBy(mgr).
@@ -312,6 +314,7 @@ func (c *NodeScaledownAnnotationController) SetupWithManager(mgr ctrl.Manager) e
312314
Complete(c)
313315
}
314316

317+
// periodicReconciliation periodically reconciles all nodes in the cluster
315318
func (c *NodeScaledownAnnotationController) periodicReconciliation() {
316319
ticker := time.NewTicker(5 * time.Minute)
317320
defer ticker.Stop()
@@ -330,6 +333,23 @@ func (c *NodeScaledownAnnotationController) periodicReconciliation() {
330333
}
331334
}
332335

336+
// reconciliationWorker consumes nodesToReconcile and reconciles each node
337+
func (c *NodeScaledownAnnotationController) reconciliationWorker() {
338+
log.Info("reconciliation worker started")
339+
for {
340+
select {
341+
case nodeName := <-c.nodesToReconcile:
342+
ctx := context.Background()
343+
if err := c.reconcileNode(ctx, nodeName); err != nil {
344+
log.WithError(err).WithField("node", nodeName).Error("failed to reconcile node from queue")
345+
}
346+
case <-c.stopChan:
347+
log.Info("reconciliation worker stopping")
348+
return
349+
}
350+
}
351+
}
352+
333353
func (c *NodeScaledownAnnotationController) workspaceFilter() predicate.Predicate {
334354
return predicate.Funcs{
335355
CreateFunc: func(e event.CreateEvent) bool {
@@ -344,8 +364,13 @@ func (c *NodeScaledownAnnotationController) workspaceFilter() predicate.Predicat
344364
UpdateFunc: func(e event.UpdateEvent) bool {
345365
wsOld := e.ObjectOld.(*workspacev1.Workspace)
346366
ws := e.ObjectNew.(*workspacev1.Workspace)
347-
// if we haven't seen runtime info before and now it's there, let's reconcile
348-
if wsOld.Status.Runtime == nil && ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
367+
// if we haven't seen runtime info before and now it's there, let's reconcile.
368+
// similarly, if the node name changed, we need to reconcile the old node as well.
369+
if (wsOld.Status.Runtime == nil && ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "") || // we just got runtime info
370+
(wsOld.Status.Runtime != nil && ws.Status.Runtime != nil && wsOld.Status.Runtime.NodeName != ws.Status.Runtime.NodeName) { // node name changed
371+
if wsOld.Status.Runtime != nil && wsOld.Status.Runtime.NodeName != "" {
372+
c.queueNodeForReconciliation(wsOld.Status.Runtime.NodeName)
373+
}
349374
return true
350375
}
351376

@@ -354,33 +379,26 @@ func (c *NodeScaledownAnnotationController) workspaceFilter() predicate.Predicat
354379
DeleteFunc: func(e event.DeleteEvent) bool {
355380
ws := e.Object.(*workspacev1.Workspace)
356381
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
357-
// Queue the node for reconciliation
358-
select {
359-
case c.nodesToReconcile <- ws.Status.Runtime.NodeName:
360-
log.WithField("node", ws.Status.Runtime.NodeName).Info("queued node for reconciliation from delete")
361-
default:
362-
log.WithField("node", ws.Status.Runtime.NodeName).Warn("reconciliation queue full")
363-
}
364-
NodeScaledownAnnotationReconciliationQueueSize.Set(float64(len(c.nodesToReconcile)))
382+
c.queueNodeForReconciliation(ws.Status.Runtime.NodeName)
365383
return true
366384
}
367385
return false
368386
},
369387
}
370388
}
371389

372-
func (c *NodeScaledownAnnotationController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
373-
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")
374-
375-
// Process any queued nodes first, logging errors (not returning)
390+
func (c *NodeScaledownAnnotationController) queueNodeForReconciliation(nodeName string) {
376391
select {
377-
case nodeName := <-c.nodesToReconcile:
378-
if err := c.reconcileNode(ctx, nodeName); err != nil {
379-
log.WithError(err).WithField("node", nodeName).Error("failed to reconcile node from queue")
380-
}
392+
case c.nodesToReconcile <- nodeName:
393+
log.WithField("node", nodeName).Info("queued node for reconciliation")
381394
default:
382-
// No nodes in queue, continue with regular reconciliation
395+
log.WithField("node", nodeName).Warn("reconciliation queue full")
383396
}
397+
NodeScaledownAnnotationReconciliationQueueSize.Set(float64(len(c.nodesToReconcile)))
398+
}
399+
400+
func (c *NodeScaledownAnnotationController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
401+
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")
384402

385403
var ws workspacev1.Workspace
386404
if err := c.Get(ctx, req.NamespacedName, &ws); err != nil {
@@ -392,10 +410,7 @@ func (c *NodeScaledownAnnotationController) Reconcile(ctx context.Context, req c
392410
}
393411

394412
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
395-
if err := c.reconcileNode(ctx, ws.Status.Runtime.NodeName); err != nil {
396-
log.WithError(err).WithField("node", ws.Status.Runtime.NodeName).Error("failed to reconcile node")
397-
return ctrl.Result{}, err
398-
}
413+
c.queueNodeForReconciliation(ws.Status.Runtime.NodeName)
399414
}
400415

401416
return ctrl.Result{}, nil
@@ -406,35 +421,24 @@ func (wc *NodeScaledownAnnotationController) Stop() {
406421
close(wc.stopChan)
407422
}
408423

409-
// reconcileAllNodes lists all nodes and reconciles each one
410-
func (wc *NodeScaledownAnnotationController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
424+
func (c *NodeScaledownAnnotationController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
411425
timer := prometheus.NewTimer(NodeScaledownAnnotationReconcileDuration.WithLabelValues("all_nodes"))
412426
defer timer.ObserveDuration()
413427

414428
var nodes corev1.NodeList
415-
if err := wc.List(ctx, &nodes); err != nil {
429+
if err := c.List(ctx, &nodes); err != nil {
416430
log.WithError(err).Error("failed to list nodes")
417431
return ctrl.Result{}, err
418432
}
419433

420434
for _, node := range nodes.Items {
421-
if err := wc.reconcileNode(ctx, node.Name); err != nil {
422-
log.WithError(err).WithField("node", node.Name).Error("failed to reconcile node")
423-
continue
424-
}
435+
c.queueNodeForReconciliation(node.Name)
425436
}
426437

427438
return ctrl.Result{}, nil
428439
}
429440

430-
// reconcileNode counts the workspaces running on a node and updates the autoscaler annotation accordingly
431441
func (c *NodeScaledownAnnotationController) reconcileNode(ctx context.Context, nodeName string) error {
432-
mutexInterface, _ := c.nodeReconcileLock.LoadOrStore(nodeName, &sync.Mutex{})
433-
mutex := mutexInterface.(*sync.Mutex)
434-
435-
mutex.Lock()
436-
defer mutex.Unlock()
437-
438442
timer := prometheus.NewTimer(NodeScaledownAnnotationReconcileDuration.WithLabelValues("node"))
439443
defer timer.ObserveDuration()
440444

@@ -444,6 +448,7 @@ func (c *NodeScaledownAnnotationController) reconcileNode(ctx context.Context, n
444448
}); err != nil {
445449
return fmt.Errorf("failed to list workspaces: %w", err)
446450
}
451+
447452
log.WithField("node", nodeName).WithField("count", len(workspaceList.Items)).Info("acting on workspaces")
448453
count := len(workspaceList.Items)
449454

@@ -461,19 +466,32 @@ func (c *NodeScaledownAnnotationController) updateNodeAnnotation(ctx context.Con
461466
return fmt.Errorf("obtaining node %s: %w", nodeName, err)
462467
}
463468

464-
if node.Annotations == nil {
465-
node.Annotations = make(map[string]string)
469+
shouldDisableScaleDown := count > 0
470+
currentlyDisabled := false
471+
if val, exists := node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"]; exists {
472+
currentlyDisabled = val == "true"
466473
}
467474

468-
if count > 0 {
469-
node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"] = "true"
470-
log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
471-
} else {
472-
delete(node.Annotations, "cluster-autoscaler.kubernetes.io/scale-down-disabled")
473-
log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
475+
// Only update if the state needs to change
476+
if shouldDisableScaleDown != currentlyDisabled {
477+
if node.Annotations == nil {
478+
node.Annotations = make(map[string]string)
479+
}
480+
481+
if shouldDisableScaleDown {
482+
node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"] = "true"
483+
log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
484+
} else {
485+
delete(node.Annotations, "cluster-autoscaler.kubernetes.io/scale-down-disabled")
486+
log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
487+
}
488+
489+
return c.Update(ctx, &node)
474490
}
475491

476-
return c.Update(ctx, &node)
492+
log.WithField("nodeName", nodeName).Info("skipped updating node annotation: no change needed")
493+
494+
return nil
477495
})
478496
}
479497

0 commit comments

Comments
 (0)