Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions components/node-labeler/BUILD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
env:
- CGO_ENABLED=0
- GOOS=linux
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
config:
packaging: app
buildCommand: ["go", "build", "-trimpath", "-ldflags", "-buildid= -w -s -X 'github.com/gitpod-io/gitpod/node-labeler/cmd.Version=commit-${__git_commit}'"]
Expand All @@ -36,3 +41,4 @@ packages:
- "go.sum"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
2 changes: 1 addition & 1 deletion components/node-labeler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: ServiceName,
Short: "node-labeler is in charge of maintining the node labels that workspaces require to run in a node",
Short: "node-labeler is in charge of maintaining the node labels that workspaces require to run in a node",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.Init(ServiceName, Version, jsonLog, verbose)
},
Expand Down
167 changes: 162 additions & 5 deletions components/node-labeler/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/bombsimon/logrusr/v2"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -31,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
Expand Down Expand Up @@ -78,16 +80,16 @@ var runCmd = &cobra.Command{
LeaderElectionID: "node-labeler.gitpod.io",
})
if err != nil {
log.WithError(err).Fatal("unable to start node-labeber")
log.WithError(err).Fatal("unable to start node-labeler")
}

client, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
kClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
if err != nil {
log.WithError(err).Fatal("unable to create client")
}

r := &PodReconciler{
client,
kClient,
}

componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
Expand All @@ -110,6 +112,15 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to bind controller watch event handler")
}

wc, err := NewWorkspaceCountController(mgr.GetClient())
if err != nil {
log.WithError(err).Fatal("unable to create workspace count controller")
}
err = wc.SetupWithManager(mgr)
if err != nil {
log.WithError(err).Fatal("unable to bind workspace count controller")
}

metrics.Registry.MustRegister(NodeLabelerCounterVec)
metrics.Registry.MustRegister(NodeLabelerTimeHistVec)

Expand All @@ -123,10 +134,10 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to set up ready check")
}

log.Info("starting node-labeber")
log.Info("starting node-labeler")
err = mgr.Start(ctrl.SetupSignalHandler())
if err != nil {
log.WithError(err).Fatal("problem running node-labeber")
log.WithError(err).Fatal("problem running node-labeler")
}

log.Info("Received SIGINT - shutting down")
Expand All @@ -135,6 +146,8 @@ var runCmd = &cobra.Command{

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(workspacev1.AddToScheme(scheme))

rootCmd.AddCommand(runCmd)
}

Expand Down Expand Up @@ -249,6 +262,150 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
return reconcile.Result{}, nil
}

// WorkspaceCountController tracks how many workspaces run on each node and
// maintains the cluster-autoscaler scale-down annotation on those nodes.
type WorkspaceCountController struct {
	client.Client
}

// NewWorkspaceCountController creates a WorkspaceCountController backed by the
// given client. It currently cannot fail; the error return is kept so the
// signature stays stable for callers that already handle it.
func NewWorkspaceCountController(client client.Client) (*WorkspaceCountController, error) {
	return &WorkspaceCountController{
		Client: client,
	}, nil
}

// SetupWithManager registers the controller with the manager, watching
// Workspace resources and filtering events through workspaceFilter so only
// scheduling-relevant changes trigger a reconcile.
func (wc *WorkspaceCountController) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		Named("workspace-count").
		For(&workspacev1.Workspace{}).
		WithEventFilter(workspaceFilter()).
		Complete(wc)
}

// workspaceFilter returns a predicate admitting only workspace events that can
// change a node's workspace count: creations/deletions of workspaces that have
// been scheduled (Status.Runtime.NodeName set), and the update on which
// runtime info first appears.
func workspaceFilter() predicate.Predicate {
	return predicate.Funcs{
		CreateFunc: func(e event.CreateEvent) bool {
			// Comma-ok assertion: a foreign object type must not panic the filter.
			ws, ok := e.Object.(*workspacev1.Workspace)
			if !ok {
				return false
			}
			if ws.Status.Runtime == nil {
				log.WithField("workspace", ws.Name).Info("workspace not ready yet")
				return false
			}
			// Runtime is non-nil here; only the node assignment needs checking.
			return ws.Status.Runtime.NodeName != ""
		},
		UpdateFunc: func(e event.UpdateEvent) bool {
			wsOld, okOld := e.ObjectOld.(*workspacev1.Workspace)
			ws, okNew := e.ObjectNew.(*workspacev1.Workspace)
			if !okOld || !okNew {
				return false
			}
			// Reconcile only when runtime info first appears; once we've seen
			// it, there's no need to reconcile again on later updates.
			return wsOld.Status.Runtime == nil && ws.Status.Runtime != nil
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			ws, ok := e.Object.(*workspacev1.Workspace)
			if !ok {
				return false
			}
			return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != ""
		},
	}
}

func (wc *WorkspaceCountController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")

var ws workspacev1.Workspace
if err := wc.Get(ctx, req.NamespacedName, &ws); err != nil {
if !errors.IsNotFound(err) {
log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace")
return ctrl.Result{}, err
}
// If workspace not found, do a full reconciliation
log.WithField("workspace", req.NamespacedName).Info("Workspace not found, reconciling all nodes")
return wc.reconcileAllNodes(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking:
Is return wc.reconcileAllNodes(ctx) necessary?

I ask because I would expect this controller to evaluate all workspaces every 2m (the sync period for the controller's manager).

It seems potentially unnecessary to me. Can you run a test, where the none of the events trigger (return false), and then check to see if the reconcile function triggers in 2m?

Copy link
Member Author

@filiptronicek filiptronicek Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going off of @iQQBot's comment here, where he mentioned listing nodes and workspaces often shouldn't be an issue.

To answer your question: because we reconcile at least every 2m, this reconcileAllNodes call is not strictly necessary, it just makes us faster to respond to changes - i.e. if we roll out this change without it, it will add up to 2 extra minutes of keeping every node around that just lost its last workspace pod.

... and it also helps with testing 😄

}

if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
var workspaceList workspacev1.WorkspaceList
if err := wc.List(ctx, &workspaceList); err != nil {
log.WithError(err).Error("failed to list workspaces")
return ctrl.Result{}, err
}

count := 0
nodeName := ws.Status.Runtime.NodeName
for _, ws := range workspaceList.Items {
if ws.Status.Runtime != nil &&
ws.Status.Runtime.NodeName == nodeName &&
ws.DeletionTimestamp.IsZero() {
count++
}
}

if err := wc.updateNodeAnnotation(ctx, nodeName, count); err != nil {
return ctrl.Result{}, err
}
log.WithField("node", nodeName).WithField("count", count).Info("updated node annotation")
}

return ctrl.Result{}, nil
}

// reconcileAllNodes recomputes the workspace count for every node in the
// cluster and refreshes each node's scale-down annotation accordingly.
func (wc *WorkspaceCountController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
	var wsList workspacev1.WorkspaceList
	if err := wc.List(ctx, &wsList); err != nil {
		log.WithError(err).Error("failed to list workspaces")
		return ctrl.Result{}, err
	}

	// Tally live, scheduled workspaces per node name.
	countByNode := make(map[string]int)
	for i := range wsList.Items {
		ws := &wsList.Items[i]
		if ws.Status.Runtime == nil || ws.Status.Runtime.NodeName == "" {
			continue
		}
		if !ws.DeletionTimestamp.IsZero() {
			continue
		}
		countByNode[ws.Status.Runtime.NodeName]++
	}

	var nodeList corev1.NodeList
	if err := wc.List(ctx, &nodeList); err != nil {
		log.WithError(err).Error("failed to list nodes")
		return ctrl.Result{}, err
	}

	// Push the (possibly zero) count to every node; a failure on one node must
	// not prevent updating the rest.
	for i := range nodeList.Items {
		name := nodeList.Items[i].Name
		count := countByNode[name]
		if err := wc.updateNodeAnnotation(ctx, name, count); err != nil {
			log.WithError(err).WithField("node", name).Error("failed to update node")
			continue
		}
		log.WithField("node", name).WithField("count", count).Info("updated node annotation")
	}

	return ctrl.Result{}, nil
}

// updateNodeAnnotation enables or disables cluster-autoscaler scale-down for
// the given node: with count > 0 workspaces the scale-down-disabled annotation
// is set, otherwise it is removed. The write is retried on optimistic-
// concurrency conflicts, and skipped entirely when the node is already in the
// desired state to avoid needless API churn.
func (wc *WorkspaceCountController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error {
	// Single definition of the annotation key instead of repeating the literal.
	const scaleDownDisabledAnnotation = "cluster-autoscaler.kubernetes.io/scale-down-disabled"

	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Re-fetch inside the retry loop so each attempt works on a fresh
		// resourceVersion.
		var node corev1.Node
		err := wc.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
		if err != nil {
			return fmt.Errorf("obtaining node %s: %w", nodeName, err)
		}

		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}

		if count > 0 {
			// Already disabled? Nothing to do.
			if node.Annotations[scaleDownDisabledAnnotation] == "true" {
				return nil
			}
			node.Annotations[scaleDownDisabledAnnotation] = "true"
			log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
		} else {
			// Already absent? Nothing to do.
			if _, present := node.Annotations[scaleDownDisabledAnnotation]; !present {
				return nil
			}
			delete(node.Annotations, scaleDownDisabledAnnotation)
			log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
		}

		return wc.Update(ctx, &node)
	})
}

func updateLabel(label string, add bool, nodeName string, client client.Client) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
Loading
Loading