Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/node-labeler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: ServiceName,
Short: "node-labeler is in charge of maintining the node labels that workspaces require to run in a node",
Short: "node-labeler is in charge of maintaining the node labels that workspaces require to run in a node",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.Init(ServiceName, Version, jsonLog, verbose)
},
Expand Down
6 changes: 3 additions & 3 deletions components/node-labeler/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var runCmd = &cobra.Command{
LeaderElectionID: "node-labeler.gitpod.io",
})
if err != nil {
log.WithError(err).Fatal("unable to start node-labeber")
log.WithError(err).Fatal("unable to start node-labeler")
}

client, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
Expand Down Expand Up @@ -123,10 +123,10 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to set up ready check")
}

log.Info("starting node-labeber")
log.Info("starting node-labeler")
err = mgr.Start(ctrl.SetupSignalHandler())
if err != nil {
log.WithError(err).Fatal("problem running node-labeber")
log.WithError(err).Fatal("problem running node-labeler")
}

log.Info("Received SIGINT - shutting down")
Expand Down
76 changes: 76 additions & 0 deletions components/ws-daemon/pkg/controller/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -81,6 +83,41 @@ func NewWorkspaceController(c client.Client, recorder record.EventRecorder, node
}, nil
}

// PodCountController is a reconciler bound to a single node. Its Reconcile method
// lists the workspace pods scheduled on NodeName and adjusts an annotation on that
// node based on whether any such pods exist.
type PodCountController struct {
// Client is embedded, so its List/Get/Update methods are called directly on the controller.
client.Client
// NodeName is the name of the node whose workspace pods are counted and whose
// annotations are updated.
NodeName string
}

// NewPodCountController builds a PodCountController for the given node. The
// controller uses the supplied client to list workspace pods and to update the
// node's annotations. The returned error is always nil.
func NewPodCountController(client client.Client, nodeName string) (*PodCountController, error) {
	pc := new(PodCountController)
	pc.Client = client
	pc.NodeName = nodeName
	return pc, nil
}

// SetupWithManager registers the controller with mgr under the name "pod-count".
// It watches Workspace objects, filtered through podEventFilter for pc.NodeName,
// and uses pc as the reconciler.
func (pc *PodCountController) SetupWithManager(mgr ctrl.Manager) error {
	b := ctrl.NewControllerManagedBy(mgr).Named("pod-count")
	b = b.For(&workspacev1.Workspace{})
	b = b.WithEventFilter(podEventFilter(pc.NodeName))
	return b.Complete(pc)
}

// podEventFilter returns a predicate that passes create, update and delete events
// only when workspaceFilter accepts the event's object for nodeName. Update events
// are judged on the new version of the object. Generic events are left on the
// predicate.Funcs default.
func podEventFilter(nodeName string) predicate.Predicate {
	var funcs predicate.Funcs

	funcs.CreateFunc = func(ev event.CreateEvent) bool {
		return workspaceFilter(ev.Object, nodeName)
	}
	funcs.UpdateFunc = func(ev event.UpdateEvent) bool {
		return workspaceFilter(ev.ObjectNew, nodeName)
	}
	funcs.DeleteFunc = func(ev event.DeleteEvent) bool {
		return workspaceFilter(ev.Object, nodeName)
	}

	return funcs
}

// SetupWithManager sets up the controller with the Manager.
func (wsc *WorkspaceController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -146,6 +183,45 @@ func (wsc *WorkspaceController) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// Reconcile counts the pods labelled component=workspace that are scheduled on
// pc.NodeName and keeps the cluster-autoscaler scale-down-disabled annotation on
// that node in sync: the annotation is set to "true" while at least one such pod
// exists and removed otherwise.
//
// The incoming request is not inspected; every invocation recomputes the state
// for the whole node. Listing by the "spec.nodeName" field selector relies on the
// client supporting that selector (for a cached client this means a field index
// must have been registered for it).
//
// It returns an error if the pods cannot be listed or the node cannot be read or
// updated; update conflicts are retried with retry.DefaultBackoff.
func (pc *PodCountController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	const scaleDownDisabledAnnotation = "cluster-autoscaler.kubernetes.io/scale-down-disabled"

	var podList corev1.PodList
	err := pc.List(ctx, &podList, &client.ListOptions{
		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": pc.NodeName}),
		LabelSelector: labels.SelectorFromSet(labels.Set{"component": "workspace"}),
	})
	if err != nil {
		glog.WithError(err).WithField("nodeName", pc.NodeName).Error("failed to list pods")
		return ctrl.Result{}, err
	}
	hasWorkspaces := len(podList.Items) > 0

	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Re-read the node on every attempt so a retry after a conflict works on
		// the latest resource version.
		var node corev1.Node
		if err := pc.Get(ctx, types.NamespacedName{Name: pc.NodeName}, &node); err != nil {
			return fmt.Errorf("obtaining node %s: %w", pc.NodeName, err)
		}

		// Skip the write when the node is already in the desired state; this
		// avoids an API update (and potential conflicts) on every workspace event.
		value, annotated := node.Annotations[scaleDownDisabledAnnotation]
		switch {
		case hasWorkspaces && annotated && value == "true":
			return nil
		case !hasWorkspaces && !annotated:
			return nil
		}

		if hasWorkspaces {
			if node.Annotations == nil {
				node.Annotations = make(map[string]string)
			}
			node.Annotations[scaleDownDisabledAnnotation] = "true"
		} else {
			delete(node.Annotations, scaleDownDisabledAnnotation)
		}

		return pc.Update(ctx, &node)
	})
	if err != nil {
		glog.WithError(err).WithField("nodeName", pc.NodeName).Error("failed to update node")
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

// latestWorkspace checks if we have the latest generation of the workspace CR. We do this because
// the cache could be stale and we retrieve a workspace CR that does not have the content init/backup
// conditions even though we have set them previously. This will lead to us performing these operations
Expand Down
22 changes: 22 additions & 0 deletions components/ws-daemon/pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"golang.org/x/xerrors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
Expand All @@ -21,6 +22,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
Expand Down Expand Up @@ -228,6 +230,26 @@ func NewDaemon(config Config) (*Daemon, error) {
return nil, err
}

// the pod count reconciler needs an index on spec.nodeName to be able to list pods by node
if err := mgr.GetFieldIndexer().IndexField(
context.Background(),
&corev1.Pod{},
"spec.nodeName",
func(o client.Object) []string {
pod := o.(*corev1.Pod)
return []string{pod.Spec.NodeName}
}); err != nil {
return nil, err
}

pcc, err := controller.NewPodCountController(mgr.GetClient(), nodename)
if err != nil {
return nil, err
}
if err := pcc.SetupWithManager(mgr); err != nil {
return nil, err
}

ssctrl := controller.NewSnapshotController(
mgr.GetClient(), mgr.GetEventRecorderFor("snapshot"), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps)
err = ssctrl.SetupWithManager(mgr)
Expand Down
Loading