Skip to content

Commit 56a4333

Browse files
committed
Queue deleted nodes and periodically reconcile it all
1 parent f043f94 commit 56a4333

File tree

1 file changed

+86
-46
lines changed
  • components/node-labeler/cmd

1 file changed

+86
-46
lines changed

components/node-labeler/cmd/run.go

Lines changed: 86 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ var runCmd = &cobra.Command{
6868
DefaultNamespaces: map[string]cache.Config{
6969
namespace: {},
7070
},
71-
ByObject: map[client.Object]cache.ByObject{
72-
&workspacev1.Workspace{}: {},
73-
},
7471
// default sync period is 10h.
7572
// in case node-labeler is restarted and not change happens, we could waste (at least) 20m in a node
7673
// that never will run workspaces and the additional nodes cluster-autoscaler adds to compensate
@@ -115,6 +112,17 @@ var runCmd = &cobra.Command{
115112
log.WithError(err).Fatal("unable to bind controller watch event handler")
116113
}
117114

115+
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &workspacev1.Workspace{}, "status.runtime.nodeName", func(o client.Object) []string {
116+
ws := o.(*workspacev1.Workspace)
117+
if ws.Status.Runtime == nil {
118+
return nil
119+
}
120+
return []string{ws.Status.Runtime.NodeName}
121+
}); err != nil {
122+
log.WithError(err).Fatal("unable to create workspace indexer")
123+
return
124+
}
125+
118126
wc, err := NewWorkspaceCountController(mgr.GetClient())
119127
if err != nil {
120128
log.WithError(err).Fatal("unable to create workspace count controller")
@@ -267,23 +275,48 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
267275

268276
type WorkspaceCountController struct {
269277
client.Client
278+
nodesToReconcile chan string
279+
stopChan chan struct{}
270280
}
271281

272282
func NewWorkspaceCountController(client client.Client) (*WorkspaceCountController, error) {
273283
return &WorkspaceCountController{
274-
Client: client,
284+
Client: client,
285+
nodesToReconcile: make(chan string, 1000),
286+
stopChan: make(chan struct{}),
275287
}, nil
276288
}
277289

278290
func (wc *WorkspaceCountController) SetupWithManager(mgr ctrl.Manager) error {
291+
// Start the periodic reconciliation goroutine
292+
go wc.periodicReconciliation()
293+
279294
return ctrl.NewControllerManagedBy(mgr).
280295
Named("workspace-count").
281296
For(&workspacev1.Workspace{}).
282-
WithEventFilter(workspaceFilter()).
297+
WithEventFilter(wc.workspaceFilter()).
283298
Complete(wc)
284299
}
285300

286-
func workspaceFilter() predicate.Predicate {
301+
func (wc *WorkspaceCountController) periodicReconciliation() {
302+
ticker := time.NewTicker(5 * time.Minute)
303+
defer ticker.Stop()
304+
305+
for {
306+
select {
307+
case <-ticker.C:
308+
log.Info("Starting periodic full reconciliation")
309+
ctx := context.Background()
310+
if _, err := wc.reconcileAllNodes(ctx); err != nil {
311+
log.WithError(err).Error("periodic reconciliation failed")
312+
}
313+
case <-wc.stopChan:
314+
return
315+
}
316+
}
317+
}
318+
319+
func (wc *WorkspaceCountController) workspaceFilter() predicate.Predicate {
287320
return predicate.Funcs{
288321
CreateFunc: func(e event.CreateEvent) bool {
289322
ws := e.Object.(*workspacev1.Workspace)
@@ -306,84 +339,91 @@ func workspaceFilter() predicate.Predicate {
306339
},
307340
DeleteFunc: func(e event.DeleteEvent) bool {
308341
ws := e.Object.(*workspacev1.Workspace)
309-
return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != ""
342+
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
343+
// Queue the node for reconciliation
344+
select {
345+
case wc.nodesToReconcile <- ws.Status.Runtime.NodeName:
346+
log.WithField("node", ws.Status.Runtime.NodeName).Info("queued node for reconciliation from delete")
347+
default:
348+
log.WithField("node", ws.Status.Runtime.NodeName).Warn("reconciliation queue full")
349+
}
350+
return true
351+
}
352+
return false
310353
},
311354
}
312355
}
313356

314357
func (wc *WorkspaceCountController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
315358
log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")
316359

360+
// Process any queued nodes first
361+
select {
362+
case nodeName := <-wc.nodesToReconcile:
363+
if err := wc.reconcileNode(ctx, nodeName); err != nil {
364+
log.WithError(err).WithField("node", nodeName).Error("failed to reconcile node from queue")
365+
return ctrl.Result{}, err
366+
}
367+
default:
368+
// No nodes in queue, continue with regular reconciliation
369+
}
370+
317371
var ws workspacev1.Workspace
318372
if err := wc.Get(ctx, req.NamespacedName, &ws); err != nil {
319373
if !errors.IsNotFound(err) {
320374
log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace")
321375
return ctrl.Result{}, err
322376
}
323-
// If workspace not found, do a full reconciliation
324-
log.WithField("workspace", req.NamespacedName).Info("Workspace not found, reconciling all nodes")
325-
return wc.reconcileAllNodes(ctx)
377+
return ctrl.Result{}, nil
326378
}
327379

328380
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
329-
var workspaceList workspacev1.WorkspaceList
330-
if err := wc.List(ctx, &workspaceList); err != nil {
331-
log.WithError(err).Error("failed to list workspaces")
381+
if err := wc.reconcileNode(ctx, ws.Status.Runtime.NodeName); err != nil {
382+
log.WithError(err).WithField("node", ws.Status.Runtime.NodeName).Error("failed to reconcile node")
332383
return ctrl.Result{}, err
333384
}
334-
335-
count := 0
336-
nodeName := ws.Status.Runtime.NodeName
337-
for _, ws := range workspaceList.Items {
338-
if ws.Status.Runtime != nil &&
339-
ws.Status.Runtime.NodeName == nodeName {
340-
count++
341-
}
342-
}
343-
344-
if err := wc.updateNodeAnnotation(ctx, nodeName, count); err != nil {
345-
log.WithError(err).WithField("node", nodeName).Error("failed to update node")
346-
return ctrl.Result{}, err
347-
}
348-
log.WithField("node", nodeName).WithField("count", count).Info("updated node annotation")
349385
}
350386

351387
return ctrl.Result{}, nil
352388
}
353389

354-
func (wc *WorkspaceCountController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
355-
var workspaceList workspacev1.WorkspaceList
356-
if err := wc.List(ctx, &workspaceList); err != nil {
357-
log.WithError(err).Error("failed to list workspaces")
358-
return ctrl.Result{}, err
359-
}
360-
361-
workspaceCounts := make(map[string]int)
362-
for _, ws := range workspaceList.Items {
363-
if ws.Status.Runtime != nil &&
364-
ws.Status.Runtime.NodeName != "" {
365-
workspaceCounts[ws.Status.Runtime.NodeName]++
366-
}
367-
}
390+
// Cleanup method to be called when shutting down the controller
391+
func (wc *WorkspaceCountController) Stop() {
392+
close(wc.stopChan)
393+
}
368394

395+
// reconcileAllNodes lists all nodes and reconciles each one
396+
func (wc *WorkspaceCountController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
369397
var nodes corev1.NodeList
370398
if err := wc.List(ctx, &nodes); err != nil {
371399
log.WithError(err).Error("failed to list nodes")
372400
return ctrl.Result{}, err
373401
}
374402

375403
for _, node := range nodes.Items {
376-
count := workspaceCounts[node.Name]
377-
if err := wc.updateNodeAnnotation(ctx, node.Name, count); err != nil {
378-
log.WithError(err).WithField("node", node.Name).Error("failed to update node")
404+
if err := wc.reconcileNode(ctx, node.Name); err != nil {
405+
log.WithError(err).WithField("node", node.Name).Error("failed to reconcile node")
379406
continue
380407
}
381-
log.WithField("node", node.Name).WithField("count", count).Info("updated node annotation")
382408
}
383409

384410
return ctrl.Result{}, nil
385411
}
386412

413+
// reconcileNode counts the workspaces running on a node and updates the autoscaler annotation accordingly
414+
func (wc *WorkspaceCountController) reconcileNode(ctx context.Context, nodeName string) error {
415+
var workspaceList workspacev1.WorkspaceList
416+
if err := wc.List(ctx, &workspaceList, client.MatchingFields{
417+
"status.runtime.nodeName": nodeName,
418+
}); err != nil {
419+
return fmt.Errorf("failed to list workspaces: %w", err)
420+
}
421+
log.WithField("node", nodeName).WithField("count", len(workspaceList.Items)).Info("acting on workspaces")
422+
count := len(workspaceList.Items)
423+
424+
return wc.updateNodeAnnotation(ctx, nodeName, count)
425+
}
426+
387427
func (wc *WorkspaceCountController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error {
388428
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
389429
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

0 commit comments

Comments
 (0)