@@ -18,77 +18,104 @@ package appwrapper
1818
1919import (
2020 "context"
21+ "maps"
2122 "sync"
2223
2324 v1 "k8s.io/api/core/v1"
2425 "k8s.io/apimachinery/pkg/api/errors"
2526 "k8s.io/apimachinery/pkg/api/resource"
26- "k8s.io/apimachinery/pkg/types "
27+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1 "
2728 "k8s.io/apimachinery/pkg/util/sets"
28- "k8s.io/utils/ptr"
2929
3030 ctrl "sigs.k8s.io/controller-runtime"
3131 "sigs.k8s.io/controller-runtime/pkg/client"
32+ "sigs.k8s.io/controller-runtime/pkg/event"
3233 "sigs.k8s.io/controller-runtime/pkg/handler"
3334 "sigs.k8s.io/controller-runtime/pkg/log"
34- kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
3535
3636 "github.com/project-codeflare/appwrapper/pkg/config"
3737)
3838
3939// NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
4040// been marked as Unschedulable or that have been labeled to indicate that
41- // they have resources that Autopilot has tainted as NoSchedule or NoExeucte .
41+ // they have resources that Autopilot has tainted as NoSchedule or NoExecute .
4242// This information is used to automate the maintenance of the lendingLimit of
4343// a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
4444type NodeHealthMonitor struct {
4545 client.Client
4646 Config * config.AppWrapperConfig
47+ Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
4748}
4849
4950var (
50- // noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExeucte taint
51- noExecuteNodes = make (map [string ]sets.Set [string ])
51+ // noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
52+ noExecuteNodes = make (map [string ]sets.Set [string ])
53+ // noExecuteNodesMutex synchronizes access to noExecuteNodes
5254 noExecuteNodesMutex sync.RWMutex
5355
54- // noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
55- // A resource may be unscheduable either because:
56+ // noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources .
57+ // A resource may be unschedulable either because:
5658 // (a) the Node is cordoned (node.Spec.Unschedulable is true) or
57- // (b) Autopilot has labeled the with either a NoExecute or NoSchedule taint.
58- noScheduleNodes = make (map [string ]map [string ]* resource.Quantity )
59+ // (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
60+ noScheduleNodes = make (map [string ]v1.ResourceList )
61+ // noScheduleNodesMutex synchronizes access to noScheduleNodes
62+ noScheduleNodesMutex sync.RWMutex
5963)
6064
6165// permission to watch nodes
6266//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
63- //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch
6467
6568func (r * NodeHealthMonitor ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
6669 node := & v1.Node {}
6770 if err := r .Get (ctx , req .NamespacedName , node ); err != nil {
68- return ctrl.Result {}, nil
71+ if errors .IsNotFound (err ) {
72+ r .updateForNodeDeletion (ctx , req .Name )
73+ return ctrl.Result {}, nil
74+ }
75+ return ctrl.Result {}, err
6976 }
7077
71- r .updateNoExecuteNodes (ctx , node )
72-
73- // If there is a slack ClusterQueue, update its lending limits
74-
75- if r .Config .SlackQueueName == "" {
76- return ctrl.Result {}, nil
78+ if node .DeletionTimestamp .IsZero () {
79+ r .updateNoExecuteNodes (ctx , node )
80+ r .updateNoScheduleNodes (ctx , node )
81+ } else {
82+ r .updateForNodeDeletion (ctx , req .Name )
7783 }
7884
79- cq := & kueue.ClusterQueue {}
80- if err := r .Get (ctx , types.NamespacedName {Name : r .Config .SlackQueueName }, cq ); err != nil {
81- if errors .IsNotFound (err ) {
82- return ctrl.Result {}, nil // give up if slack quota is not defined
85+ return ctrl.Result {}, nil
86+ }
87+
88+ func (r * NodeHealthMonitor ) triggerSlackCQMonitor () {
89+ if r .Config .SlackQueueName != "" {
90+ select {
91+ case r .Events <- event.GenericEvent {Object : & metav1.PartialObjectMetadata {ObjectMeta : metav1.ObjectMeta {Name : r .Config .SlackQueueName }}}:
92+ default :
93+ // do not block if event is already in channel
8394 }
84- return ctrl.Result {}, err
8595 }
96+ }
8697
87- r .updateNoScheduleNodes (ctx , cq , node )
88-
89- return r .updateLendingLimits (ctx , cq )
98+ // update noExecuteNodes and noScheduleNodes for the deletion of nodeName
99+ func (r * NodeHealthMonitor ) updateForNodeDeletion (ctx context.Context , nodeName string ) {
100+ if _ , ok := noExecuteNodes [nodeName ]; ok {
101+ noExecuteNodesMutex .Lock () // BEGIN CRITICAL SECTION
102+ delete (noExecuteNodes , nodeName )
103+ noExecuteNodesMutex .Unlock () // END CRITICAL SECTION
104+ log .FromContext (ctx ).Info ("Updated NoExecute information due to Node deletion" ,
105+ "Number NoExecute Nodes" , len (noExecuteNodes ), "NoExecute Resource Details" , noExecuteNodes )
106+ r .triggerSlackCQMonitor ()
107+ }
108+ if _ , ok := noScheduleNodes [nodeName ]; ok {
109+ noScheduleNodesMutex .Lock () // BEGIN CRITICAL SECTION
110+ delete (noScheduleNodes , nodeName )
111+ noScheduleNodesMutex .Unlock () // END CRITICAL SECTION
112+ log .FromContext (ctx ).Info ("Updated NoSchedule information due to Node deletion" ,
113+ "Number NoSchedule Nodes" , len (noScheduleNodes ), "NoSchedule Resource Details" , noScheduleNodes )
114+ r .triggerSlackCQMonitor ()
115+ }
90116}
91117
118+ // update noExecuteNodes entry for node
92119func (r * NodeHealthMonitor ) updateNoExecuteNodes (ctx context.Context , node * v1.Node ) {
93120 noExecuteResources := make (sets.Set [string ])
94121 for key , value := range node .GetLabels () {
@@ -117,93 +144,54 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
117144 }
118145 noExecuteNodesMutex .Unlock () // END CRITICAL SECTION
119146
120- // Safe to log outside the mutex because because this method is the only writer of noExecuteNodes
121- // and the controller runtime is configured to not allow concurrent execution of this controller.
122147 if noExecuteNodesChanged {
123- log .FromContext (ctx ).Info ("Updated node NoExecute information" , "Number NoExecute Nodes" , len (noExecuteNodes ), "NoExecute Resource Details" , noExecuteNodes )
148+ log .FromContext (ctx ).Info ("Updated NoExecute information" , "Number NoExecute Nodes" , len (noExecuteNodes ), "NoExecute Resource Details" , noExecuteNodes )
149+ r .triggerSlackCQMonitor ()
124150 }
125151}
126152
127- func ( r * NodeHealthMonitor ) updateNoScheduleNodes ( _ context. Context , cq * kueue. ClusterQueue , node * v1. Node ) {
128- // update unschedulable resource quantities for this node
129- noScheduleQuantities := make ( map [ string ] * resource. Quantity )
153+ // update noScheduleNodes entry for node
154+ func ( r * NodeHealthMonitor ) updateNoScheduleNodes ( ctx context. Context , node * v1. Node ) {
155+ var noScheduleResources v1. ResourceList
130156 if node .Spec .Unschedulable {
131- // add all non-pod resources covered by cq if the node is cordoned
132- for _ , resourceName := range cq .Spec .ResourceGroups [0 ].Flavors [0 ].Resources {
133- if string (resourceName .Name ) != "pods" {
134- noScheduleQuantities [string (resourceName .Name )] = node .Status .Capacity .Name (resourceName .Name , resource .DecimalSI )
135- }
136- }
157+ noScheduleResources = node .Status .Capacity .DeepCopy ()
158+ delete (noScheduleResources , v1 .ResourcePods )
137159 } else {
160+ noScheduleResources = make (v1.ResourceList )
138161 for key , value := range node .GetLabels () {
139162 for resourceName , taints := range r .Config .Autopilot .ResourceTaints {
140163 for _ , taint := range taints {
141164 if key == taint .Key && value == taint .Value {
142- noScheduleQuantities [resourceName ] = node .Status .Capacity .Name (v1 .ResourceName (resourceName ), resource .DecimalSI )
165+ quantity := node .Status .Capacity .Name (v1 .ResourceName (resourceName ), resource .DecimalSI )
166+ if ! quantity .IsZero () {
167+ noScheduleResources [v1 .ResourceName (resourceName )] = * quantity
168+ }
143169 }
144170 }
145171 }
146172 }
147173 }
148174
149- if len (noScheduleQuantities ) > 0 {
150- noScheduleNodes [node .GetName ()] = noScheduleQuantities
151- } else {
152- delete (noScheduleNodes , node .GetName ())
153- }
154- }
155-
156- func (r * NodeHealthMonitor ) updateLendingLimits (ctx context.Context , cq * kueue.ClusterQueue ) (ctrl.Result , error ) {
157-
158- // compute unschedulable resource totals
159- unschedulableQuantities := map [string ]* resource.Quantity {}
160- for _ , quantities := range noScheduleNodes {
161- for resourceName , quantity := range quantities {
162- if ! quantity .IsZero () {
163- if unschedulableQuantities [resourceName ] == nil {
164- unschedulableQuantities [resourceName ] = ptr .To (* quantity )
165- } else {
166- unschedulableQuantities [resourceName ].Add (* quantity )
167- }
168- }
169- }
170- }
171-
172- // enforce lending limits on 1st flavor of 1st resource group
173- resources := cq .Spec .ResourceGroups [0 ].Flavors [0 ].Resources
174- limitsChanged := false
175- for i , quota := range resources {
176- var lendingLimit * resource.Quantity
177- if unschedulableQuantity := unschedulableQuantities [quota .Name .String ()]; unschedulableQuantity != nil {
178- if quota .NominalQuota .Cmp (* unschedulableQuantity ) > 0 {
179- lendingLimit = ptr .To (quota .NominalQuota )
180- lendingLimit .Sub (* unschedulableQuantity )
181- } else {
182- lendingLimit = resource .NewQuantity (0 , resource .DecimalSI )
183- }
184- }
185- if quota .LendingLimit == nil && lendingLimit != nil ||
186- quota .LendingLimit != nil && lendingLimit == nil ||
187- quota .LendingLimit != nil && lendingLimit != nil && quota .LendingLimit .Cmp (* lendingLimit ) != 0 {
188- limitsChanged = true
189- resources [i ].LendingLimit = lendingLimit
175+ noScheduleNodesChanged := false
176+ noScheduleNodesMutex .Lock () // BEGIN CRITICAL SECTION
177+ if priorEntry , ok := noScheduleNodes [node .GetName ()]; ok {
178+ if len (noScheduleResources ) == 0 {
179+ delete (noScheduleNodes , node .GetName ())
180+ noScheduleNodesChanged = true
181+ } else if ! maps .Equal (priorEntry , noScheduleResources ) {
182+ noScheduleNodes [node .GetName ()] = noScheduleResources
183+ noScheduleNodesChanged = true
190184 }
185+ } else if len (noScheduleResources ) > 0 {
186+ noScheduleNodes [node .GetName ()] = noScheduleResources
187+ noScheduleNodesChanged = true
191188 }
189+ noScheduleNodesMutex .Unlock () // END CRITICAL SECTION
192190
193- // update lending limits
194- if limitsChanged {
195- err := r .Update (ctx , cq )
196- if err == nil {
197- log .FromContext (ctx ).Info ("Updated lending limits" , "Resources" , resources )
198- return ctrl.Result {}, nil
199- } else if errors .IsConflict (err ) {
200- return ctrl.Result {Requeue : true }, nil
201- } else {
202- return ctrl.Result {}, err
203- }
191+ if noScheduleNodesChanged {
192+ log .FromContext (ctx ).Info ("Updated NoSchedule information" , "Number NoSchedule Nodes" , len (noScheduleNodes ), "NoSchedule Resource Details" , noScheduleNodes )
193+ r .triggerSlackCQMonitor ()
204194 }
205-
206- return ctrl.Result {}, nil
207195}
208196
209197// SetupWithManager sets up the controller with the Manager.
0 commit comments