Skip to content

Commit 880b75c

Browse files
committed
Associate loadBalancerIP to the network when specified in the Spec
1 parent 90adbee commit 880b75c

File tree

2 files changed

+187
-18
lines changed

2 files changed

+187
-18
lines changed

cloudstack.go

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,20 @@ package cloudstack
2121

2222
import (
2323
"context"
24+
"encoding/json"
2425
"errors"
2526
"fmt"
2627
"io"
2728
"os"
29+
"time"
2830

2931
"github.com/apache/cloudstack-go/v2/cloudstack"
3032
"gopkg.in/gcfg.v1"
33+
corev1 "k8s.io/api/core/v1"
34+
apierrors "k8s.io/apimachinery/pkg/api/errors"
35+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3136
"k8s.io/apimachinery/pkg/types"
37+
"k8s.io/client-go/kubernetes"
3238
cloudprovider "k8s.io/cloud-provider"
3339
"k8s.io/klog/v2"
3440
)
@@ -50,9 +56,10 @@ type CSConfig struct {
5056

5157
// CSCloud is an implementation of Interface for CloudStack.
5258
type CSCloud struct {
53-
client *cloudstack.CloudStackClient
54-
projectID string // If non-"", all resources will be created within this project
55-
zone string
59+
client *cloudstack.CloudStackClient
60+
projectID string // If non-"", all resources will be created within this project
61+
zone string
62+
clientBuilder cloudprovider.ControllerClientBuilder
5663
}
5764

5865
func init() {
@@ -100,6 +107,7 @@ func newCSCloud(cfg *CSConfig) (*CSCloud, error) {
100107

101108
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider
102109
func (cs *CSCloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
110+
cs.clientBuilder = clientBuilder
103111
}
104112

105113
// LoadBalancer returns an implementation of LoadBalancer for CloudStack.
@@ -238,3 +246,98 @@ func (cs *CSCloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeNam
238246

239247
return zone, nil
240248
}
249+
250+
// setServiceAnnotation updates a service annotation using the Kubernetes client.
251+
// It uses a patch operation with retry logic to handle concurrent updates safely.
252+
func (cs *CSCloud) setServiceAnnotation(ctx context.Context, service *corev1.Service, key, value string) error {
253+
if cs.clientBuilder == nil {
254+
klog.V(4).Infof("Client builder not available, skipping annotation update for service %s/%s", service.Namespace, service.Name)
255+
return nil
256+
}
257+
258+
client, err := cs.clientBuilder.Client("cloud-controller-manager")
259+
if err != nil {
260+
return fmt.Errorf("failed to get Kubernetes client: %v", err)
261+
}
262+
263+
// First, check if the annotation already has the correct value to avoid unnecessary updates
264+
svc, err := client.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
265+
if err != nil {
266+
if apierrors.IsNotFound(err) {
267+
klog.V(4).Infof("Service %s/%s not found, skipping annotation update", service.Namespace, service.Name)
268+
return nil
269+
}
270+
return fmt.Errorf("failed to get service: %v", err)
271+
}
272+
273+
// Check if annotation already has the correct value
274+
if svc.Annotations != nil {
275+
if existingValue, exists := svc.Annotations[key]; exists && existingValue == value {
276+
klog.V(4).Infof("Annotation %s already set to %s for service %s/%s", key, value, service.Namespace, service.Name)
277+
return nil
278+
}
279+
}
280+
281+
// Use patch operation with retry logic to handle concurrent updates
282+
return cs.patchServiceAnnotation(ctx, client, service.Namespace, service.Name, key, value)
283+
}
284+
285+
// patchServiceAnnotation patches a service annotation using a JSON merge patch with retry logic.
286+
// This method handles concurrent updates safely by retrying on conflicts.
287+
func (cs *CSCloud) patchServiceAnnotation(ctx context.Context, client kubernetes.Interface, namespace, name, key, value string) error {
288+
const maxRetries = 3
289+
const retryDelay = 500 * time.Millisecond
290+
291+
// Prepare the patch payload - merge patch that updates only the specific annotation
292+
// JSON merge patch will preserve other annotations while updating/adding this one
293+
patchData := map[string]interface{}{
294+
"metadata": map[string]interface{}{
295+
"annotations": map[string]string{
296+
key: value,
297+
},
298+
},
299+
}
300+
301+
patchBytes, err := json.Marshal(patchData)
302+
if err != nil {
303+
return fmt.Errorf("failed to marshal patch data: %v", err)
304+
}
305+
306+
for attempt := 0; attempt < maxRetries; attempt++ {
307+
// Apply the patch using JSON merge patch type
308+
// This is atomic and avoids race conditions by merging with existing annotations
309+
_, err = client.CoreV1().Services(namespace).Patch(
310+
ctx,
311+
name,
312+
types.MergePatchType,
313+
patchBytes,
314+
metav1.PatchOptions{},
315+
)
316+
317+
if err == nil {
318+
klog.V(4).Infof("Successfully set annotation %s=%s on service %s/%s", key, value, namespace, name)
319+
return nil
320+
}
321+
322+
// Handle conflict errors with retry logic
323+
if apierrors.IsConflict(err) {
324+
if attempt < maxRetries-1 {
325+
klog.V(4).Infof("Conflict updating service %s/%s annotation, retrying (attempt %d/%d): %v", namespace, name, attempt+1, maxRetries, err)
326+
time.Sleep(retryDelay)
327+
continue
328+
}
329+
return fmt.Errorf("failed to update service annotation after %d retries due to conflicts: %v", maxRetries, err)
330+
}
331+
332+
// Handle not found errors
333+
if apierrors.IsNotFound(err) {
334+
klog.V(4).Infof("Service %s/%s not found during patch, skipping annotation update", namespace, name)
335+
return nil
336+
}
337+
338+
// For other errors, return immediately
339+
return fmt.Errorf("failed to patch service annotation: %v", err)
340+
}
341+
342+
return fmt.Errorf("failed to update service annotation after %d attempts", maxRetries)
343+
}

cloudstack_loadbalancer.go

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,25 @@ const (
4444
ServiceAnnotationLoadBalancerProxyProtocol = "service.beta.kubernetes.io/cloudstack-load-balancer-proxy-protocol"
4545

4646
ServiceAnnotationLoadBalancerLoadbalancerHostname = "service.beta.kubernetes.io/cloudstack-load-balancer-hostname"
47+
48+
// ServiceAnnotationLoadBalancerIPAssociatedByController indicates that the controller
49+
// associated the IP address. This annotation is set by the controller when it associates
50+
// an unallocated IP, and is used to determine if the IP should be disassociated on deletion.
51+
ServiceAnnotationLoadBalancerIPAssociatedByController = "service.beta.kubernetes.io/cloudstack-load-balancer-ip-associated-by-controller" //nolint:gosec
4752
)
4853

4954
type loadBalancer struct {
5055
*cloudstack.CloudStackClient
5156

52-
name string
53-
algorithm string
54-
hostIDs []string
55-
ipAddr string
56-
ipAddrID string
57-
networkID string
58-
projectID string
59-
rules map[string]*cloudstack.LoadBalancerRule
57+
name string
58+
algorithm string
59+
hostIDs []string
60+
ipAddr string
61+
ipAddrID string
62+
networkID string
63+
projectID string
64+
rules map[string]*cloudstack.LoadBalancerRule
65+
ipAssociatedByController bool
6066
}
6167

6268
// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
@@ -127,6 +133,14 @@ func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, s
127133
}
128134
}(lb)
129135
}
136+
137+
// If the controller associated the IP and matches the service spec, set the annotation to persist this information.
138+
if lb.ipAssociatedByController && lb.ipAddr == service.Spec.LoadBalancerIP {
139+
if err := cs.setServiceAnnotation(ctx, service, ServiceAnnotationLoadBalancerIPAssociatedByController, "true"); err != nil {
140+
// Log the error but don't fail - the annotation is helpful but not critical
141+
klog.Warningf("Failed to set annotation on service %s/%s: %v", service.Namespace, service.Name, err)
142+
}
143+
}
130144
}
131145

132146
klog.V(4).Infof("Load balancer %v is associated with IP %v", lb.name, lb.ipAddr)
@@ -200,11 +214,11 @@ func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, s
200214
for _, lbRule := range lb.rules {
201215
protocol := ProtocolFromLoadBalancer(lbRule.Protocol)
202216
if protocol == LoadBalancerProtocolInvalid {
203-
return nil, fmt.Errorf("Error parsing protocol %v: %v", lbRule.Protocol, err)
217+
return nil, fmt.Errorf("error parsing protocol %v: %v", lbRule.Protocol, err)
204218
}
205219
port, err := strconv.ParseInt(lbRule.Publicport, 10, 32)
206220
if err != nil {
207-
return nil, fmt.Errorf("Error parsing port %s: %v", lbRule.Publicport, err)
221+
return nil, fmt.Errorf("error parsing port %s: %v", lbRule.Publicport, err)
208222
}
209223

210224
klog.V(4).Infof("Deleting firewall rules associated with load balancer rule: %v (%v:%v:%v)", lbRule.Name, protocol, lbRule.Publicip, port)
@@ -353,10 +367,52 @@ func (cs *CSCloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName st
353367
}
354368
}
355369

356-
if lb.ipAddr != "" && lb.ipAddr != service.Spec.LoadBalancerIP {
357-
klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr)
358-
if err := lb.releaseLoadBalancerIP(); err != nil {
359-
return err
370+
if lb.ipAddr != "" {
371+
// If the IP was allocated by the controller (not specified in service spec), release it.
372+
if lb.ipAddr != service.Spec.LoadBalancerIP {
373+
klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr)
374+
if err := lb.releaseLoadBalancerIP(); err != nil {
375+
return err
376+
}
377+
} else {
378+
// If the IP was specified in service spec, check if it was associated by the controller.
379+
// First, check if there's an annotation indicating the controller associated it.
380+
// If not, check if there are any other load balancer rules using this IP.
381+
shouldDisassociate := getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerIPAssociatedByController, false)
382+
383+
if shouldDisassociate {
384+
// Annotation is set, so check if there are any other load balancer rules using this IP.
385+
// Since we've already deleted all rules for this service, any remaining rules must belong
386+
// to other services. If no other rules exist, it's safe to disassociate the IP.
387+
ip, count, err := lb.Address.GetPublicIpAddressByID(lb.ipAddrID)
388+
if err != nil {
389+
klog.Errorf("Error retrieving IP address %v for disassociation check: %v", lb.ipAddr, err)
390+
shouldDisassociate = false
391+
} else if count > 0 && ip.Allocated != "" {
392+
p := lb.LoadBalancer.NewListLoadBalancerRulesParams()
393+
p.SetPublicipid(lb.ipAddrID)
394+
p.SetListall(true)
395+
if lb.projectID != "" {
396+
p.SetProjectid(lb.projectID)
397+
}
398+
otherRules, err := lb.LoadBalancer.ListLoadBalancerRules(p)
399+
if err != nil {
400+
klog.Errorf("Error checking for other load balancer rules using IP %v: %v", lb.ipAddr, err)
401+
shouldDisassociate = false
402+
} else if otherRules.Count > 0 {
403+
// Other load balancer rules are using this IP (other services are using it),
404+
// so don't disassociate.
405+
shouldDisassociate = false
406+
}
407+
}
408+
}
409+
410+
if shouldDisassociate {
411+
klog.V(4).Infof("Disassociating IP %v that was associated by the controller", lb.ipAddr)
412+
if err := lb.releaseLoadBalancerIP(); err != nil {
413+
return err
414+
}
415+
}
360416
}
361417
}
362418

@@ -491,6 +547,7 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error {
491547

492548
p := lb.Address.NewListPublicIpAddressesParams()
493549
p.SetIpaddress(loadBalancerIP)
550+
p.SetAllocatedonly(false)
494551
p.SetListall(true)
495552

496553
if lb.projectID != "" {
@@ -503,12 +560,16 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error {
503560
}
504561

505562
if l.Count != 1 {
506-
return fmt.Errorf("could not find IP address %v", loadBalancerIP)
563+
return fmt.Errorf("could not find IP address %v. Found %d addresses", loadBalancerIP, l.Count)
507564
}
508565

509566
lb.ipAddr = l.PublicIpAddresses[0].Ipaddress
510567
lb.ipAddrID = l.PublicIpAddresses[0].Id
511568

569+
// If the IP is not allocated, associate it.
570+
if l.PublicIpAddresses[0].Allocated == "" {
571+
return lb.associatePublicIPAddress()
572+
}
512573
return nil
513574
}
514575

@@ -537,6 +598,10 @@ func (lb *loadBalancer) associatePublicIPAddress() error {
537598
p.SetProjectid(lb.projectID)
538599
}
539600

601+
if lb.ipAddr != "" {
602+
p.SetIpaddress(lb.ipAddr)
603+
}
604+
540605
// Associate a new IP address
541606
r, err := lb.Address.AssociateIpAddress(p)
542607
if err != nil {
@@ -545,6 +610,7 @@ func (lb *loadBalancer) associatePublicIPAddress() error {
545610

546611
lb.ipAddr = r.Ipaddress
547612
lb.ipAddrID = r.Id
613+
lb.ipAssociatedByController = true
548614

549615
return nil
550616
}

0 commit comments

Comments
 (0)