10 changes: 10 additions & 0 deletions components/node-labeler/BUILD.yaml
@@ -5,11 +5,16 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
env:
- CGO_ENABLED=0
- GOOS=linux
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
config:
packaging: app
buildCommand: ["go", "build", "-trimpath", "-ldflags", "-buildid= -w -s -X 'github.com/gitpod-io/gitpod/node-labeler/cmd.Version=commit-${__git_commit}'"]
@@ -34,5 +39,10 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
17 changes: 17 additions & 0 deletions components/node-labeler/cmd/metrics.go
@@ -26,4 +26,21 @@ var (
Help: "time it took for a pods to reach the running phase and the ready label was applied to the node",
Buckets: []float64{5, 10, 15, 20, 25, 30, 45, 60, 75},
}, []string{"component"})

// Track reconciliation durations for the NodeScaledownAnnotationController
NodeScaledownAnnotationReconcileDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsWorkspaceSubsystem,
Name: "node_scaledown_annotation_reconcile_duration_seconds",
Help: "Duration of NodeScaledownAnnotationController reconciliations",
Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30},
}, []string{"operation"})

// Track queue size for the NodeScaledownAnnotationController
NodeScaledownAnnotationReconciliationQueueSize = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsWorkspaceSubsystem,
Name: "node_scaledown_annotation_reconciliation_queue_size",
Help: "Current size of the NodeScaledownAnnotationController reconciliation queue",
})
)
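
As context for the collectors above, here is a minimal, self-contained sketch of how a labelled histogram and a gauge of this shape are typically driven with the Prometheus client; the run.go changes further down use the same prometheus.NewTimer and Set calls. The metric names below are local stand-ins, not additional metrics from this PR.

package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Stand-in for NodeScaledownAnnotationReconcileDuration.
	reconcileDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "example_reconcile_duration_seconds",
		Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30},
	}, []string{"operation"})

	// Stand-in for NodeScaledownAnnotationReconciliationQueueSize.
	queueSize = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "example_reconciliation_queue_size",
	})
)

func main() {
	prometheus.MustRegister(reconcileDuration, queueSize)

	queue := make(chan string, 1000)

	// Time one reconciliation pass under the "node" label.
	timer := prometheus.NewTimer(reconcileDuration.WithLabelValues("node"))
	// ... reconciliation work happens here ...
	timer.ObserveDuration()

	// Report the current queue depth, mirroring the controller's usage.
	queueSize.Set(float64(len(queue)))
}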
2 changes: 1 addition & 1 deletion components/node-labeler/cmd/root.go
@@ -32,7 +32,7 @@ var (
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: ServiceName,
Short: "node-labeler is in charge of maintining the node labels that workspaces require to run in a node",
Short: "node-labeler is in charge of maintaining the node labels that workspaces require to run in a node",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.Init(ServiceName, Version, jsonLog, verbose)
},
238 changes: 233 additions & 5 deletions components/node-labeler/cmd/run.go
@@ -12,9 +12,12 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/bombsimon/logrusr/v2"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -31,7 +34,9 @@
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -78,16 +83,16 @@ var runCmd = &cobra.Command{
LeaderElectionID: "node-labeler.gitpod.io",
})
if err != nil {
log.WithError(err).Fatal("unable to start node-labeber")
log.WithError(err).Fatal("unable to start node-labeler")
}

client, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
kClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
if err != nil {
log.WithError(err).Fatal("unable to create client")
}

r := &PodReconciler{
client,
kClient,
}

componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
@@ -110,8 +115,38 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to bind controller watch event handler")
}

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &workspacev1.Workspace{}, "status.runtime.nodeName", func(o client.Object) []string {
ws := o.(*workspacev1.Workspace)
if ws.Status.Runtime == nil {
return nil
}
return []string{ws.Status.Runtime.NodeName}
}); err != nil {
log.WithError(err).Fatal("unable to create workspace indexer")
return
}

nsac, err := NewNodeScaledownAnnotationController(mgr.GetClient())
if err != nil {
log.WithError(err).Fatal("unable to create node scaledown annotation controller")
}
err = nsac.SetupWithManager(mgr)
if err != nil {
log.WithError(err).Fatal("unable to bind node scaledown annotation controller")
}

err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
// Block until the manager is shutting down, then stop the controller's
// periodic reconciliation goroutine.
<-ctx.Done()
nsac.Stop()
return nil
}))
if err != nil {
log.WithError(err).Fatal("couldn't properly clean up node scaledown annotation controller")
}

metrics.Registry.MustRegister(NodeLabelerCounterVec)
metrics.Registry.MustRegister(NodeLabelerTimeHistVec)
metrics.Registry.MustRegister(NodeScaledownAnnotationReconcileDuration)
metrics.Registry.MustRegister(NodeScaledownAnnotationReconciliationQueueSize)

err = mgr.AddHealthzCheck("healthz", healthz.Ping)
if err != nil {
@@ -123,10 +158,10 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to set up ready check")
}

log.Info("starting node-labeber")
log.Info("starting node-labeler")
err = mgr.Start(ctrl.SetupSignalHandler())
if err != nil {
log.WithError(err).Fatal("problem running node-labeber")
log.WithError(err).Fatal("problem running node-labeler")
}

log.Info("Received SIGINT - shutting down")
@@ -135,6 +170,8 @@

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(workspacev1.AddToScheme(scheme))

rootCmd.AddCommand(runCmd)
}

@@ -249,6 +286,197 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
return reconcile.Result{}, nil
}

type NodeScaledownAnnotationController struct {
client.Client
nodesToReconcile chan string
stopChan chan struct{}
nodeReconcileLock sync.Map
}

func NewNodeScaledownAnnotationController(client client.Client) (*NodeScaledownAnnotationController, error) {
return &NodeScaledownAnnotationController{
Client: client,
nodesToReconcile: make(chan string, 1000),
stopChan: make(chan struct{}),
}, nil
}

func (c *NodeScaledownAnnotationController) SetupWithManager(mgr ctrl.Manager) error {
// Start the periodic reconciliation goroutine
go c.periodicReconciliation()

return ctrl.NewControllerManagedBy(mgr).
Named("node-scaledown-annotation-controller").
For(&workspacev1.Workspace{}).
WithEventFilter(c.workspaceFilter()).
Complete(c)
}

func (c *NodeScaledownAnnotationController) periodicReconciliation() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
log.Info("starting periodic full reconciliation")
ctx := context.Background()
if _, err := c.reconcileAllNodes(ctx); err != nil {
log.WithError(err).Error("periodic reconciliation failed")
}
case <-c.stopChan:
return
}
}
}

func (c *NodeScaledownAnnotationController) workspaceFilter() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
ws := e.Object.(*workspacev1.Workspace)
if ws.Status.Runtime == nil {
log.WithField("workspace", ws.Name).Info("workspace not ready yet")
return false
}

return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != ""
},
UpdateFunc: func(e event.UpdateEvent) bool {
wsOld := e.ObjectOld.(*workspacev1.Workspace)
ws := e.ObjectNew.(*workspacev1.Workspace)
// if we haven't seen runtime info before and now it's there, let's reconcile
if wsOld.Status.Runtime == nil && ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
return true
}

return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
ws := e.Object.(*workspacev1.Workspace)
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
// Queue the node for reconciliation
select {
case c.nodesToReconcile <- ws.Status.Runtime.NodeName:
log.WithField("node", ws.Status.Runtime.NodeName).Info("queued node for reconciliation from delete")
default:
log.WithField("node", ws.Status.Runtime.NodeName).Warn("reconciliation queue full")
}
NodeScaledownAnnotationReconciliationQueueSize.Set(float64(len(c.nodesToReconcile)))
return true
}
return false
},
}
}
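
The coupling between the bounded nodesToReconcile channel (filled in DeleteFunc above) and the drain at the top of Reconcile below is just a pair of non-blocking select statements. A stripped-down sketch of that pattern, with hypothetical helper names:

package main

import "fmt"

// enqueue mirrors DeleteFunc: try to queue the node, drop (and warn) if the buffer is full.
func enqueue(queue chan string, node string) {
	select {
	case queue <- node:
		fmt.Println("queued", node)
	default:
		fmt.Println("queue full, dropping", node)
	}
}

// drainOne mirrors the top of Reconcile: handle at most one queued node,
// logging failures instead of returning them.
func drainOne(queue chan string, reconcile func(string) error) {
	select {
	case node := <-queue:
		if err := reconcile(node); err != nil {
			fmt.Println("failed to reconcile", node, err)
		}
	default:
		// nothing queued; continue with the regular workspace reconciliation
	}
}

func main() {
	q := make(chan string, 1000)
	enqueue(q, "node-a")
	drainOne(q, func(n string) error { return nil })
}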

func (c *NodeScaledownAnnotationController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")

// Process any queued nodes first, logging errors (not returning)
select {
case nodeName := <-c.nodesToReconcile:
if err := c.reconcileNode(ctx, nodeName); err != nil {
log.WithError(err).WithField("node", nodeName).Error("failed to reconcile node from queue")
}
default:
// No nodes in queue, continue with regular reconciliation
}

var ws workspacev1.Workspace
if err := c.Get(ctx, req.NamespacedName, &ws); err != nil {
if !errors.IsNotFound(err) {
log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
if err := c.reconcileNode(ctx, ws.Status.Runtime.NodeName); err != nil {
log.WithError(err).WithField("node", ws.Status.Runtime.NodeName).Error("failed to reconcile node")
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

// Cleanup method to be called when shutting down the controller
func (c *NodeScaledownAnnotationController) Stop() {
close(c.stopChan)
}

// reconcileAllNodes lists all nodes and reconciles each one
func (c *NodeScaledownAnnotationController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
timer := prometheus.NewTimer(NodeScaledownAnnotationReconcileDuration.WithLabelValues("all_nodes"))
defer timer.ObserveDuration()

var nodes corev1.NodeList
if err := c.List(ctx, &nodes); err != nil {
log.WithError(err).Error("failed to list nodes")
return ctrl.Result{}, err
}

for _, node := range nodes.Items {
if err := c.reconcileNode(ctx, node.Name); err != nil {
log.WithError(err).WithField("node", node.Name).Error("failed to reconcile node")
continue
}
}

return ctrl.Result{}, nil
}

// reconcileNode counts the workspaces running on a node and updates the autoscaler annotation accordingly
func (c *NodeScaledownAnnotationController) reconcileNode(ctx context.Context, nodeName string) error {
mutexInterface, _ := c.nodeReconcileLock.LoadOrStore(nodeName, &sync.Mutex{})
mutex := mutexInterface.(*sync.Mutex)

mutex.Lock()
defer mutex.Unlock()

timer := prometheus.NewTimer(NodeScaledownAnnotationReconcileDuration.WithLabelValues("node"))
defer timer.ObserveDuration()

var workspaceList workspacev1.WorkspaceList
if err := c.List(ctx, &workspaceList, client.MatchingFields{
"status.runtime.nodeName": nodeName,
}); err != nil {
return fmt.Errorf("failed to list workspaces: %w", err)
}
log.WithField("node", nodeName).WithField("count", len(workspaceList.Items)).Info("acting on workspaces")
count := len(workspaceList.Items)

return c.updateNodeAnnotation(ctx, nodeName, count)
}
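
The client.MatchingFields query above only resolves because runCmd registered a "status.runtime.nodeName" index on the manager's cache. A minimal sketch of that pairing, using controller-runtime's fake client purely for illustration, with the same extractor function as the PR:

package main

import (
	"context"
	"fmt"

	workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func main() {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	_ = workspacev1.AddToScheme(scheme)

	// Without this index, List with MatchingFields on
	// "status.runtime.nodeName" returns an error.
	c := fake.NewClientBuilder().
		WithScheme(scheme).
		WithIndex(&workspacev1.Workspace{}, "status.runtime.nodeName", func(o client.Object) []string {
			ws := o.(*workspacev1.Workspace)
			if ws.Status.Runtime == nil {
				return nil
			}
			return []string{ws.Status.Runtime.NodeName}
		}).
		Build()

	var list workspacev1.WorkspaceList
	if err := c.List(context.Background(), &list, client.MatchingFields{
		"status.runtime.nodeName": "node-a",
	}); err != nil {
		panic(err)
	}
	fmt.Println("workspaces on node-a:", len(list.Items))
}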

func (c *NodeScaledownAnnotationController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var node corev1.Node
err := c.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
if err != nil {
return fmt.Errorf("obtaining node %s: %w", nodeName, err)
}

if node.Annotations == nil {
node.Annotations = make(map[string]string)
}

if count > 0 {
node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"] = "true"
log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
} else {
delete(node.Annotations, "cluster-autoscaler.kubernetes.io/scale-down-disabled")
log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
}

return c.Update(ctx, &node)
})
}

func updateLabel(label string, add bool, nodeName string, client client.Client) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)