
Commit d2cdf4e

chore: throw cost tracking errors out of nodeclaim informer loop
1 parent 54d36cb commit d2cdf4e

6 files changed (+42, -22 lines)


pkg/controllers/disruption/consolidation_test.go
Lines changed: 8 additions & 1 deletion

@@ -1066,6 +1066,7 @@ var _ = Describe("Consolidation", func() {
 	corev1.LabelTopologyZone: mostExpSpotOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
 })

+ExpectSingletonReconciled(ctx, pricingController)
 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
 Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
@@ -1180,6 +1181,7 @@ var _ = Describe("Consolidation", func() {
 	v1.CapacityTypeLabelKey: spotOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
 	corev1.LabelTopologyZone: spotOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
 })
+ExpectSingletonReconciled(ctx, pricingController)

 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
@@ -1714,7 +1716,8 @@ var _ = Describe("Consolidation", func() {
 nodeClaim = lo.Ternary(spotToSpot, spotNodeClaim, nodeClaim)
 node = lo.Ternary(spotToSpot, spotNode, node)
 // Add these spot instance with this special condition to cloud provider instancetypes
-cloudProvider.InstanceTypes = lo.Ternary(spotToSpot, spotInstances, onDemandInstances)
+cloudProvider.InstanceTypes = append(cloudProvider.InstanceTypes, lo.Ternary(spotToSpot, spotInstances, onDemandInstances)...)
+ExpectSingletonReconciled(ctx, pricingController)

 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
@@ -2229,6 +2232,7 @@ var _ = Describe("Consolidation", func() {
 	currentInstance,
 	replacementInstance,
 }
+ExpectSingletonReconciled(ctx, pricingController)

 // create our RS so we can link a pod to it
 rs := test.ReplicaSet()
@@ -2317,6 +2321,7 @@ var _ = Describe("Consolidation", func() {
 	currentInstance,
 	replacementInstance,
 }
+ExpectSingletonReconciled(ctx, pricingController)

 // create our RS so we can link a pod to it
 rs := test.ReplicaSet()
@@ -2917,6 +2922,7 @@ var _ = Describe("Consolidation", func() {
 	defaultInstanceType,
 	smallInstanceType,
 }
+ExpectSingletonReconciled(ctx, pricingController)
 // create our RS so we can link a pod to it
 rs := test.ReplicaSet()
 ExpectApplied(ctx, env.Client, rs)
@@ -4468,6 +4474,7 @@ var _ = Describe("Consolidation", func() {
 		v1alpha1.LabelReservationID: mostExpensiveReservationID,
 	}),
 })
+ExpectSingletonReconciled(ctx, pricingController)
 reservedNodeClaim, reservedNode = test.NodeClaimAndNode(v1.NodeClaim{
 	ObjectMeta: metav1.ObjectMeta{
 		Labels: map[string]string{
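
A recurring pattern in the specs above: whenever a test swaps out cloudProvider.InstanceTypes, it now drives one pass of the singleton pricing informer so that ClusterCost prices candidates against the offerings the spec just installed. The two-line shape of the pattern, using the suite's own variables (the comments are mine):

	cloudProvider.InstanceTypes = []*cloudprovider.InstanceType{currentInstance, replacementInstance} // install the offerings under test
	ExpectSingletonReconciled(ctx, pricingController)                                                 // refresh pricing/cost state before the spec provisions or consolidates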

pkg/controllers/disruption/emptiness_test.go
Lines changed: 2 additions & 2 deletions

@@ -65,8 +65,8 @@ var _ = Describe("Emptiness", func() {
 Labels: map[string]string{
 	v1.NodePoolLabelKey: nodePool.Name,
 	corev1.LabelInstanceTypeStable: leastExpensiveSpotInstance.Name,
-	v1.CapacityTypeLabelKey: leastExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
-	corev1.LabelTopologyZone: leastExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
+	v1.CapacityTypeLabelKey: leastExpensiveSpotOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
+	corev1.LabelTopologyZone: leastExpensiveSpotOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
 },
 },
 Status: v1.NodeClaimStatus{

pkg/controllers/disruption/suite_test.go
Lines changed: 6 additions & 2 deletions

@@ -67,6 +67,7 @@ var env *test.Environment
 var clusterCost *cost.ClusterCost
 var cluster *state.Cluster
 var disruptionController *disruption.Controller
+var pricingController *informer.PricingController
 var prov *provisioning.Provisioner
 var cloudProvider *fake.CloudProvider
 var nodeStateController *informer.NodeController
@@ -95,6 +96,7 @@ var _ = BeforeSuite(func() {
 cloudProvider = fake.NewCloudProvider()
 fakeClock = clock.NewFakeClock(time.Now())
 clusterCost = cost.NewClusterCost(ctx, cloudProvider, env.Client)
+pricingController = informer.NewPricingController(env.Client, cloudProvider, clusterCost)
 cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
 nodeStateController = informer.NewNodeController(env.Client, cluster)
 nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster, clusterCost)
@@ -110,6 +112,8 @@ var _ = AfterSuite(func() {
 var _ = BeforeEach(func() {
 cloudProvider.Reset()
 cloudProvider.InstanceTypes = fake.InstanceTypesAssorted()
+clusterCost.Reset()
+ExpectSingletonReconciled(ctx, pricingController)

 recorder.Reset() // Reset the events that we captured during the run

@@ -580,6 +584,7 @@ var _ = Describe("Disruption Taints", func() {
 	currentInstance,
 	replacementInstance,
 }
+ExpectSingletonReconciled(ctx, pricingController)
 nodePool.Spec.Disruption.ConsolidateAfter.Duration = lo.ToPtr(time.Duration(0))
 nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
 ExpectApplied(ctx, env.Client, nodeClaim, nodePool)
@@ -1778,8 +1783,7 @@ var _ = Describe("Candidate Filtering", func() {
 		},
 	},
 })
-// Don't apply the NodePool
-ExpectApplied(ctx, env.Client, nodeClaim, node)
+ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
 ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

 // Mock the NodePool not existing by removing it from the nodePool and nodePoolInstanceTypes maps
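
The BeforeEach change gives every spec a freshly priced, empty cost baseline instead of state leaked from the previous spec. Read in isolation, the hygiene pattern is (Ginkgo, same suite variables; the comments are mine):

	var _ = BeforeEach(func() {
		cloudProvider.Reset()                                      // drop the previous spec's fake offerings
		cloudProvider.InstanceTypes = fake.InstanceTypesAssorted() // reseed a known assortment
		clusterCost.Reset()                                        // clear per-NodePool cost accounting
		ExpectSingletonReconciled(ctx, pricingController)          // price the fresh offerings once
	})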

pkg/controllers/state/informer/nodeclaim.go
Lines changed: 2 additions & 3 deletions

@@ -24,7 +24,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
-	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"

@@ -68,7 +67,7 @@ func (c *NodeClaimController) Reconcile(ctx context.Context, req reconcile.Reque
 // notify cluster state of the node deletion
 c.cluster.DeleteNodeClaim(req.Name)
 if deleteErr := c.clusterCost.DeleteNodeClaim(ctx, nodeClaim); deleteErr != nil {
-	log.FromContext(ctx).Error(deleteErr, "failed to remove nodeclaim from cost tracking")
+	return reconcile.Result{}, deleteErr
 }
 }
 return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -78,7 +77,7 @@ func (c *NodeClaimController) Reconcile(ctx context.Context, req reconcile.Reque
 }
 c.cluster.UpdateNodeClaim(nodeClaim)
 if err := c.clusterCost.UpdateNodeClaim(ctx, nodeClaim); err != nil {
-	log.FromContext(ctx).Error(err, "failed to process nodeclaim for cost tracking")
+	return reconcile.Result{}, err
 }
 // ensure it's aware of any nodes we discover, this is a no-op if the node is already known to our cluster state
 return reconcile.Result{RequeueAfter: stateRetryPeriod}, nil
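
The behavioral point of this file's change comes from the standard controller-runtime contract: when Reconcile returns a non-nil error, the request is requeued with exponential backoff, whereas the old log-and-continue path dropped the failure after a single attempt. A minimal sketch of that contract with a toy reconciler (illustrative names, not the real controller):

	package sketch

	import (
		"context"
		"fmt"

		"sigs.k8s.io/controller-runtime/pkg/reconcile"
	)

	// toyReconciler stands in for NodeClaimController; update is whatever cost
	// bookkeeping might fail transiently (API reads, pricing lookups, ...).
	type toyReconciler struct {
		update func(context.Context) error
	}

	func (r *toyReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
		if err := r.update(ctx); err != nil {
			// Returning the error (rather than logging it) hands the request back
			// to the workqueue, which retries it with exponential backoff.
			return reconcile.Result{}, fmt.Errorf("tracking cost for %q: %w", req.Name, err)
		}
		return reconcile.Result{}, nil
	}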

pkg/controllers/state/informer/pricing.go
Lines changed: 6 additions & 1 deletion

@@ -26,6 +26,8 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/multierr"

+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -61,6 +63,9 @@ func (c *PricingController) Reconcile(ctx context.Context) (reconciler.Result, e
 	return reconciler.Result{}, err
 }

+// Add a fake wide open nodepool to capture manually created nodeclaims
+npl.Items = append(npl.Items, v1.NodePool{ObjectMeta: metav1.ObjectMeta{UID: types.UID("manual")}})
+
 newNpItMap := make(map[string]map[string]*cloudprovider.InstanceType)
 var errs error
 for _, np := range npl.Items {
@@ -83,7 +88,7 @@ func (c *PricingController) Reconcile(ctx context.Context) (reconciler.Result, e
 })
 }
 if errs != nil {
-	return reconciler.Result{}, fmt.Errorf("refreshing pricing info, %w", err)
+	return reconciler.Result{}, fmt.Errorf("refreshing pricing info, %w", errs)
 }
 c.npItMap = newNpItMap

pkg/state/cost/cost.go
Lines changed: 18 additions & 13 deletions

@@ -27,6 +27,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/samber/lo"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -216,19 +217,18 @@ func (cc *ClusterCost) UpdateNodeClaim(ctx context.Context, nodeClaim *v1.NodeCl
 }

 np := &v1.NodePool{}
-if err := cc.client.Get(ctx, types.NamespacedName{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}, np); err != nil {
+err := cc.client.Get(ctx, types.NamespacedName{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}, np)
+if err != nil && !errors.IsNotFound(err) {
 	failed = true
 	return serrors.Wrap(err, "nodepool", nodeClaim.Labels[v1.NodePoolLabelKey], "nodeclaim", klog.KObj(nodeClaim))
 }
-if _, found := lo.Find(nodeClaim.GetOwnerReferences(), func(o metav1.OwnerReference) bool {
-	return o.Kind == object.GVK(np).Kind && o.UID == np.UID
-}); !found {
-	failed = true
-	return serrors.Wrap(fmt.Errorf("nodepool not found for nodeclaim"), "nodepool", nodeClaim.Labels[v1.NodePoolLabelKey], "nodeclaim", klog.KObj(nodeClaim))
+if errors.IsNotFound(err) {
+	np.UID = types.UID("manual")
 }
+
 cc.Lock()
 defer cc.Unlock()
-err := cc.internalAddOffering(ctx, np, nodeClaim.Labels[corev1.LabelInstanceTypeStable], nodeClaim.Labels[v1.CapacityTypeLabelKey], nodeClaim.Labels[corev1.LabelTopologyZone], true)
+err = cc.internalAddOffering(ctx, np, nodeClaim.Labels[corev1.LabelInstanceTypeStable], nodeClaim.Labels[v1.CapacityTypeLabelKey], nodeClaim.Labels[corev1.LabelTopologyZone], true)
 if err != nil {
 	failed = true
 	return serrors.Wrap(err, "nodeclaim", klog.KObj(nodeClaim), "nodepool", klog.KObj(np))
@@ -267,15 +267,13 @@ func (cc *ClusterCost) DeleteNodeClaim(ctx context.Context, nodeClaim *v1.NodeCl
 nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
 np := &v1.NodePool{}
 err := cc.client.Get(ctx, client.ObjectKey{Name: nodePoolName}, np)
-if err != nil {
+if err != nil && !errors.IsNotFound(err) {
 	failed = true
 	return serrors.Wrap(err, "nodepool", nodePoolName, "nodeclaim", klog.KObj(nodeClaim))
 }
-if _, found := lo.Find(nodeClaim.GetOwnerReferences(), func(o metav1.OwnerReference) bool {
-	return o.Kind == object.GVK(np).Kind && o.UID == np.UID
-}); !found {
-	failed = true
-	return serrors.Wrap(fmt.Errorf("nodepool not found for nodeclaim"), "nodepool", nodeClaim.Labels[v1.NodePoolLabelKey], "nodeclaim", klog.KObj(nodeClaim))
+if errors.IsNotFound(err) {
+	// Technically not an error, as users can create NodeClaims manually without a NodePool.
+	np.UID = types.UID("manual")
 }
 cc.Lock()
 defer cc.Unlock()
@@ -363,6 +361,13 @@ func (cc *ClusterCost) internalRemoveOffering(np *v1.NodePool, instanceName, cap
 	return nil
 }

+func (cc *ClusterCost) Reset() {
+	cc.Lock()
+	defer cc.Unlock()
+	cc.npCostMap = make(map[types.UID]*NodePoolCost)
+	cc.nodeClaimSet = make(sets.Set[string])
+}
+
 // GetClusterCost returns the total cost of all compute resources across
 // all NodePools in the cluster.
 func (cc *ClusterCost) GetClusterCost() float64 {
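
Taken together with the pricing.go change above, a missing NodePool now degrades gracefully: any Get failure other than NotFound is still returned (and, via the informer change, retried), while NotFound reroutes the NodeClaim's cost into the synthetic pool keyed by UID "manual". A condensed sketch of that routing as a standalone helper (the helper name is mine; import paths assume the sigs.k8s.io/karpenter module):

	import (
		"context"

		"k8s.io/apimachinery/pkg/api/errors"
		"k8s.io/apimachinery/pkg/types"
		"sigs.k8s.io/controller-runtime/pkg/client"

		v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	)

	// resolvePoolUID returns the UID under which a NodeClaim's cost is tracked.
	func resolvePoolUID(ctx context.Context, c client.Client, nodeClaim *v1.NodeClaim) (types.UID, error) {
		np := &v1.NodePool{}
		err := c.Get(ctx, types.NamespacedName{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}, np)
		switch {
		case err != nil && !errors.IsNotFound(err):
			return "", err // real API failure: surface it so the caller retries
		case errors.IsNotFound(err):
			return types.UID("manual"), nil // manually created NodeClaim: sentinel pool
		default:
			return np.UID, nil
		}
	}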
