diff --git a/cloudstack.go b/cloudstack.go index 8d78a861..72373f84 100644 --- a/cloudstack.go +++ b/cloudstack.go @@ -21,14 +21,20 @@ package cloudstack import ( "context" + "encoding/json" "errors" "fmt" "io" "os" + "time" "github.com/apache/cloudstack-go/v2/cloudstack" "gopkg.in/gcfg.v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog/v2" ) @@ -50,9 +56,10 @@ type CSConfig struct { // CSCloud is an implementation of Interface for CloudStack. type CSCloud struct { - client *cloudstack.CloudStackClient - projectID string // If non-"", all resources will be created within this project - zone string + client *cloudstack.CloudStackClient + projectID string // If non-"", all resources will be created within this project + zone string + clientBuilder cloudprovider.ControllerClientBuilder } func init() { @@ -100,6 +107,7 @@ func newCSCloud(cfg *CSConfig) (*CSCloud, error) { // Initialize passes a Kubernetes clientBuilder interface to the cloud provider func (cs *CSCloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) { + cs.clientBuilder = clientBuilder } // LoadBalancer returns an implementation of LoadBalancer for CloudStack. @@ -238,3 +246,98 @@ func (cs *CSCloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeNam return zone, nil } + +// setServiceAnnotation updates a service annotation using the Kubernetes client. +// It uses a patch operation with retry logic to handle concurrent updates safely. +func (cs *CSCloud) setServiceAnnotation(ctx context.Context, service *corev1.Service, key, value string) error { + if cs.clientBuilder == nil { + klog.V(4).Infof("Client builder not available, skipping annotation update for service %s/%s", service.Namespace, service.Name) + return nil + } + + client, err := cs.clientBuilder.Client("cloud-controller-manager") + if err != nil { + return fmt.Errorf("failed to get Kubernetes client: %v", err) + } + + // First, check if the annotation already has the correct value to avoid unnecessary updates + svc, err := client.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("Service %s/%s not found, skipping annotation update", service.Namespace, service.Name) + return nil + } + return fmt.Errorf("failed to get service: %v", err) + } + + // Check if annotation already has the correct value + if svc.Annotations != nil { + if existingValue, exists := svc.Annotations[key]; exists && existingValue == value { + klog.V(4).Infof("Annotation %s already set to %s for service %s/%s", key, value, service.Namespace, service.Name) + return nil + } + } + + // Use patch operation with retry logic to handle concurrent updates + return cs.patchServiceAnnotation(ctx, client, service.Namespace, service.Name, key, value) +} + +// patchServiceAnnotation patches a service annotation using a JSON merge patch with retry logic. +// This method handles concurrent updates safely by retrying on conflicts. +func (cs *CSCloud) patchServiceAnnotation(ctx context.Context, client kubernetes.Interface, namespace, name, key, value string) error { + const maxRetries = 3 + const retryDelay = 500 * time.Millisecond + + // Prepare the patch payload - merge patch that updates only the specific annotation + // JSON merge patch will preserve other annotations while updating/adding this one + patchData := map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + key: value, + }, + }, + } + + patchBytes, err := json.Marshal(patchData) + if err != nil { + return fmt.Errorf("failed to marshal patch data: %v", err) + } + + for attempt := 0; attempt < maxRetries; attempt++ { + // Apply the patch using JSON merge patch type + // This is atomic and avoids race conditions by merging with existing annotations + _, err = client.CoreV1().Services(namespace).Patch( + ctx, + name, + types.MergePatchType, + patchBytes, + metav1.PatchOptions{}, + ) + + if err == nil { + klog.V(4).Infof("Successfully set annotation %s=%s on service %s/%s", key, value, namespace, name) + return nil + } + + // Handle conflict errors with retry logic + if apierrors.IsConflict(err) { + if attempt < maxRetries-1 { + klog.V(4).Infof("Conflict updating service %s/%s annotation, retrying (attempt %d/%d): %v", namespace, name, attempt+1, maxRetries, err) + time.Sleep(retryDelay) + continue + } + return fmt.Errorf("failed to update service annotation after %d retries due to conflicts: %v", maxRetries, err) + } + + // Handle not found errors + if apierrors.IsNotFound(err) { + klog.V(4).Infof("Service %s/%s not found during patch, skipping annotation update", namespace, name) + return nil + } + + // For other errors, return immediately + return fmt.Errorf("failed to patch service annotation: %v", err) + } + + return fmt.Errorf("failed to update service annotation after %d attempts", maxRetries) +} diff --git a/cloudstack_loadbalancer.go b/cloudstack_loadbalancer.go index 7a3fd6b0..43ea884e 100644 --- a/cloudstack_loadbalancer.go +++ b/cloudstack_loadbalancer.go @@ -44,19 +44,25 @@ const ( ServiceAnnotationLoadBalancerProxyProtocol = "service.beta.kubernetes.io/cloudstack-load-balancer-proxy-protocol" ServiceAnnotationLoadBalancerLoadbalancerHostname = "service.beta.kubernetes.io/cloudstack-load-balancer-hostname" + + // ServiceAnnotationLoadBalancerIPAssociatedByController indicates that the controller + // associated the IP address. This annotation is set by the controller when it associates + // an unallocated IP, and is used to determine if the IP should be disassociated on deletion. + ServiceAnnotationLoadBalancerIPAssociatedByController = "service.beta.kubernetes.io/cloudstack-load-balancer-ip-associated-by-controller" //nolint:gosec ) type loadBalancer struct { *cloudstack.CloudStackClient - name string - algorithm string - hostIDs []string - ipAddr string - ipAddrID string - networkID string - projectID string - rules map[string]*cloudstack.LoadBalancerRule + name string + algorithm string + hostIDs []string + ipAddr string + ipAddrID string + networkID string + projectID string + rules map[string]*cloudstack.LoadBalancerRule + ipAssociatedByController bool } // GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is. @@ -127,6 +133,14 @@ func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, s } }(lb) } + + // If the controller associated the IP and matches the service spec, set the annotation to persist this information. + if lb.ipAssociatedByController && lb.ipAddr == service.Spec.LoadBalancerIP { + if err := cs.setServiceAnnotation(ctx, service, ServiceAnnotationLoadBalancerIPAssociatedByController, "true"); err != nil { + // Log the error but don't fail - the annotation is helpful but not critical + klog.Warningf("Failed to set annotation on service %s/%s: %v", service.Namespace, service.Name, err) + } + } } klog.V(4).Infof("Load balancer %v is associated with IP %v", lb.name, lb.ipAddr) @@ -200,11 +214,11 @@ func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, s for _, lbRule := range lb.rules { protocol := ProtocolFromLoadBalancer(lbRule.Protocol) if protocol == LoadBalancerProtocolInvalid { - return nil, fmt.Errorf("Error parsing protocol %v: %v", lbRule.Protocol, err) + return nil, fmt.Errorf("error parsing protocol %v: %v", lbRule.Protocol, err) } port, err := strconv.ParseInt(lbRule.Publicport, 10, 32) if err != nil { - return nil, fmt.Errorf("Error parsing port %s: %v", lbRule.Publicport, err) + return nil, fmt.Errorf("error parsing port %s: %v", lbRule.Publicport, err) } klog.V(4).Infof("Deleting firewall rules associated with load balancer rule: %v (%v:%v:%v)", lbRule.Name, protocol, lbRule.Publicip, port) @@ -353,10 +367,52 @@ func (cs *CSCloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName st } } - if lb.ipAddr != "" && lb.ipAddr != service.Spec.LoadBalancerIP { - klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr) - if err := lb.releaseLoadBalancerIP(); err != nil { - return err + if lb.ipAddr != "" { + // If the IP was allocated by the controller (not specified in service spec), release it. + if lb.ipAddr != service.Spec.LoadBalancerIP { + klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr) + if err := lb.releaseLoadBalancerIP(); err != nil { + return err + } + } else { + // If the IP was specified in service spec, check if it was associated by the controller. + // First, check if there's an annotation indicating the controller associated it. + // If not, check if there are any other load balancer rules using this IP. + shouldDisassociate := getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerIPAssociatedByController, false) + + if shouldDisassociate { + // Annotation is set, so check if there are any other load balancer rules using this IP. + // Since we've already deleted all rules for this service, any remaining rules must belong + // to other services. If no other rules exist, it's safe to disassociate the IP. + ip, count, err := lb.Address.GetPublicIpAddressByID(lb.ipAddrID) + if err != nil { + klog.Errorf("Error retrieving IP address %v for disassociation check: %v", lb.ipAddr, err) + shouldDisassociate = false + } else if count > 0 && ip.Allocated != "" { + p := lb.LoadBalancer.NewListLoadBalancerRulesParams() + p.SetPublicipid(lb.ipAddrID) + p.SetListall(true) + if lb.projectID != "" { + p.SetProjectid(lb.projectID) + } + otherRules, err := lb.LoadBalancer.ListLoadBalancerRules(p) + if err != nil { + klog.Errorf("Error checking for other load balancer rules using IP %v: %v", lb.ipAddr, err) + shouldDisassociate = false + } else if otherRules.Count > 0 { + // Other load balancer rules are using this IP (other services are using it), + // so don't disassociate. + shouldDisassociate = false + } + } + } + + if shouldDisassociate { + klog.V(4).Infof("Disassociating IP %v that was associated by the controller", lb.ipAddr) + if err := lb.releaseLoadBalancerIP(); err != nil { + return err + } + } } } @@ -491,6 +547,7 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error { p := lb.Address.NewListPublicIpAddressesParams() p.SetIpaddress(loadBalancerIP) + p.SetAllocatedonly(false) p.SetListall(true) if lb.projectID != "" { @@ -503,12 +560,16 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error { } if l.Count != 1 { - return fmt.Errorf("could not find IP address %v", loadBalancerIP) + return fmt.Errorf("could not find IP address %v. Found %d addresses", loadBalancerIP, l.Count) } lb.ipAddr = l.PublicIpAddresses[0].Ipaddress lb.ipAddrID = l.PublicIpAddresses[0].Id + // If the IP is not allocated, associate it. + if l.PublicIpAddresses[0].Allocated == "" { + return lb.associatePublicIPAddress() + } return nil } @@ -537,6 +598,10 @@ func (lb *loadBalancer) associatePublicIPAddress() error { p.SetProjectid(lb.projectID) } + if lb.ipAddr != "" { + p.SetIpaddress(lb.ipAddr) + } + // Associate a new IP address r, err := lb.Address.AssociateIpAddress(p) if err != nil { @@ -545,6 +610,7 @@ func (lb *loadBalancer) associatePublicIPAddress() error { lb.ipAddr = r.Ipaddress lb.ipAddrID = r.Id + lb.ipAssociatedByController = true return nil }