
Commit d293826

Redesign node monitoring to account for Node deletion
1. Split node monitoring into two reconcilers: one to monitor Nodes and one to monitor and update the designated slack ClusterQueue.
2. Remove entries from the in-memory caches when a Node is deleted.
3. Watch the slack ClusterQueue so that changes in nominalQuotas are detected and lendingLimits adjusted accordingly.

Fixes #252.
1 parent 4b282d0 commit d293826
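This page reproduces diffs for three of the five changed files; the new SlackClusterQueueMonitor itself and the manager wiring that connects the two reconcilers are not shown below. As a rough sketch of how the split described in the commit message could be wired through the shared event channel (the helper name, import paths, and placement are assumptions, not taken from this commit):

package main

import (
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/manager"

	// Assumed import paths; the real package layout is not shown in these hunks.
	"github.com/project-codeflare/appwrapper/internal/controller/appwrapper"
	"github.com/project-codeflare/appwrapper/pkg/config"
)

// newNodeMonitors is a hypothetical helper illustrating the split: the
// NodeHealthMonitor watches Nodes and pushes a "*/*" GenericEvent into the
// channel, and the SlackClusterQueueMonitor consumes it to adjust lending
// limits on the designated slack ClusterQueue.
func newNodeMonitors(mgr manager.Manager, awConfig *config.AppWrapperConfig) (*appwrapper.NodeHealthMonitor, *appwrapper.SlackClusterQueueMonitor) {
	// Capacity 1 is enough: triggerDispatch does a non-blocking send, so any
	// number of Node changes coalesce into a single pending dispatch event.
	events := make(chan event.GenericEvent, 1)
	nodeMonitor := &appwrapper.NodeHealthMonitor{Client: mgr.GetClient(), Config: awConfig, Events: events}
	cqMonitor := &appwrapper.SlackClusterQueueMonitor{Client: mgr.GetClient(), Config: awConfig, Events: events}
	return nodeMonitor, cqMonitor
}

The SlackClusterQueueMonitor would additionally need to watch both the slack ClusterQueue and the raw channel source in its SetupWithManager; that code lies outside the hunks shown on this page.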

5 files changed: +249 additions, -96 deletions


internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 2 additions & 2 deletions
@@ -559,7 +559,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 		if pod.DeletionTimestamp.IsZero() {
 			summary.running += 1
 			if checkNoExecuteNodes {
-				noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
+				nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
 				if len(noExecuteNodes) > 0 {
 					if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
 						for badResource := range resources {
@@ -584,7 +584,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 						}
 					}
 				}
-				noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
+				nodeInfoMutex.RUnlock() // END CRITICAL SECTION
 			}
 		}
 	case v1.PodSucceeded:

internal/controller/appwrapper/node_health_monitor.go

Lines changed: 82 additions & 94 deletions
@@ -18,20 +18,20 @@ package appwrapper

 import (
 	"context"
+	"maps"
 	"sync"

 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
-	"k8s.io/apimachinery/pkg/types"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/utils/ptr"

 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
-	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

 	"github.com/project-codeflare/appwrapper/pkg/config"
 )
@@ -44,51 +44,78 @@ import (
 type NodeHealthMonitor struct {
 	client.Client
 	Config *config.AppWrapperConfig
+	Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
 }

 var (
-	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExeucte taint
-	noExecuteNodes = make(map[string]sets.Set[string])
-	noExecuteNodesMutex sync.RWMutex
+	// nodeInfoMutex syncnornized writes by NodeHealthMonitor with reads from AppWrapperReconciler and SlackClusterQueueMonitor
+	nodeInfoMutex sync.RWMutex

-	// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
+	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
+	noExecuteNodes = make(map[string]sets.Set[string])
+
+	// noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
 	// A resource may be unscheduable either because:
 	// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
-	// (b) Autopilot has labeled the with either a NoExecute or NoSchedule taint.
-	noScheduleNodes = make(map[string]map[string]*resource.Quantity)
+	// (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
+	noScheduleNodes = make(map[string]v1.ResourceList)
 )

 // permission to watch nodes
 //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
-//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

 func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	node := &v1.Node{}
 	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
-		return ctrl.Result{}, nil
+		if errors.IsNotFound(err) {
+			r.updateForNodeDeletion(ctx, req.Name)
+			return ctrl.Result{}, nil
+		}
+		return ctrl.Result{}, err
 	}

-	r.updateNoExecuteNodes(ctx, node)
-
-	// If there is a slack ClusterQueue, update its lending limits
-
-	if r.Config.SlackQueueName == "" {
-		return ctrl.Result{}, nil
-	}
-
-	cq := &kueue.ClusterQueue{}
-	if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
-		if errors.IsNotFound(err) {
-			return ctrl.Result{}, nil // give up if slack quota is not defined
-		}
-		return ctrl.Result{}, err
-	}
-
-	r.updateNoScheduleNodes(ctx, cq, node)
-
-	return r.updateLendingLimits(ctx, cq)
+	if node.DeletionTimestamp.IsZero() {
+		r.updateNoExecuteNodes(ctx, node)
+		r.updateNoScheduleNodes(ctx, node)
+	} else {
+		r.updateForNodeDeletion(ctx, req.Name)
+	}
+
+	return ctrl.Result{}, nil
+}
+
+// Trigger dispatch by means of "*/*" request
+func (r *NodeHealthMonitor) triggerDispatch() {
+	if r.Config.SlackQueueName != "" {
+		select {
+		case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: "*", Name: "*"}}}:
+		default:
+			// do not block if event is already in channel
+		}
+	}
+}
+
+// update for the deletion of nodeName
+func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, nodeName string) {
+	if _, ok := noExecuteNodes[nodeName]; ok {
+		nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+		delete(noExecuteNodes, nodeName)
+		nodeInfoMutex.Unlock() // END CRITICAL SECTION
+		r.triggerDispatch()
+		log.FromContext(ctx).Info("Updated NoExecute information due to Node deletion",
+			"Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+	}
+	if _, ok := noScheduleNodes[nodeName]; ok {
+		nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+		delete(noScheduleNodes, nodeName)
+		nodeInfoMutex.Unlock() // END CRITICAL SECTION
+		r.triggerDispatch()
+		log.FromContext(ctx).Info("Updated NoSchedule information due to Node deletion",
+			"Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
+	}
 }

+// update noExecuteNodes entry for node
 func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
 	noExecuteResources := make(sets.Set[string])
 	for key, value := range node.GetLabels() {
@@ -102,7 +129,7 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
 	}

 	noExecuteNodesChanged := false
-	noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
+	nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
 	if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
 		if len(noExecuteResources) == 0 {
 			delete(noExecuteNodes, node.GetName())
@@ -115,95 +142,56 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
 		noExecuteNodes[node.GetName()] = noExecuteResources
 		noExecuteNodesChanged = true
 	}
-	noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
+	nodeInfoMutex.Unlock() // END CRITICAL SECTION

-	// Safe to log outside the mutex because because this method is the only writer of noExecuteNodes
-	// and the controller runtime is configured to not allow concurrent execution of this controller.
 	if noExecuteNodesChanged {
-		log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+		r.triggerDispatch()
+		log.FromContext(ctx).Info("Updated NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
 	}
 }

-func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
-	// update unschedulable resource quantities for this node
-	noScheduleQuantities := make(map[string]*resource.Quantity)
+// update noScheduleNodes entry for node
+func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.Node) {
+	var noScheduleResources v1.ResourceList
 	if node.Spec.Unschedulable {
-		// add all non-pod resources covered by cq if the node is cordoned
-		for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
-			if string(resourceName.Name) != "pods" {
-				noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
-			}
-		}
+		noScheduleResources = node.Status.Capacity.DeepCopy()
+		delete(noScheduleResources, v1.ResourcePods)
 	} else {
+		noScheduleResources = make(v1.ResourceList)
 		for key, value := range node.GetLabels() {
 			for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
 				for _, taint := range taints {
 					if key == taint.Key && value == taint.Value {
-						noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						if !quantity.IsZero() {
+							noScheduleResources[v1.ResourceName(resourceName)] = *quantity
+						}
 					}
 				}
 			}
 		}
 	}

-	if len(noScheduleQuantities) > 0 {
-		noScheduleNodes[node.GetName()] = noScheduleQuantities
-	} else {
-		delete(noScheduleNodes, node.GetName())
-	}
-}
-
-func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {
-
-	// compute unschedulable resource totals
-	unschedulableQuantities := map[string]*resource.Quantity{}
-	for _, quantities := range noScheduleNodes {
-		for resourceName, quantity := range quantities {
-			if !quantity.IsZero() {
-				if unschedulableQuantities[resourceName] == nil {
-					unschedulableQuantities[resourceName] = ptr.To(*quantity)
-				} else {
-					unschedulableQuantities[resourceName].Add(*quantity)
-				}
-			}
-		}
-	}
-
-	// enforce lending limits on 1st flavor of 1st resource group
-	resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
-	limitsChanged := false
-	for i, quota := range resources {
-		var lendingLimit *resource.Quantity
-		if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
-			if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
-				lendingLimit = ptr.To(quota.NominalQuota)
-				lendingLimit.Sub(*unschedulableQuantity)
-			} else {
-				lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
-			}
-		}
-		if quota.LendingLimit == nil && lendingLimit != nil ||
-			quota.LendingLimit != nil && lendingLimit == nil ||
-			quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
-			limitsChanged = true
-			resources[i].LendingLimit = lendingLimit
-		}
-	}
-
-	// update lending limits
-	if limitsChanged {
-		err := r.Update(ctx, cq)
-		if err == nil {
-			log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
-			return ctrl.Result{}, nil
-		} else if errors.IsConflict(err) {
-			return ctrl.Result{Requeue: true}, nil
-		} else {
-			return ctrl.Result{}, err
-		}
-	}
-
-	return ctrl.Result{}, nil
+	noScheduleNodesChanged := false
+	nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+	if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
+		if len(noScheduleResources) == 0 {
+			delete(noScheduleNodes, node.GetName())
+			noScheduleNodesChanged = true
+		} else if !maps.Equal(priorEntry, noScheduleResources) {
+			noScheduleNodes[node.GetName()] = noScheduleResources
+			noScheduleNodesChanged = true
+		}
+	} else if len(noScheduleResources) > 0 {
+		noScheduleNodes[node.GetName()] = noScheduleResources
+		noScheduleNodesChanged = true
+	}
+	nodeInfoMutex.Unlock() // END CRITICAL SECTION

+	if noScheduleNodesChanged {
+		r.triggerDispatch()
+		log.FromContext(ctx).Info("Updated NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
+	}
 }

 // SetupWithManager sets up the controller with the Manager.
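With updateLendingLimits deleted above, the lending-limit math presumably moves into the new SlackClusterQueueMonitor, one of the five changed files whose diff is not reproduced on this page. A hedged sketch of that core computation against the new noScheduleNodes type, reusing the arithmetic of the deleted function; the function name is illustrative, and it assumes the package-level nodeInfoMutex and noScheduleNodes plus the kueue, resource, and ptr imports used elsewhere in the package:

// Sketch only: the real reconciler lives in the new SlackClusterQueueMonitor,
// which is not shown on this page; the name below is illustrative.
func setLendingLimits(cq *kueue.ClusterQueue) {
	// Total the unschedulable capacity recorded by NodeHealthMonitor.
	unschedulable := map[v1.ResourceName]*resource.Quantity{}
	nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
	for _, resources := range noScheduleNodes {
		for name, quantity := range resources {
			if unschedulable[name] == nil {
				unschedulable[name] = ptr.To(quantity)
			} else {
				unschedulable[name].Add(quantity)
			}
		}
	}
	nodeInfoMutex.RUnlock() // END CRITICAL SECTION

	// LendingLimit = max(NominalQuota - unschedulable, 0) on the first flavor
	// of the first resource group, mirroring the deleted updateLendingLimits.
	resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
	for i, quota := range resources {
		if unsched := unschedulable[quota.Name]; unsched != nil {
			limit := ptr.To(quota.NominalQuota)
			if limit.Cmp(*unsched) > 0 {
				limit.Sub(*unsched)
			} else {
				limit = resource.NewQuantity(0, resource.DecimalSI)
			}
			resources[i].LendingLimit = limit
		} else {
			resources[i].LendingLimit = nil
		}
	}
}

Reading the caches under nodeInfoMutex.RLock matches the reader/writer split described by the renamed mutex: NodeHealthMonitor is the only writer, while AppWrapperReconciler and SlackClusterQueueMonitor are readers.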

internal/controller/appwrapper/node_health_monitor_test.go

Lines changed: 30 additions & 0 deletions
@@ -24,14 +24,17 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )

 var _ = Describe("NodeMonitor Controller", func() {
 	var slackQueueName = "fake-queue"
 	var node1Name = types.NamespacedName{Name: "fake-node-1"}
 	var node2Name = types.NamespacedName{Name: "fake-node-2"}
+	var dispatch = types.NamespacedName{Name: "*"}
 	var nodeMonitor *NodeHealthMonitor
+	var cqMonitor *SlackClusterQueueMonitor
 	nodeGPUs := v1.ResourceList{v1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}

 	BeforeEach(func() {
@@ -49,9 +52,16 @@ var _ = Describe("NodeMonitor Controller", func() {
 		// Create reconciller
 		awConfig := config.NewAppWrapperConfig()
 		awConfig.SlackQueueName = slackQueueName
+		conduit := make(chan event.GenericEvent, 1)
 		nodeMonitor = &NodeHealthMonitor{
 			Client: k8sClient,
 			Config: awConfig,
+			Events: conduit,
+		}
+		cqMonitor = &SlackClusterQueueMonitor{
+			Client: k8sClient,
+			Config: awConfig,
+			Events: conduit,
 		}
 	})

@@ -124,6 +134,8 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))
@@ -134,6 +146,8 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -144,6 +158,8 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -154,6 +170,8 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).Should(BeNil())
@@ -164,10 +182,22 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))

+		// Increase the slack cluster queue's quota by 2 and expect LedningLimit to increase by 2 to become 4
+		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
+		queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("8")
+		Expect(k8sClient.Update(ctx, queue)).Should(Succeed())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: slackQueueName}})
+		Expect(err).NotTo(HaveOccurred())
+
+		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
+		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(4)))
+
 		Expect(k8sClient.Delete(ctx, queue)).To(Succeed())
 	})
 })
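None of the hunks above exercise the new Node-deletion path (updateForNodeDeletion). A hypothetical follow-on case in the same Ginkgo style could look like the following; the expectations and the direct inspection of the package-level caches are illustrative only and not part of this commit:

// Hypothetical test: deleting a cordoned Node should drop its entry from the
// in-memory caches once the NotFound branch of Reconcile runs.
It("Forgets Nodes that are deleted", func() {
	node1 := &v1.Node{}
	Expect(k8sClient.Get(ctx, node1Name, node1)).Should(Succeed())
	node1.Spec.Unschedulable = true
	Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
	_, err := nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
	Expect(err).NotTo(HaveOccurred())
	Expect(noScheduleNodes).To(HaveKey(node1Name.Name))

	Expect(k8sClient.Delete(ctx, node1)).Should(Succeed())
	_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
	Expect(err).NotTo(HaveOccurred())
	_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
	Expect(err).NotTo(HaveOccurred())
	Expect(noScheduleNodes).NotTo(HaveKey(node1Name.Name))
	Expect(noExecuteNodes).NotTo(HaveKey(node1Name.Name))
})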
