Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions components/node-labeler/BUILD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
env:
- CGO_ENABLED=0
- GOOS=linux
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
config:
packaging: app
buildCommand: ["go", "build", "-trimpath", "-ldflags", "-buildid= -w -s -X 'github.com/gitpod-io/gitpod/node-labeler/cmd.Version=commit-${__git_commit}'"]
Expand All @@ -34,5 +39,10 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
2 changes: 1 addition & 1 deletion components/node-labeler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: ServiceName,
Short: "node-labeler is in charge of maintining the node labels that workspaces require to run in a node",
Short: "node-labeler is in charge of maintaining the node labels that workspaces require to run in a node",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.Init(ServiceName, Version, jsonLog, verbose)
},
Expand Down
221 changes: 216 additions & 5 deletions components/node-labeler/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/bombsimon/logrusr/v2"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -31,7 +32,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -78,16 +81,16 @@ var runCmd = &cobra.Command{
LeaderElectionID: "node-labeler.gitpod.io",
})
if err != nil {
log.WithError(err).Fatal("unable to start node-labeber")
log.WithError(err).Fatal("unable to start node-labeler")
}

client, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
kClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
if err != nil {
log.WithError(err).Fatal("unable to create client")
}

r := &PodReconciler{
client,
kClient,
}

componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
Expand All @@ -110,6 +113,34 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to bind controller watch event handler")
}

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &workspacev1.Workspace{}, "status.runtime.nodeName", func(o client.Object) []string {
ws := o.(*workspacev1.Workspace)
if ws.Status.Runtime == nil {
return nil
}
return []string{ws.Status.Runtime.NodeName}
}); err != nil {
log.WithError(err).Fatal("unable to create workspace indexer")
return
}

wc, err := NewWorkspaceCountController(mgr.GetClient())
if err != nil {
log.WithError(err).Fatal("unable to create workspace count controller")
}
err = wc.SetupWithManager(mgr)
if err != nil {
log.WithError(err).Fatal("unable to bind workspace count controller")
}

err = mgr.Add(manager.RunnableFunc(func(context.Context) error {
wc.Stop()
return nil
}))
if err != nil {
log.WithError(err).Fatal("couldn't properly clean up WorkspaceCountController")
}

metrics.Registry.MustRegister(NodeLabelerCounterVec)
metrics.Registry.MustRegister(NodeLabelerTimeHistVec)

Expand All @@ -123,10 +154,10 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to set up ready check")
}

log.Info("starting node-labeber")
log.Info("starting node-labeler")
err = mgr.Start(ctrl.SetupSignalHandler())
if err != nil {
log.WithError(err).Fatal("problem running node-labeber")
log.WithError(err).Fatal("problem running node-labeler")
}

log.Info("Received SIGINT - shutting down")
Expand All @@ -135,6 +166,8 @@ var runCmd = &cobra.Command{

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(workspacev1.AddToScheme(scheme))

rootCmd.AddCommand(runCmd)
}

Expand Down Expand Up @@ -249,6 +282,184 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
return reconcile.Result{}, nil
}

type WorkspaceCountController struct {
client.Client
nodesToReconcile chan string
stopChan chan struct{}
}

func NewWorkspaceCountController(client client.Client) (*WorkspaceCountController, error) {
return &WorkspaceCountController{
Client: client,
nodesToReconcile: make(chan string, 1000),
stopChan: make(chan struct{}),
}, nil
}

func (wc *WorkspaceCountController) SetupWithManager(mgr ctrl.Manager) error {
// Start the periodic reconciliation goroutine
go wc.periodicReconciliation()

return ctrl.NewControllerManagedBy(mgr).
Named("workspace-count").
For(&workspacev1.Workspace{}).
WithEventFilter(wc.workspaceFilter()).
Complete(wc)
}

func (wc *WorkspaceCountController) periodicReconciliation() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do a manual reconciliation every 5m. Why? Because SyncPeriod in the manager options doesn't seem to really be doing what I expected it to do - I didn't see the reconciliation trigger via an external trigger even once during testing.

Hence the goroutine, which we properly dispose of with the stopChan

ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
log.Info("Starting periodic full reconciliation")
ctx := context.Background()
if _, err := wc.reconcileAllNodes(ctx); err != nil {
log.WithError(err).Error("periodic reconciliation failed")
}
case <-wc.stopChan:
return
}
}
}

func (wc *WorkspaceCountController) workspaceFilter() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
ws := e.Object.(*workspacev1.Workspace)
if ws.Status.Runtime == nil {
log.WithField("workspace", ws.Name).Info("workspace not ready yet")
return false
}

return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != ""
},
UpdateFunc: func(e event.UpdateEvent) bool {
wsOld := e.ObjectOld.(*workspacev1.Workspace)
ws := e.ObjectNew.(*workspacev1.Workspace)
// if we haven't seen runtime info before and now it's there, let's reconcile
if wsOld.Status.Runtime == nil && ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
return true
}

return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
ws := e.Object.(*workspacev1.Workspace)
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
// Queue the node for reconciliation
select {
case wc.nodesToReconcile <- ws.Status.Runtime.NodeName:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When deleting, we won't be able to query the workspaces' node name, since it already won't exist at the time we'll query for it. Because of that, we rather capture node names in a channel, which is consumed any time Reconcile is run.

log.WithField("node", ws.Status.Runtime.NodeName).Info("queued node for reconciliation from delete")
default:
log.WithField("node", ws.Status.Runtime.NodeName).Warn("reconciliation queue full")
}
return true
}
return false
},
}
}

func (wc *WorkspaceCountController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")

// Process any queued nodes first
select {
case nodeName := <-wc.nodesToReconcile:
if err := wc.reconcileNode(ctx, nodeName); err != nil {
log.WithError(err).WithField("node", nodeName).Error("failed to reconcile node from queue")
return ctrl.Result{}, err
}
default:
// No nodes in queue, continue with regular reconciliation
}

var ws workspacev1.Workspace
if err := wc.Get(ctx, req.NamespacedName, &ws); err != nil {
if !errors.IsNotFound(err) {
log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
if err := wc.reconcileNode(ctx, ws.Status.Runtime.NodeName); err != nil {
log.WithError(err).WithField("node", ws.Status.Runtime.NodeName).Error("failed to reconcile node")
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

// Cleanup method to be called when shutting down the controller
func (wc *WorkspaceCountController) Stop() {
close(wc.stopChan)
}

// reconcileAllNodes lists all nodes and reconciles each one
func (wc *WorkspaceCountController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
var nodes corev1.NodeList
if err := wc.List(ctx, &nodes); err != nil {
log.WithError(err).Error("failed to list nodes")
return ctrl.Result{}, err
}

for _, node := range nodes.Items {
if err := wc.reconcileNode(ctx, node.Name); err != nil {
log.WithError(err).WithField("node", node.Name).Error("failed to reconcile node")
continue
}
}

return ctrl.Result{}, nil
}

// reconcileNode counts the workspaces running on a node and updates the autoscaler annotation accordingly
func (wc *WorkspaceCountController) reconcileNode(ctx context.Context, nodeName string) error {
var workspaceList workspacev1.WorkspaceList
if err := wc.List(ctx, &workspaceList, client.MatchingFields{
"status.runtime.nodeName": nodeName,
}); err != nil {
return fmt.Errorf("failed to list workspaces: %w", err)
}
log.WithField("node", nodeName).WithField("count", len(workspaceList.Items)).Info("acting on workspaces")
count := len(workspaceList.Items)

return wc.updateNodeAnnotation(ctx, nodeName, count)
}

func (wc *WorkspaceCountController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var node corev1.Node
err := wc.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
if err != nil {
return fmt.Errorf("obtaining node %s: %w", nodeName, err)
}

if node.Annotations == nil {
node.Annotations = make(map[string]string)
}

if count > 0 {
node.Annotations["cluster-autoscaler.kubernetes.io/scale-down-disabled"] = "true"
log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
} else {
delete(node.Annotations, "cluster-autoscaler.kubernetes.io/scale-down-disabled")
log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
}

return wc.Update(ctx, &node)
})
}

func updateLabel(label string, add bool, nodeName string, client client.Client) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
Loading
Loading