Skip to content

Commit cb272b8

Browse files
authored
Merge pull request #51 from Mmduh-483/machine-no-provider-id
Tolerate Machine ProviderID to support slow Infrastructure Providers
2 parents a28d7b2 + b64a507 commit cb272b8

File tree

4 files changed

+175
-85
lines changed

4 files changed

+175
-85
lines changed

pkg/cloudprovider/cloudprovider.go

Lines changed: 154 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
_ "embed"
2323
"fmt"
24+
"log"
2425
"slices"
2526
"strings"
2627
"sync"
@@ -56,6 +57,8 @@ const (
5657
labelsKey = "capacity.cluster-autoscaler.kubernetes.io/labels"
5758
taintsKey = "capacity.cluster-autoscaler.kubernetes.io/taints"
5859
maxPodsKey = "capacity.cluster-autoscaler.kubernetes.io/maxPods"
60+
61+
machineAnnotation = "cluster.x-k8s.io/machine"
5962
)
6063

6164
func NewCloudProvider(ctx context.Context, kubeClient client.Client, machineProvider machine.Provider, machineDeploymentProvider machinedeployment.Provider) *CloudProvider {
@@ -89,67 +92,13 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
8992
return nil, fmt.Errorf("cannot satisfy create, NodeClaim is nil")
9093
}
9194

92-
nodeClass, err := c.resolveNodeClassFromNodeClaim(ctx, nodeClaim)
93-
if err != nil {
94-
return nil, fmt.Errorf("cannot satisfy create, unable to resolve NodeClass from NodeClaim %q: %w", nodeClaim.Name, err)
95-
}
96-
97-
instanceTypes, err := c.findInstanceTypesForNodeClass(ctx, nodeClass)
98-
if err != nil {
99-
return nil, fmt.Errorf("cannot satisfy create, unable to get instance types for NodeClass %q of NodeClaim %q: %w", nodeClass.Name, nodeClaim.Name, err)
100-
}
101-
102-
// identify which fit requirements
103-
compatibleInstanceTypes := filterCompatibleInstanceTypes(instanceTypes, nodeClaim)
104-
if len(compatibleInstanceTypes) == 0 {
105-
return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("cannot satisfy create, no compatible instance types found"))
106-
}
107-
108-
// TODO (elmiko) if multiple instance types are found to be compatible we need to select one.
109-
// for now, we sort by resource name and take the first in the list. In the future, this should
110-
// be an option or something more useful like minimum size or cost.
111-
slices.SortFunc(compatibleInstanceTypes, func(a, b *ClusterAPIInstanceType) int {
112-
return cmp.Compare(strings.ToLower(a.Name), strings.ToLower(b.Name))
113-
})
114-
selectedInstanceType := compatibleInstanceTypes[0]
115-
116-
// once scalable resource is identified, increase replicas
117-
machineDeployment, err := c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace)
118-
if err != nil {
119-
return nil, fmt.Errorf("cannot satisfy create, unable to find MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, err)
120-
}
121-
originalReplicas := *machineDeployment.Spec.Replicas
122-
machineDeployment.Spec.Replicas = ptr.To(originalReplicas + 1)
123-
if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
124-
return nil, fmt.Errorf("cannot satisfy create, unable to update MachineDeployment %q replicas: %w", machineDeployment.Name, err)
125-
}
126-
127-
// TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
128-
// Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
129-
// then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
130-
// work as we expect since the karpenter core can use the Provider ID as a key into one of its internal caches.
131-
// For now, the method of waiting for the Machine seemed straightforward although it does make the `Create` method a blocking call.
132-
// Try to find an unclaimed Machine resource for 1 minute.
133-
machine, err := c.pollForUnclaimedMachineInMachineDeploymentWithTimeout(ctx, machineDeployment, time.Minute)
95+
machineDeployment, machine, err := c.provisionMachine(ctx, nodeClaim)
13496
if err != nil {
135-
// unable to find a Machine for the NodeClaim, this could be due to timeout or error, but the replica count needs to be reset.
136-
// TODO (elmiko) this could probably use improvement to make it more resilient to errors.
137-
machineDeployment.Spec.Replicas = ptr.To(originalReplicas)
138-
if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
139-
return nil, fmt.Errorf("cannot satisfy create, error while recovering from failure to find an unclaimed Machine: %w", err)
140-
}
141-
return nil, fmt.Errorf("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w", machineDeployment.Name, err)
97+
return nil, err
14298
}
14399

144-
// now that we have a Machine for the NodeClaim, we label it as a karpenter member
145-
labels := machine.GetLabels()
146-
labels[providers.NodePoolMemberLabel] = ""
147-
machine.SetLabels(labels)
148-
if err := c.machineProvider.Update(ctx, machine); err != nil {
149-
// if we can't update the Machine with the member label, we need to unwind the addition
150-
// TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
151-
// since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
152-
return nil, fmt.Errorf("cannot satisfy create, unable to label Machine %q as a member: %w", machine.Name, err)
100+
if machine.Spec.ProviderID == nil {
101+
return nil, fmt.Errorf("cannot satisfy create, waiting for Machine %q to have ProviderID", machine.Name)
153102
}
154103

155104
// fill out nodeclaim with details
@@ -164,15 +113,29 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)
164113
c.accessLock.Lock()
165114
defer c.accessLock.Unlock()
166115

167-
if len(nodeClaim.Status.ProviderID) == 0 {
168-
return fmt.Errorf("NodeClaim %q does not have a provider ID, cannot delete", nodeClaim.Name)
169-
}
116+
var machine *capiv1beta1.Machine
117+
var err error
170118

171119
// find machine
172-
machine, err := c.machineProvider.Get(ctx, nodeClaim.Status.ProviderID)
173-
if err != nil {
174-
return fmt.Errorf("error finding Machine with provider ID %q to Delete NodeClaim %q: %w", nodeClaim.Status.ProviderID, nodeClaim.Name, err)
120+
if len(nodeClaim.Status.ProviderID) != 0 {
121+
machine, err = c.machineProvider.GetByProviderID(ctx, nodeClaim.Status.ProviderID)
122+
if err != nil {
123+
return fmt.Errorf("error finding Machine with provider ID %q to Delete NodeClaim %q: %w", nodeClaim.Status.ProviderID, nodeClaim.Name, err)
124+
}
125+
} else if machineAnno, ok := nodeClaim.Annotations[machineAnnotation]; ok {
126+
machineNamespace, machineName, err := parseMachineAnnotation(machineAnno)
127+
if err != nil {
128+
return fmt.Errorf("error parsing machine annotation: %w", err)
129+
}
130+
131+
machine, err = c.machineProvider.Get(ctx, machineName, machineNamespace)
132+
if err != nil {
133+
return fmt.Errorf("error finding Machine %q in namespace %s to Delete NodeClaim %q: %w", machineName, machineNamespace, nodeClaim.Name, err)
134+
}
135+
} else {
136+
return fmt.Errorf("NodeClaim %q does not have a provider ID or Machine annotations, cannot delete", nodeClaim.Name)
175137
}
138+
176139
if machine == nil {
177140
return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("unable to find Machine with provider ID %q to Delete NodeClaim %q", nodeClaim.Status.ProviderID, nodeClaim.Name))
178141
}
@@ -225,7 +188,7 @@ func (c *CloudProvider) Get(ctx context.Context, providerID string) (*karpv1.Nod
225188
return nil, fmt.Errorf("no providerID supplied to Get, cannot continue")
226189
}
227190

228-
machine, err := c.machineProvider.Get(ctx, providerID)
191+
machine, err := c.machineProvider.GetByProviderID(ctx, providerID)
229192
if err != nil {
230193
return nil, fmt.Errorf("error getting Machine: %w", err)
231194
}
@@ -318,6 +281,113 @@ func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
318281
return []cloudprovider.RepairPolicy{}
319282
}
320283

284+
// ProvisionMachine ensures a CAPI Machine exists for the given NodeClaim.
285+
// It creates the Machine if missing, or "gets" it to confirm the asynchronous provisioning status.
286+
func (c *CloudProvider) provisionMachine(ctx context.Context, nodeClaim *karpv1.NodeClaim) (*capiv1beta1.MachineDeployment, *capiv1beta1.Machine, error) {
287+
machineAnno, ok := nodeClaim.Annotations[machineAnnotation]
288+
if !ok {
289+
return c.createMachine(ctx, nodeClaim)
290+
}
291+
292+
machineNamespace, machineName, err := parseMachineAnnotation(machineAnno)
293+
if err != nil {
294+
return nil, nil, fmt.Errorf("error parsing machine annotation: %w", err)
295+
}
296+
297+
machine, err := c.machineProvider.Get(ctx, machineName, machineNamespace)
298+
if err != nil {
299+
return nil, nil, fmt.Errorf("failed to get NodeClaim's Machine %s : %w", machineName, err)
300+
}
301+
302+
machineDeployment, err := c.machineDeploymentFromMachine(ctx, machine)
303+
if err != nil {
304+
return nil, nil, fmt.Errorf("failed to get NodeClaim's MachineDeployment %s : %w", machineName, err)
305+
}
306+
307+
return machineDeployment, machine, nil
308+
}
309+
310+
func (c *CloudProvider) createMachine(ctx context.Context, nodeClaim *karpv1.NodeClaim) (*capiv1beta1.MachineDeployment, *capiv1beta1.Machine, error) {
311+
nodeClass, err := c.resolveNodeClassFromNodeClaim(ctx, nodeClaim)
312+
if err != nil {
313+
return nil, nil, fmt.Errorf("cannot satisfy create, unable to resolve NodeClass from NodeClaim %q: %w", nodeClaim.Name, err)
314+
}
315+
316+
instanceTypes, err := c.findInstanceTypesForNodeClass(ctx, nodeClass)
317+
if err != nil {
318+
return nil, nil, fmt.Errorf("cannot satisfy create, unable to get instance types for NodeClass %q of NodeClaim %q: %w", nodeClass.Name, nodeClaim.Name, err)
319+
}
320+
321+
// identify which fit requirements
322+
compatibleInstanceTypes := filterCompatibleInstanceTypes(instanceTypes, nodeClaim)
323+
if len(compatibleInstanceTypes) == 0 {
324+
return nil, nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("cannot satisfy create, no compatible instance types found"))
325+
}
326+
327+
// TODO (elmiko) if multiple instance types are found to be compatible we need to select one.
328+
// for now, we sort by resource name and take the first in the list. In the future, this should
329+
// be an option or something more useful like minimum size or cost.
330+
slices.SortFunc(compatibleInstanceTypes, func(a, b *ClusterAPIInstanceType) int {
331+
return cmp.Compare(strings.ToLower(a.Name), strings.ToLower(b.Name))
332+
})
333+
selectedInstanceType := compatibleInstanceTypes[0]
334+
335+
// once scalable resource is identified, increase replicas
336+
machineDeployment, err := c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace)
337+
if err != nil {
338+
return nil, nil, fmt.Errorf("cannot satisfy create, unable to find MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, err)
339+
}
340+
originalReplicas := *machineDeployment.Spec.Replicas
341+
machineDeployment.Spec.Replicas = ptr.To(originalReplicas + 1)
342+
if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
343+
return nil, nil, fmt.Errorf("cannot satisfy create, unable to update MachineDeployment %q replicas: %w", machineDeployment.Name, err)
344+
}
345+
346+
// TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
347+
// Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
348+
// then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
349+
// work as we expect since the karpenter core can use the Provider ID as a key into one of its internal caches.
350+
// For now, the method of waiting for the Machine seemed straightforward although it does make the `Create` method a blocking call.
351+
// Try to find an unclaimed Machine resource for 1 minute.
352+
machine, err := c.pollForUnclaimedMachineInMachineDeploymentWithTimeout(ctx, machineDeployment, time.Minute)
353+
if err != nil {
354+
// unable to find a Machine for the NodeClaim, this could be due to timeout or error, but the replica count needs to be reset.
355+
// TODO (elmiko) this could probably use improvement to make it more resilient to errors.
356+
defer func() {
357+
machineDeployment, err = c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace)
358+
if err != nil {
359+
log.Println(fmt.Errorf("error while recovering from failure to find an unclaimed Machine, unable to find MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, err))
360+
}
361+
362+
machineDeployment.Spec.Replicas = ptr.To(originalReplicas)
363+
if err = c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
364+
log.Println(fmt.Errorf("error while recovering from failure to find an unclaimed Machine: %w", err))
365+
}
366+
}()
367+
368+
return nil, nil, fmt.Errorf("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w", machineDeployment.Name, err)
369+
}
370+
371+
// now that we have a Machine for the NodeClaim, we label it as a karpenter member
372+
labels := machine.GetLabels()
373+
labels[providers.NodePoolMemberLabel] = ""
374+
machine.SetLabels(labels)
375+
if err := c.machineProvider.Update(ctx, machine); err != nil {
376+
// if we can't update the Machine with the member label, we need to unwind the addition
377+
// TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
378+
// since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
379+
return nil, nil, fmt.Errorf("cannot satisfy create, unable to label Machine %q as a member: %w", machine.Name, err)
380+
}
381+
382+
// Bind the NodeClaim with this machine.
383+
nodeClaim.Annotations[machineAnnotation] = fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)
384+
if err = c.kubeClient.Update(ctx, nodeClaim); err != nil {
385+
return nil, nil, fmt.Errorf("cannot satisfy create, unable to update NodeClaim annotations %q: %w", nodeClaim.Name, err)
386+
}
387+
388+
return machineDeployment, machine, nil
389+
}
390+
321391
func (c *CloudProvider) machineDeploymentFromMachine(ctx context.Context, machine *capiv1beta1.Machine) (*capiv1beta1.MachineDeployment, error) {
322392
mdName, found := machine.GetLabels()[capiv1beta1.MachineDeploymentNameLabel]
323393
if !found {
@@ -421,15 +491,8 @@ func (c *CloudProvider) pollForUnclaimedMachineInMachineDeploymentWithTimeout(ct
421491
return false, nil
422492
}
423493

424-
// find the first machine with a provider id
425-
for i, m := range machineList {
426-
if m.Spec.ProviderID != nil {
427-
machine = machineList[i]
428-
return true, nil
429-
}
430-
}
431-
432-
return false, nil
494+
machine = machineList[0]
495+
return true, nil
433496
})
434497
if err != nil {
435498
return nil, fmt.Errorf("error polling for an unclaimed Machine in MachineDeployment %q: %w", machineDeployment.Name, err)
@@ -669,3 +732,20 @@ func zoneLabelFromLabels(labels map[string]string) string {
669732

670733
return zone
671734
}
735+
736+
func parseMachineAnnotation(annotationValue string) (string, string, error) {
737+
parts := strings.Split(annotationValue, "/")
738+
if len(parts) != 2 {
739+
return "", "", fmt.Errorf("invalid machine annotations '%s'. Expected 'namespace/name'", annotationValue)
740+
}
741+
742+
ns := strings.TrimSpace(parts[0])
743+
name := strings.TrimSpace(parts[1])
744+
745+
// Additional validation for empty strings
746+
if ns == "" || name == "" {
747+
return "", "", fmt.Errorf("invalid machine format '%s'. Namespace and name cannot be empty", annotationValue)
748+
}
749+
750+
return ns, name, nil
751+
}

pkg/cloudprovider/cloudprovider_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ var _ = Describe("CloudProvider.Delete method", func() {
105105
nodeClaim := karpv1.NodeClaim{}
106106
nodeClaim.Name = "some-node-claim"
107107
err := provider.Delete(context.Background(), &nodeClaim)
108-
Expect(err).To(MatchError(fmt.Errorf("NodeClaim %q does not have a provider ID, cannot delete", nodeClaim.Name)))
108+
Expect(err).To(MatchError(fmt.Errorf("NodeClaim %q does not have a provider ID or Machine annotations, cannot delete", nodeClaim.Name)))
109109
})
110110

111111
It("returns an error when the referenced Machine is not found", func() {
@@ -195,7 +195,7 @@ var _ = Describe("CloudProvider.Delete method", func() {
195195
Expect(err).ToNot(HaveOccurred())
196196

197197
Eventually(func() map[string]string {
198-
m, err := provider.machineProvider.Get(context.Background(), providerID)
198+
m, err := provider.machineProvider.GetByProviderID(context.Background(), providerID)
199199
Expect(err).ToNot(HaveOccurred())
200200
return m.GetAnnotations()
201201
}).Should(HaveKey(capiv1beta1.DeleteMachineAnnotation))

pkg/providers/machine/machine.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import (
2727
)
2828

2929
type Provider interface {
30-
Get(context.Context, string) (*capiv1beta1.Machine, error)
30+
Get(context.Context, string, string) (*capiv1beta1.Machine, error)
31+
GetByProviderID(context.Context, string) (*capiv1beta1.Machine, error)
3132
List(context.Context, *metav1.LabelSelector) ([]*capiv1beta1.Machine, error)
3233
IsDeleting(*capiv1beta1.Machine) bool
3334
AddDeleteAnnotation(context.Context, *capiv1beta1.Machine) error
@@ -45,10 +46,19 @@ func NewDefaultProvider(_ context.Context, kubeClient client.Client) *DefaultPro
4546
}
4647
}
4748

48-
// Get returns the Machine indicated by the supplied Provider ID or nil if not found.
49+
func (p *DefaultProvider) Get(ctx context.Context, name string, namespace string) (*capiv1beta1.Machine, error) {
50+
machine := &capiv1beta1.Machine{}
51+
err := p.kubeClient.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, machine)
52+
if err != nil {
53+
machine = nil
54+
}
55+
return machine, err
56+
}
57+
58+
// GetByProviderID returns the Machine indicated by the supplied Provider ID or nil if not found.
4959
// Because Get is used with a provider ID, it may return a Machine that does not have
5060
// a label for node pool membership.
51-
func (p *DefaultProvider) Get(ctx context.Context, providerID string) (*capiv1beta1.Machine, error) {
61+
func (p *DefaultProvider) GetByProviderID(ctx context.Context, providerID string) (*capiv1beta1.Machine, error) {
5262
machineList := &capiv1beta1.MachineList{}
5363
err := p.kubeClient.List(ctx, machineList)
5464
if err != nil {

pkg/providers/machine/machine_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
7777
})
7878

7979
It("returns nil when there are no Machines present in API", func() {
80-
machine, err := provider.Get(context.Background(), "")
80+
machine, err := provider.GetByProviderID(context.Background(), "")
8181
Expect(err).ToNot(HaveOccurred())
8282
Expect(machine).To(BeNil())
8383
})
@@ -86,7 +86,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
8686
machine := newMachine("karpenter-1", "karpenter-cluster", true)
8787
Expect(cl.Create(context.Background(), machine)).To(Succeed())
8888

89-
machine, err := provider.Get(context.Background(), "clusterapi://the-wrong-provider-id")
89+
machine, err := provider.GetByProviderID(context.Background(), "clusterapi://the-wrong-provider-id")
9090
Expect(err).ToNot(HaveOccurred())
9191
Expect(machine).To(BeNil())
9292
})
@@ -98,7 +98,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
9898
Expect(cl.Create(context.Background(), machine)).To(Succeed())
9999

100100
providerID := *machine.Spec.ProviderID
101-
machine, err := provider.Get(context.Background(), providerID)
101+
machine, err := provider.GetByProviderID(context.Background(), providerID)
102102
Expect(err).ToNot(HaveOccurred())
103103
Expect(machine).Should(HaveField("Name", "karpenter-2"))
104104
})
@@ -110,7 +110,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
110110
Expect(cl.Create(context.Background(), machine)).To(Succeed())
111111

112112
providerID := *machine.Spec.ProviderID
113-
machine, err := provider.Get(context.Background(), providerID)
113+
machine, err := provider.GetByProviderID(context.Background(), providerID)
114114
Expect(err).ToNot(HaveOccurred())
115115
Expect(machine).Should(HaveField("Name", "karpenter-2"))
116116
})
@@ -213,7 +213,7 @@ var _ = Describe("Machine DefaultProvider.AddDeleteAnnotation method", func() {
213213
Expect(err).ToNot(HaveOccurred())
214214

215215
Eventually(func() map[string]string {
216-
m, err := provider.Get(context.Background(), *machine.Spec.ProviderID)
216+
m, err := provider.GetByProviderID(context.Background(), *machine.Spec.ProviderID)
217217
Expect(err).ToNot(HaveOccurred())
218218
return m.GetAnnotations()
219219
}).Should(HaveKey(capiv1beta1.DeleteMachineAnnotation))
@@ -263,7 +263,7 @@ var _ = Describe("Machine DefaultProvider.RemoveDeleteAnnotation method", func()
263263
Expect(err).ToNot(HaveOccurred())
264264

265265
Eventually(func() map[string]string {
266-
m, err := provider.Get(context.Background(), *machine.Spec.ProviderID)
266+
m, err := provider.GetByProviderID(context.Background(), *machine.Spec.ProviderID)
267267
Expect(err).ToNot(HaveOccurred())
268268
return m.GetAnnotations()
269269
}).ShouldNot(HaveKey(capiv1beta1.DeleteMachineAnnotation))

0 commit comments

Comments
 (0)