Commit 52c3b91
chore: throw errors out of nodeclaim informer loop (#2749)
1 parent 6b1da14

File tree: 6 files changed, +42 −24 lines

pkg/controllers/disruption/consolidation_test.go
Lines changed: 8 additions & 1 deletion

@@ -1066,6 +1066,7 @@ var _ = Describe("Consolidation", func() {
 corev1.LabelTopologyZone: mostExpSpotOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
 })

+ExpectSingletonReconciled(ctx, pricingController)
 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
 Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
@@ -1180,6 +1181,7 @@ var _ = Describe("Consolidation", func() {
 v1.CapacityTypeLabelKey: spotOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
 corev1.LabelTopologyZone: spotOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
 })
+ExpectSingletonReconciled(ctx, pricingController)

 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
@@ -1714,7 +1716,8 @@ var _ = Describe("Consolidation", func() {
 nodeClaim = lo.Ternary(spotToSpot, spotNodeClaim, nodeClaim)
 node = lo.Ternary(spotToSpot, spotNode, node)
 // Add these spot instance with this special condition to cloud provider instancetypes
-cloudProvider.InstanceTypes = lo.Ternary(spotToSpot, spotInstances, onDemandInstances)
+cloudProvider.InstanceTypes = append(cloudProvider.InstanceTypes, lo.Ternary(spotToSpot, spotInstances, onDemandInstances)...)
+ExpectSingletonReconciled(ctx, pricingController)

 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
@@ -2229,6 +2232,7 @@ var _ = Describe("Consolidation", func() {
 currentInstance,
 replacementInstance,
 }
+ExpectSingletonReconciled(ctx, pricingController)

 // create our RS so we can link a pod to it
 rs := test.ReplicaSet()
@@ -2317,6 +2321,7 @@ var _ = Describe("Consolidation", func() {
 currentInstance,
 replacementInstance,
 }
+ExpectSingletonReconciled(ctx, pricingController)

 // create our RS so we can link a pod to it
 rs := test.ReplicaSet()
@@ -2917,6 +2922,7 @@ var _ = Describe("Consolidation", func() {
 defaultInstanceType,
 smallInstanceType,
 }
+ExpectSingletonReconciled(ctx, pricingController)
 // create our RS so we can link a pod to it
 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
@@ -4468,6 +4474,7 @@ var _ = Describe("Consolidation", func() {
 v1alpha1.LabelReservationID: mostExpensiveReservationID,
 }),
 })
+ExpectSingletonReconciled(ctx, pricingController)
 reservedNodeClaim, reservedNode = test.NodeClaimAndNode(v1.NodeClaim{
 ObjectMeta: metav1.ObjectMeta{
 Labels: map[string]string{

pkg/controllers/disruption/emptiness_test.go
Lines changed: 2 additions & 2 deletions

@@ -65,8 +65,8 @@ var _ = Describe("Emptiness", func() {
 Labels: map[string]string{
 v1.NodePoolLabelKey: nodePool.Name,
 corev1.LabelInstanceTypeStable: leastExpensiveSpotInstance.Name,
-v1.CapacityTypeLabelKey: leastExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
-corev1.LabelTopologyZone: leastExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
+v1.CapacityTypeLabelKey: leastExpensiveSpotOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
+corev1.LabelTopologyZone: leastExpensiveSpotOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
 },
 },
 Status: v1.NodeClaimStatus{

pkg/controllers/disruption/suite_test.go
Lines changed: 6 additions & 2 deletions

@@ -67,6 +67,7 @@ var env *test.Environment
 var clusterCost *cost.ClusterCost
 var cluster *state.Cluster
 var disruptionController *disruption.Controller
+var pricingController *informer.PricingController
 var prov *provisioning.Provisioner
 var cloudProvider *fake.CloudProvider
 var nodeStateController *informer.NodeController
@@ -95,6 +96,7 @@ var _ = BeforeSuite(func() {
 cloudProvider = fake.NewCloudProvider()
 fakeClock = clock.NewFakeClock(time.Now())
 clusterCost = cost.NewClusterCost(ctx, cloudProvider, env.Client)
+pricingController = informer.NewPricingController(env.Client, cloudProvider, clusterCost)
 cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
 nodeStateController = informer.NewNodeController(env.Client, cluster)
 nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster, clusterCost)
@@ -110,6 +112,8 @@ var _ = AfterSuite(func() {
 var _ = BeforeEach(func() {
 cloudProvider.Reset()
 cloudProvider.InstanceTypes = fake.InstanceTypesAssorted()
+clusterCost.Reset()
+ExpectSingletonReconciled(ctx, pricingController)

 recorder.Reset() // Reset the events that we captured during the run

@@ -580,6 +584,7 @@ var _ = Describe("Disruption Taints", func() {
 currentInstance,
 replacementInstance,
 }
+ExpectSingletonReconciled(ctx, pricingController)
 nodePool.Spec.Disruption.ConsolidateAfter.Duration = lo.ToPtr(time.Duration(0))
 nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
 ExpectApplied(ctx, env.Client, nodeClaim, nodePool)
@@ -1778,8 +1783,7 @@ var _ = Describe("Candidate Filtering", func() {
 },
 },
 })
-// Don't apply the NodePool
-ExpectApplied(ctx, env.Client, nodeClaim, node)
+ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
 ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

 // Mock the NodePool not existing by removing it from the nodePool and nodePoolInstanceTypes maps

pkg/controllers/state/informer/nodeclaim.go
Lines changed: 2 additions & 3 deletions

@@ -24,7 +24,6 @@ import (
 "sigs.k8s.io/controller-runtime/pkg/builder"
 "sigs.k8s.io/controller-runtime/pkg/client"
 "sigs.k8s.io/controller-runtime/pkg/controller"
-"sigs.k8s.io/controller-runtime/pkg/log"
 "sigs.k8s.io/controller-runtime/pkg/manager"
 "sigs.k8s.io/controller-runtime/pkg/reconcile"

@@ -68,7 +67,7 @@ func (c *NodeClaimController) Reconcile(ctx context.Context, req reconcile.Reque
 // notify cluster state of the node deletion
 c.cluster.DeleteNodeClaim(req.Name)
 if deleteErr := c.clusterCost.DeleteNodeClaim(ctx, nodeClaim); deleteErr != nil {
-log.FromContext(ctx).Error(deleteErr, "failed to remove nodeclaim from cost tracking")
+return reconcile.Result{}, deleteErr
 }
 }
 return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -78,7 +77,7 @@ func (c *NodeClaimController) Reconcile(ctx context.Context, req reconcile.Reque
 }
 c.cluster.UpdateNodeClaim(nodeClaim)
 if err := c.clusterCost.UpdateNodeClaim(ctx, nodeClaim); err != nil {
-log.FromContext(ctx).Error(err, "failed to process nodeclaim for cost tracking")
+return reconcile.Result{}, err
 }
 // ensure it's aware of any nodes we discover, this is a no-op if the node is already known to our cluster state
 return reconcile.Result{RequeueAfter: stateRetryPeriod}, nil
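Note on the change above: before this commit, a failure from clusterCost.DeleteNodeClaim or clusterCost.UpdateNodeClaim was only logged, so the workqueue considered the NodeClaim handled and never retried it; returning the error instead lets controller-runtime requeue the request with backoff. A minimal, self-contained sketch of the difference, where updateCost and the simplified result type are hypothetical stand-ins rather than the informer's real dependencies:

package main

import (
	"context"
	"errors"
	"fmt"
)

// result mirrors the shape of controller-runtime's reconcile.Result for this sketch only.
type result struct{ Requeue bool }

// updateCost is a hypothetical stand-in for clusterCost.UpdateNodeClaim; here it always fails.
func updateCost(ctx context.Context, name string) error {
	return errors.New("pricing data not ready")
}

// reconcileLogged shows the old behaviour: the error is only logged, the reconcile
// reports success, and the workqueue never retries the NodeClaim.
func reconcileLogged(ctx context.Context, name string) (result, error) {
	if err := updateCost(ctx, name); err != nil {
		fmt.Println("failed to process nodeclaim for cost tracking:", err)
	}
	return result{}, nil
}

// reconcileReturned shows the new behaviour: the error is returned, so the
// controller requeues the NodeClaim until cost tracking succeeds.
func reconcileReturned(ctx context.Context, name string) (result, error) {
	if err := updateCost(ctx, name); err != nil {
		return result{}, err
	}
	return result{}, nil
}

func main() {
	ctx := context.Background()
	_, err := reconcileLogged(ctx, "nodeclaim-a")
	fmt.Println("old loop returns:", err) // <nil>: the failure was swallowed
	_, err = reconcileReturned(ctx, "nodeclaim-a")
	fmt.Println("new loop returns:", err) // non-nil: retried by the workqueue
}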

pkg/controllers/state/informer/pricing.go
Lines changed: 6 additions & 1 deletion

@@ -26,6 +26,8 @@ import (
 "github.com/samber/lo"
 "go.uber.org/multierr"

+metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/types"
 controllerruntime "sigs.k8s.io/controller-runtime"
 "sigs.k8s.io/controller-runtime/pkg/client"
 "sigs.k8s.io/controller-runtime/pkg/manager"
@@ -61,6 +63,9 @@ func (c *PricingController) Reconcile(ctx context.Context) (reconciler.Result, e
 return reconciler.Result{}, err
 }

+// Add a fake wide open nodepool to capture manually created nodeclaims
+npl.Items = append(npl.Items, v1.NodePool{ObjectMeta: metav1.ObjectMeta{UID: types.UID("manual")}})
+
 newNpItMap := make(map[string]map[string]*cloudprovider.InstanceType)
 var errs error
 for _, np := range npl.Items {
@@ -83,7 +88,7 @@ func (c *PricingController) Reconcile(ctx context.Context) (reconciler.Result, e
 })
 }
 if errs != nil {
-return reconciler.Result{}, fmt.Errorf("refreshing pricing info, %w", err)
+return reconciler.Result{}, fmt.Errorf("refreshing pricing info, %w", errs)
 }
 c.npItMap = newNpItMap
pkg/state/cost/cost.go
Lines changed: 18 additions & 15 deletions

@@ -22,12 +22,11 @@ import (
 "sync"

 opmetrics "github.com/awslabs/operatorpkg/metrics"
-"github.com/awslabs/operatorpkg/object"
 "github.com/awslabs/operatorpkg/serrors"
 "github.com/prometheus/client_golang/prometheus"
 "github.com/samber/lo"
 corev1 "k8s.io/api/core/v1"
-metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/api/errors"
 "k8s.io/apimachinery/pkg/types"
 "k8s.io/apimachinery/pkg/util/sets"
 "k8s.io/klog/v2"
@@ -216,19 +215,18 @@ func (cc *ClusterCost) UpdateNodeClaim(ctx context.Context, nodeClaim *v1.NodeCl
 }

 np := &v1.NodePool{}
-if err := cc.client.Get(ctx, types.NamespacedName{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}, np); err != nil {
+err := cc.client.Get(ctx, types.NamespacedName{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}, np)
+if err != nil && !errors.IsNotFound(err) {
 failed = true
 return serrors.Wrap(err, "nodepool", nodeClaim.Labels[v1.NodePoolLabelKey], "nodeclaim", klog.KObj(nodeClaim))
 }
-if _, found := lo.Find(nodeClaim.GetOwnerReferences(), func(o metav1.OwnerReference) bool {
-return o.Kind == object.GVK(np).Kind && o.UID == np.UID
-}); !found {
-failed = true
-return serrors.Wrap(fmt.Errorf("nodepool not found for nodeclaim"), "nodepool", nodeClaim.Labels[v1.NodePoolLabelKey], "nodeclaim", klog.KObj(nodeClaim))
+if errors.IsNotFound(err) {
+np.UID = types.UID("manual")
 }
+
 cc.Lock()
 defer cc.Unlock()
-err := cc.internalAddOffering(ctx, np, nodeClaim.Labels[corev1.LabelInstanceTypeStable], nodeClaim.Labels[v1.CapacityTypeLabelKey], nodeClaim.Labels[corev1.LabelTopologyZone], true)
+err = cc.internalAddOffering(ctx, np, nodeClaim.Labels[corev1.LabelInstanceTypeStable], nodeClaim.Labels[v1.CapacityTypeLabelKey], nodeClaim.Labels[corev1.LabelTopologyZone], true)
 if err != nil {
 failed = true
 return serrors.Wrap(err, "nodeclaim", klog.KObj(nodeClaim), "nodepool", klog.KObj(np))
@@ -267,15 +265,13 @@ func (cc *ClusterCost) DeleteNodeClaim(ctx context.Context, nodeClaim *v1.NodeCl
 nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
 np := &v1.NodePool{}
 err := cc.client.Get(ctx, client.ObjectKey{Name: nodePoolName}, np)
-if err != nil {
+if err != nil && !errors.IsNotFound(err) {
 failed = true
 return serrors.Wrap(err, "nodepool", nodePoolName, "nodeclaim", klog.KObj(nodeClaim))
 }
-if _, found := lo.Find(nodeClaim.GetOwnerReferences(), func(o metav1.OwnerReference) bool {
-return o.Kind == object.GVK(np).Kind && o.UID == np.UID
-}); !found {
-failed = true
-return serrors.Wrap(fmt.Errorf("nodepool not found for nodeclaim"), "nodepool", nodeClaim.Labels[v1.NodePoolLabelKey], "nodeclaim", klog.KObj(nodeClaim))
+if errors.IsNotFound(err) {
+// Technically not an error, as users can create NodeClaims manually without a NodePool.
+np.UID = types.UID("manual")
 }
 cc.Lock()
 defer cc.Unlock()
@@ -363,6 +359,13 @@ func (cc *ClusterCost) internalRemoveOffering(np *v1.NodePool, instanceName, cap
 return nil
 }

+func (cc *ClusterCost) Reset() {
+cc.Lock()
+defer cc.Unlock()
+cc.npCostMap = make(map[types.UID]*NodePoolCost)
+cc.nodeClaimSet = make(sets.Set[string])
+}
+
 // GetClusterCost returns the total cost of all compute resources across
 // all NodePools in the cluster.
 func (cc *ClusterCost) GetClusterCost() float64 {
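On the errors.IsNotFound guard introduced in UpdateNodeClaim and DeleteNodeClaim above: only the "NodePool no longer exists" case falls through to the synthetic "manual" UID, while any other API failure still sets failed, is wrapped, and is returned, which now also bubbles out of the NodeClaim informer loop per the nodeclaim.go change. A small standalone illustration using the upstream apimachinery helpers (the group/resource and error values below are invented for the demo):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// A NotFound error, as returned when the NodePool named in the label no longer exists.
	notFound := apierrors.NewNotFound(schema.GroupResource{Group: "karpenter.sh", Resource: "nodepools"}, "default")
	// Any other API error still aborts cost tracking and is returned up the informer loop.
	internal := apierrors.NewInternalError(fmt.Errorf("etcd unavailable"))

	for _, err := range []error{notFound, internal} {
		if err != nil && !apierrors.IsNotFound(err) {
			fmt.Println("hard failure, returned from Reconcile:", err)
			continue
		}
		fmt.Println("NodePool missing: fall back to the synthetic \"manual\" UID")
	}
}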
