@@ -18,13 +18,13 @@ package appwrapper
1818
1919import (
2020 "context"
21- "reflect"
2221 "sync"
2322
2423 v1 "k8s.io/api/core/v1"
2524 "k8s.io/apimachinery/pkg/api/errors"
2625 "k8s.io/apimachinery/pkg/api/resource"
2726 "k8s.io/apimachinery/pkg/types"
27+ "k8s.io/apimachinery/pkg/util/sets"
2828 "k8s.io/utils/ptr"
2929
3030 ctrl "sigs.k8s.io/controller-runtime"
@@ -43,26 +43,32 @@ type NodeHealthMonitor struct {
4343}
4444
4545var (
46- // unhealthyNodes is a mapping from Node names to a set of resource quantities that Autopilot has labeled as unhealthy on that Node
47- unhealthyNodes = make (map [string ]map [string ]* resource. Quantity )
46+ // unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
47+ unhealthyNodes = make (map [string ]sets. Set [string ])
4848 unhealthyNodesMutex sync.RWMutex
49+
50+ // unschedulableNodes is a mapping from Node names to resource quantities than Autopilot has labeled as unschedulable on that Node
51+ unschedulableNodes = make (map [string ]map [string ]* resource.Quantity )
4952)
5053
5154// permission to watch nodes
5255//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
5356//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch
5457
58+ //gocyclo:ignore
5559func (r * NodeHealthMonitor ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
5660 node := & v1.Node {}
5761 if err := r .Get (ctx , req .NamespacedName , node ); err != nil {
5862 return ctrl.Result {}, nil
5963 }
6064
61- flaggedResources := make (map [string ]* resource. Quantity )
65+ flaggedResources := make (sets. Set [string ])
6266 for key , value := range node .GetLabels () {
63- for resourceName , apLabels := range r .Config .Autopilot .ResourceUnhealthyConfig {
64- if apValue , ok := apLabels [key ]; ok && apValue == value {
65- flaggedResources [resourceName ] = node .Status .Capacity .Name (v1 .ResourceName (resourceName ), resource .DecimalSI )
67+ for resourceName , taints := range r .Config .Autopilot .ResourceTaints {
68+ for _ , taint := range taints {
69+ if key == taint .Key && value == taint .Value && taint .Effect == v1 .TaintEffectNoExecute {
70+ flaggedResources .Insert (resourceName )
71+ }
6672 }
6773 }
6874 }
@@ -73,7 +79,7 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
7379 if len (flaggedResources ) == 0 {
7480 delete (unhealthyNodes , node .GetName ())
7581 nodeChanged = true
76- } else if ! reflect . DeepEqual ( priorEntry , flaggedResources ) {
82+ } else if ! priorEntry . Equal ( flaggedResources ) {
7783 unhealthyNodes [node .GetName ()] = flaggedResources
7884 nodeChanged = true
7985 }
@@ -106,15 +112,40 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
106112 return ctrl.Result {}, err
107113 }
108114
109- // compute unhealthy resource totals
110- missingQuantities := map [string ]* resource.Quantity {}
111- for _ , quantities := range unhealthyNodes {
115+ // update unschedulable resource quantities for this node
116+ flaggedQuantities := make (map [string ]* resource.Quantity )
117+ if node .Spec .Unschedulable {
118+ // flag all configured resources if the node is cordoned
119+ for resourceName := range r .Config .Autopilot .ResourceTaints {
120+ flaggedQuantities [resourceName ] = node .Status .Capacity .Name (v1 .ResourceName (resourceName ), resource .DecimalSI )
121+ }
122+ } else {
123+ for key , value := range node .GetLabels () {
124+ for resourceName , taints := range r .Config .Autopilot .ResourceTaints {
125+ for _ , taint := range taints {
126+ if key == taint .Key && value == taint .Value {
127+ flaggedQuantities [resourceName ] = node .Status .Capacity .Name (v1 .ResourceName (resourceName ), resource .DecimalSI )
128+ }
129+ }
130+ }
131+ }
132+ }
133+
134+ if len (flaggedQuantities ) > 0 {
135+ unschedulableNodes [node .GetName ()] = flaggedQuantities
136+ } else {
137+ delete (unschedulableNodes , node .GetName ())
138+ }
139+
140+ // compute unschedulable resource totals
141+ unschedulableQuantities := map [string ]* resource.Quantity {}
142+ for _ , quantities := range unschedulableNodes {
112143 for resourceName , quantity := range quantities {
113144 if ! quantity .IsZero () {
114- if missingQuantities [resourceName ] == nil {
115- missingQuantities [resourceName ] = ptr .To (* quantity )
145+ if unschedulableQuantities [resourceName ] == nil {
146+ unschedulableQuantities [resourceName ] = ptr .To (* quantity )
116147 } else {
117- missingQuantities [resourceName ].Add (* quantity )
148+ unschedulableQuantities [resourceName ].Add (* quantity )
118149 }
119150 }
120151 }
@@ -125,10 +156,10 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
125156 limitsChanged := false
126157 for i , quota := range resources {
127158 var lendingLimit * resource.Quantity
128- if missingQuantity := missingQuantities [quota .Name .String ()]; missingQuantity != nil {
129- if quota .NominalQuota .Cmp (* missingQuantity ) > 0 {
159+ if unschedulableQuantity := unschedulableQuantities [quota .Name .String ()]; unschedulableQuantity != nil {
160+ if quota .NominalQuota .Cmp (* unschedulableQuantity ) > 0 {
130161 lendingLimit = ptr .To (quota .NominalQuota )
131- lendingLimit .Sub (* missingQuantity )
162+ lendingLimit .Sub (* unschedulableQuantity )
132163 } else {
133164 lendingLimit = resource .NewQuantity (0 , resource .DecimalSI )
134165 }
@@ -145,7 +176,11 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
145176 var err error
146177 if limitsChanged {
147178 err = r .Update (ctx , cq )
179+ if err == nil {
180+ log .FromContext (ctx ).Info ("Updated lending limits" , "Resources" , resources )
181+ }
148182 }
183+
149184 return ctrl.Result {}, err
150185}
151186
0 commit comments