@@ -36,74 +36,44 @@ import (
3636 "github.com/project-codeflare/appwrapper/pkg/config"
3737)
3838
39- // NodeHealthMonitor maintains the set of nodes that Autopilot has labelled as unhealthy
39+ // NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
40+ // been marked as Unschedulable or that have been labeled to indicate that
41+ // they have resources that Autopilot has tainted as NoSchedule or NoExeucte.
42+ // This information is used to automate the maintenance of the lendingLimit of
43+ // the designated slack ClusterQueue and to migrate workloads away from NoExecute resources.
4044type NodeHealthMonitor struct {
4145 client.Client
4246 Config * config.AppWrapperConfig
4347}
4448
4549var (
46- // unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
47- unhealthyNodes = make (map [string ]sets.Set [string ])
48- unhealthyNodesMutex sync.RWMutex
50+ // noExecuteNodes is a mapping from Node names to sets of resources that Autopilot has labeled with a NoExeucte taint on that Node
51+ noExecuteNodes = make (map [string ]sets.Set [string ])
52+ noExecuteNodesMutex sync.RWMutex
4953
50- // unschedulableNodes is a mapping from Node names to resource quantities than Autopilot has labeled as unschedulable on that Node
51- unschedulableNodes = make (map [string ]map [string ]* resource.Quantity )
54+ // noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable
55+ // either because (a) the Node is cordoned or (b) Autopilot has labeled resources on the Node with either a NoExeucte or NoScheduler taint.
56+ noScheduleNodes = make (map [string ]map [string ]* resource.Quantity )
5257)
5358
5459// permission to watch nodes
5560//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
5661//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch
5762
58- //gocyclo:ignore
5963func (r * NodeHealthMonitor ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
6064 node := & v1.Node {}
6165 if err := r .Get (ctx , req .NamespacedName , node ); err != nil {
6266 return ctrl.Result {}, nil
6367 }
6468
65- flaggedResources := make (sets.Set [string ])
66- for key , value := range node .GetLabels () {
67- for resourceName , taints := range r .Config .Autopilot .ResourceTaints {
68- for _ , taint := range taints {
69- if key == taint .Key && value == taint .Value && taint .Effect == v1 .TaintEffectNoExecute {
70- flaggedResources .Insert (resourceName )
71- }
72- }
73- }
74- }
69+ r .updateNoExecuteNodes (ctx , node )
7570
76- nodeChanged := false
77- unhealthyNodesMutex .Lock () // BEGIN CRITICAL SECTION
78- if priorEntry , ok := unhealthyNodes [node .GetName ()]; ok {
79- if len (flaggedResources ) == 0 {
80- delete (unhealthyNodes , node .GetName ())
81- nodeChanged = true
82- } else if ! priorEntry .Equal (flaggedResources ) {
83- unhealthyNodes [node .GetName ()] = flaggedResources
84- nodeChanged = true
85- }
86- } else if len (flaggedResources ) > 0 {
87- unhealthyNodes [node .GetName ()] = flaggedResources
88- nodeChanged = true
89- }
90- unhealthyNodesMutex .Unlock () // END CRITICAL SECTION
91-
92- // Unsynchronized reads of unhealthyNodes below are safe because this method
93- // is the only writer to the map and the controller runtime is configured to
94- // not allow concurrent execution of this method.
95-
96- if nodeChanged {
97- log .FromContext (ctx ).Info ("Updated node health information" , "Number Unhealthy Nodes" , len (unhealthyNodes ), "Unhealthy Resource Details" , unhealthyNodes )
98- }
99-
100- // update lending limits on slack quota if configured
71+ // If there is a slack ClusterQueue, update its lending limits
10172
10273 if r .Config .SlackQueueName == "" {
10374 return ctrl.Result {}, nil
10475 }
10576
106- // get slack quota
10777 cq := & kueue.ClusterQueue {}
10878 if err := r .Get (ctx , types.NamespacedName {Name : r .Config .SlackQueueName }, cq ); err != nil {
10979 if errors .IsNotFound (err ) {
@@ -112,36 +82,80 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
11282 return ctrl.Result {}, err
11383 }
11484
85+ r .updateNoScheduleNodes (ctx , cq , node )
86+
87+ return r .updateLendingLimits (ctx , cq )
88+ }
89+
90+ func (r * NodeHealthMonitor ) updateNoExecuteNodes (ctx context.Context , node * v1.Node ) {
91+ noExecuteResources := make (sets.Set [string ])
92+ for key , value := range node .GetLabels () {
93+ for resourceName , taints := range r .Config .Autopilot .ResourceTaints {
94+ for _ , taint := range taints {
95+ if key == taint .Key && value == taint .Value && taint .Effect == v1 .TaintEffectNoExecute {
96+ noExecuteResources .Insert (resourceName )
97+ }
98+ }
99+ }
100+ }
101+
102+ noExecuteNodesChanged := false
103+ noExecuteNodesMutex .Lock () // BEGIN CRITICAL SECTION
104+ if priorEntry , ok := noExecuteNodes [node .GetName ()]; ok {
105+ if len (noExecuteResources ) == 0 {
106+ delete (noExecuteNodes , node .GetName ())
107+ noExecuteNodesChanged = true
108+ } else if ! priorEntry .Equal (noExecuteResources ) {
109+ noExecuteNodes [node .GetName ()] = noExecuteResources
110+ noExecuteNodesChanged = true
111+ }
112+ } else if len (noExecuteResources ) > 0 {
113+ noExecuteNodes [node .GetName ()] = noExecuteResources
114+ noExecuteNodesChanged = true
115+ }
116+ noExecuteNodesMutex .Unlock () // END CRITICAL SECTION
117+
118+ // Safe to log outside the mutex because because this method is the only writer
119+ // and the controller runtime is configured to not allow concurrent execution of this method.
120+ if noExecuteNodesChanged {
121+ log .FromContext (ctx ).Info ("Updated node NoExecute information" , "Number NoExecute Nodes" , len (noExecuteNodes ), "NoExecute Resource Details" , noExecuteNodes )
122+ }
123+ }
124+
125+ func (r * NodeHealthMonitor ) updateNoScheduleNodes (_ context.Context , cq * kueue.ClusterQueue , node * v1.Node ) {
115126 // update unschedulable resource quantities for this node
116- flaggedQuantities := make (map [string ]* resource.Quantity )
127+ noScheduleQuantities := make (map [string ]* resource.Quantity )
117128 if node .Spec .Unschedulable {
118- // flag all non-pod resources covered by cq if the node is cordoned
129+ // add all non-pod resources covered by cq if the node is cordoned
119130 for _ , resourceName := range cq .Spec .ResourceGroups [0 ].Flavors [0 ].Resources {
120131 if string (resourceName .Name ) != "pods" {
121- flaggedQuantities [string (resourceName .Name )] = node .Status .Capacity .Name (resourceName .Name , resource .DecimalSI )
132+ noScheduleQuantities [string (resourceName .Name )] = node .Status .Capacity .Name (resourceName .Name , resource .DecimalSI )
122133 }
123134 }
124135 } else {
125136 for key , value := range node .GetLabels () {
126137 for resourceName , taints := range r .Config .Autopilot .ResourceTaints {
127138 for _ , taint := range taints {
128139 if key == taint .Key && value == taint .Value {
129- flaggedQuantities [resourceName ] = node .Status .Capacity .Name (v1 .ResourceName (resourceName ), resource .DecimalSI )
140+ noScheduleQuantities [resourceName ] = node .Status .Capacity .Name (v1 .ResourceName (resourceName ), resource .DecimalSI )
130141 }
131142 }
132143 }
133144 }
134145 }
135146
136- if len (flaggedQuantities ) > 0 {
137- unschedulableNodes [node .GetName ()] = flaggedQuantities
147+ if len (noScheduleQuantities ) > 0 {
148+ noScheduleNodes [node .GetName ()] = noScheduleQuantities
138149 } else {
139- delete (unschedulableNodes , node .GetName ())
150+ delete (noScheduleNodes , node .GetName ())
140151 }
152+ }
153+
154+ func (r * NodeHealthMonitor ) updateLendingLimits (ctx context.Context , cq * kueue.ClusterQueue ) (ctrl.Result , error ) {
141155
142156 // compute unschedulable resource totals
143157 unschedulableQuantities := map [string ]* resource.Quantity {}
144- for _ , quantities := range unschedulableNodes {
158+ for _ , quantities := range noScheduleNodes {
145159 for resourceName , quantity := range quantities {
146160 if ! quantity .IsZero () {
147161 if unschedulableQuantities [resourceName ] == nil {
0 commit comments