4 changes: 2 additions & 2 deletions internal/controller/appwrapper/appwrapper_controller.go
@@ -559,7 +559,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
if pod.DeletionTimestamp.IsZero() {
summary.running += 1
if checkNoExecuteNodes {
noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
if len(noExecuteNodes) > 0 {
if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
for badResource := range resources {
@@ -584,7 +584,7 @@ }
}
}
}
noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
nodeInfoMutex.RUnlock() // END CRITICAL SECTION
}
}
case v1.PodSucceeded:
181 changes: 86 additions & 95 deletions internal/controller/appwrapper/node_health_monitor.go
@@ -18,20 +18,20 @@ package appwrapper

import (
"context"
"maps"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

"github.com/project-codeflare/appwrapper/pkg/config"
)
@@ -44,51 +44,81 @@ import (
type NodeHealthMonitor struct {
client.Client
Config *config.AppWrapperConfig
Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
}

var (
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExeucte taint
noExecuteNodes = make(map[string]sets.Set[string])
noExecuteNodesMutex sync.RWMutex
// nodeInfoMutex synchronizes writes by NodeHealthMonitor with reads from AppWrapperReconciler and SlackClusterQueueMonitor
nodeInfoMutex sync.RWMutex

// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
// A resource may be unscheduable either because:
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
noExecuteNodes = make(map[string]sets.Set[string])

// noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
// A resource may be unschedulable either because:
// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
// (b) Autopilot has labeled the with either a NoExecute or NoSchedule taint.
noScheduleNodes = make(map[string]map[string]*resource.Quantity)
// (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
noScheduleNodes = make(map[string]v1.ResourceList)
)
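
Editorial note (not part of this diff): noExecuteNodes and noScheduleNodes are package-level maps written only by NodeHealthMonitor and read by AppWrapperReconciler (see getPodStatus in the first file) and SlackClusterQueueMonitor, all under nodeInfoMutex. A minimal sketch of the reader-side convention follows; the helper name nodeHasNoExecuteResource is hypothetical.

```go
// Hypothetical reader-side helper; not part of this PR.
// Readers only ever take the read lock and never mutate the maps.
func nodeHasNoExecuteResource(nodeName, resourceName string) bool {
	nodeInfoMutex.RLock()         // BEGIN CRITICAL SECTION
	defer nodeInfoMutex.RUnlock() // END CRITICAL SECTION
	resources, ok := noExecuteNodes[nodeName]
	return ok && resources.Has(resourceName)
}
```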

const (
dispatchEventName = "*trigger*"
)

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &v1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
return ctrl.Result{}, nil
if errors.IsNotFound(err) {
r.updateForNodeDeletion(ctx, req.Name)
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

r.updateNoExecuteNodes(ctx, node)

// If there is a slack ClusterQueue, update its lending limits

if r.Config.SlackQueueName == "" {
return ctrl.Result{}, nil
if node.DeletionTimestamp.IsZero() {
r.updateNoExecuteNodes(ctx, node)
r.updateNoScheduleNodes(ctx, node)
} else {
r.updateForNodeDeletion(ctx, req.Name)
}

cq := &kueue.ClusterQueue{}
if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil // give up if slack quota is not defined
return ctrl.Result{}, nil
}

func (r *NodeHealthMonitor) triggerSlackCQMonitor() {
if r.Config.SlackQueueName != "" {
select {
case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Name: dispatchEventName}}}:
default:
// do not block if event is already in channel
}
return ctrl.Result{}, err
}
}
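
Editorial note (not part of this diff): triggerSlackCQMonitor pairs the Events channel with a non-blocking send, so any number of node changes between reconciles collapse into at most one pending wake-up for SlackClusterQueueMonitor. A self-contained sketch of just that pattern, assuming Events is created with a buffer of 1 as the test's conduit below suggests:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/event"
)

func main() {
	// Capacity-1 channel: at most one wake-up is ever queued.
	events := make(chan event.GenericEvent, 1)

	notify := func() {
		select {
		case events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{
			ObjectMeta: metav1.ObjectMeta{Name: "*trigger*"},
		}}:
		default: // a trigger is already queued; this one can be coalesced away
		}
	}

	notify()
	notify() // dropped, not blocked
	fmt.Println("pending wake-ups:", len(events)) // 1
}
```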

r.updateNoScheduleNodes(ctx, cq, node)

return r.updateLendingLimits(ctx, cq)
// update noExecuteNodes and noScheduleNodes for the deletion of nodeName
func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, nodeName string) {
if _, ok := noExecuteNodes[nodeName]; ok {
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
delete(noExecuteNodes, nodeName)
nodeInfoMutex.Unlock() // END CRITICAL SECTION
r.triggerSlackCQMonitor()
log.FromContext(ctx).Info("Updated NoExecute information due to Node deletion",
"Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
}
if _, ok := noScheduleNodes[nodeName]; ok {
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
delete(noScheduleNodes, nodeName)
nodeInfoMutex.Unlock() // END CRITICAL SECTION
r.triggerSlackCQMonitor()
log.FromContext(ctx).Info("Updated NoSchedule information due to Node deletion",
"Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
}
}

// update noExecuteNodes entry for node
func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
noExecuteResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
@@ -102,7 +132,7 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
}

noExecuteNodesChanged := false
noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
if len(noExecuteResources) == 0 {
delete(noExecuteNodes, node.GetName())
@@ -115,95 +145,56 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
noExecuteNodes[node.GetName()] = noExecuteResources
noExecuteNodesChanged = true
}
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
nodeInfoMutex.Unlock() // END CRITICAL SECTION

// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
// and the controller runtime is configured to not allow concurrent execution of this controller.
if noExecuteNodesChanged {
log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
r.triggerSlackCQMonitor()
log.FromContext(ctx).Info("Updated NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
}
}

func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
// update unschedulable resource quantities for this node
noScheduleQuantities := make(map[string]*resource.Quantity)
// update noScheduleNodes entry for node
func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.Node) {
var noScheduleResources v1.ResourceList
if node.Spec.Unschedulable {
// add all non-pod resources covered by cq if the node is cordoned
for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
if string(resourceName.Name) != "pods" {
noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
}
}
noScheduleResources = node.Status.Capacity.DeepCopy()
delete(noScheduleResources, v1.ResourcePods)
} else {
noScheduleResources = make(v1.ResourceList)
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value {
noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
if !quantity.IsZero() {
noScheduleResources[v1.ResourceName(resourceName)] = *quantity
}
}
}
}
}
}

if len(noScheduleQuantities) > 0 {
noScheduleNodes[node.GetName()] = noScheduleQuantities
} else {
delete(noScheduleNodes, node.GetName())
}
}

func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {

// compute unschedulable resource totals
unschedulableQuantities := map[string]*resource.Quantity{}
for _, quantities := range noScheduleNodes {
for resourceName, quantity := range quantities {
if !quantity.IsZero() {
if unschedulableQuantities[resourceName] == nil {
unschedulableQuantities[resourceName] = ptr.To(*quantity)
} else {
unschedulableQuantities[resourceName].Add(*quantity)
}
}
}
}

// enforce lending limits on 1st flavor of 1st resource group
resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
limitsChanged := false
for i, quota := range resources {
var lendingLimit *resource.Quantity
if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
lendingLimit = ptr.To(quota.NominalQuota)
lendingLimit.Sub(*unschedulableQuantity)
} else {
lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
}
}
if quota.LendingLimit == nil && lendingLimit != nil ||
quota.LendingLimit != nil && lendingLimit == nil ||
quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
limitsChanged = true
resources[i].LendingLimit = lendingLimit
noScheduleNodesChanged := false
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
if len(noScheduleResources) == 0 {
delete(noScheduleNodes, node.GetName())
noScheduleNodesChanged = true
} else if !maps.Equal(priorEntry, noScheduleResources) {
noScheduleNodes[node.GetName()] = noScheduleResources
noScheduleNodesChanged = true
}
} else if len(noScheduleResources) > 0 {
noScheduleNodes[node.GetName()] = noScheduleResources
noScheduleNodesChanged = true
}
nodeInfoMutex.Unlock() // END CRITICAL SECTION

// update lending limits
if limitsChanged {
err := r.Update(ctx, cq)
if err == nil {
log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
return ctrl.Result{}, nil
} else if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
} else {
return ctrl.Result{}, err
}
if noScheduleNodesChanged {
r.triggerSlackCQMonitor()
log.FromContext(ctx).Info("Updated NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
}

return ctrl.Result{}, nil
}
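
Editorial note (not part of this diff): the removed updateLendingLimits above adjusted the slack ClusterQueue so that, per resource, LendingLimit = max(0, NominalQuota - total unschedulable capacity), and left the limit unset when nothing was unschedulable. That arithmetic presumably now lives in SlackClusterQueueMonitor, which is not shown in this diff; a sketch of the per-resource rule it implies, with a hypothetical function name:

```go
// Sketch of the per-resource lending-limit rule implied by the removed code.
func computeLendingLimit(nominal resource.Quantity, unschedulable *resource.Quantity) *resource.Quantity {
	if unschedulable == nil {
		return nil // nothing unschedulable: leave LendingLimit unset
	}
	if nominal.Cmp(*unschedulable) <= 0 {
		return resource.NewQuantity(0, resource.DecimalSI) // nothing left to lend
	}
	limit := nominal.DeepCopy()
	limit.Sub(*unschedulable)
	return &limit
}
```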

// SetupWithManager sets up the controller with the Manager.
30 changes: 30 additions & 0 deletions internal/controller/appwrapper/node_health_monitor_test.go
@@ -24,14 +24,17 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var _ = Describe("NodeMonitor Controller", func() {
var slackQueueName = "fake-queue"
var node1Name = types.NamespacedName{Name: "fake-node-1"}
var node2Name = types.NamespacedName{Name: "fake-node-2"}
var dispatch = types.NamespacedName{Name: dispatchEventName}
var nodeMonitor *NodeHealthMonitor
var cqMonitor *SlackClusterQueueMonitor
nodeGPUs := v1.ResourceList{v1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}

BeforeEach(func() {
@@ -49,9 +52,16 @@ var _ = Describe("NodeMonitor Controller", func() {
// Create reconciler
awConfig := config.NewAppWrapperConfig()
awConfig.SlackQueueName = slackQueueName
conduit := make(chan event.GenericEvent, 1)
nodeMonitor = &NodeHealthMonitor{
Client: k8sClient,
Config: awConfig,
Events: conduit,
}
cqMonitor = &SlackClusterQueueMonitor{
Client: k8sClient,
Config: awConfig,
Events: conduit,
}
})

@@ -124,6 +134,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))
@@ -134,6 +146,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -144,6 +158,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -154,6 +170,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).Should(BeNil())
@@ -164,10 +182,22 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))

// Increase the slack cluster queue's quota by 2 and expect LendingLimit to increase by 2 to become 4
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("8")
Expect(k8sClient.Update(ctx, queue)).Should(Succeed())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: slackQueueName}})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(4)))
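
Editorial note (not part of this diff): the expected value of 4 is consistent with the arithmetic above: node1's 4 nvidia.com/gpu are still counted as unschedulable at this point, so LendingLimit = NominalQuota - unschedulable capacity = 8 - 4 = 4; equivalently, raising the quota by 2 raises the limit from 2 to 4.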

Expect(k8sClient.Delete(ctx, queue)).To(Succeed())
})
})