From e4c7e91a94129b1c7881c0d877e7f04e2a7ce41d Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 24 Jul 2025 20:19:15 -0400 Subject: [PATCH 01/10] Add doc for MixedProtocolLBService support --- docs/design-docs/multi-protocol-cidr.md | 77 +++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 docs/design-docs/multi-protocol-cidr.md diff --git a/docs/design-docs/multi-protocol-cidr.md b/docs/design-docs/multi-protocol-cidr.md new file mode 100644 index 0000000000..15458ccf05 --- /dev/null +++ b/docs/design-docs/multi-protocol-cidr.md @@ -0,0 +1,77 @@ +## Native MixedProtocolLBService Support in the GKE Service Controller + +### **1\. Abstract** + +This document proposes a design to implement support for Kubernetes Service objects of `type: LoadBalancer` +with mixed protocols (e.g., TCP and UDP) in the cloud-provider-gcp controller. +The core of this proposal is to evolve the controller's logic from managing a +single GCP Forwarding Rule per Kubernetes Service to managing a set of forwarding rules, +one for each protocol specified in the Service manifest. +This change will align GCP's behavior with the generally available `MixedProtocolLBService` +feature in upstream Kubernetes, eliminating the need for the current "dual-service, single-IP" workaround. + +### **2\. Motivation** + +The `MixedProtocolLBService` feature gate has been stable in Kubernetes since v1.26, allowing users to define a single LoadBalancer Service with ports for both TCP and UDP. +However, the GCP Service controller does not currently support this. +The controller's logic is based on a one-to-one mapping between a Service object +and a single GCP Forwarding Rule. +Since a GCP Forwarding Rule is inherently bound to a single protocol (TCP or UDP), the controller cannot provision the necessary infrastructure for a mixed-protocol service. 
+ +This forces users to adopt a less intuitive workaround: +deploying two separate Service objects (one for TCP, one for UDP) +and manually assigning them to the same reserved static IP address. +While functional, this approach is cumbersome, increases configuration complexity, +and is not aligned with the declarative intent of the Kubernetes API. + +Implementing `MixedProtocolLBService` support would provide a superior user experience, +reduce configuration errors, +and make GCP's networking capabilities consistent with the core Kubernetes feature set. + +### **3\. Proposed Design** + +The proposed design modifies the reconciliation loop within the GCP Service controller +to manage a collection of forwarding rules for each LoadBalancer Service, rather than just one. + +#### **3.1. Core Controller Logic Modification** + +The primary changes will be within the `EnsureLoadBalancer` method in the GCP cloud provider's Service controller. The existing logic creates a single load balancer configuration. The new logic will perform the following steps during its reconciliation loop: + +1. **Protocol Grouping:** Upon receiving a Service object, the controller will first inspect the `spec.ports` array and group the ports by their declared protocol (e.g., create a map of `corev1.Protocol` -> `[]corev1.ServicePort`). +2. **IP Address Management:** + * If a static IP is specified in `spec.loadBalancerIP`, it will be used for all forwarding rules. + * If no IP is specified, the controller will reserve a new static IP address upon the first reconciliation. This address will be used for all forwarding rules created for this Service. The controller must ensure this IP is retained across updates and released only upon Service deletion. +3. **Per-Protocol Reconciliation Loop:** The controller will iterate through the grouped protocols. For each protocol (e.g., TCP, UDP): + * **Generate Desired State:** It will construct the desired GCP Forwarding Rule object for that protocol. 
+ * **Naming Convention:** To avoid collisions and maintain a clear association, forwarding rules will be named using a convention that includes the service UID and the protocol. A proposed convention is `k8s-fw-[service-uid]-[protocol]`, for example, `k8s-fw-a8b4f12-tcp`. + * **Configuration:** Each forwarding rule will be configured with the shared static IP, the specific protocol (TCP or UDP), and the list of ports for that protocol. + * **Reconcile with Actual State:** The controller will check if a forwarding rule with the generated name already exists in GCP. + * **Create:** If the rule does not exist, it will be created. + * **Update:** If the rule exists, its configuration will be compared against the desired state and updated if necessary. + * **No-Op:** If the existing rule matches the desired state, no action is taken. +4. **Resource Garbage Collection:** After the reconciliation loop for all protocols present in the Service spec, the controller must clean up any orphaned resources. It will list all GCP Forwarding Rules that match the Service's UID pattern (`k8s-fw-[service-uid]-*`). If any of these existing rules correspond to a protocol that has been removed from the Service spec, the controller will delete that now-orphaned forwarding rule. +5. **Backend Resource Management:** The underlying GCP Backend Service or Target Pool can typically be shared across the different forwarding rules, as it defines the set of backend nodes, not the frontend protocol. The controller logic for managing the backend service will largely remain the same, ensuring it points to the correct set of cluster nodes. + +#### **3.2. Deletion Logic Modification** + +The `EnsureLoadBalancerDeleted` method must also be updated. 
When a Service is deleted, the controller will use the naming convention (`k8s-fw-[service-uid]-*`) to find and delete *all* associated forwarding rules, in addition to the backend service and the reserved static IP address (if it was provisioned by the controller). + +### **4\. Code Implementation Pointers** + +The necessary changes would be localized within the GCP-specific implementation of the cloudprovider.LoadBalancer interface. + +* **Primary Files for Modification:** The core logic for L4 load balancer reconciliation is located in the gce package. The files managing Service objects of `type: LoadBalancer` would be the main focus. +* **Key Functions:** + * gce.EnsureLoadBalancer: This function would need to be refactored to contain the protocol-grouping and per-protocol reconciliation loop described above. + * gce.EnsureLoadBalancerDeleted: This function would need to be updated to iterate through all potential forwarding rules based on the new naming scheme and delete them. + * **Resource Naming Functions:** Helper functions that generate names for GCP resources would need to be adapted to produce the protocol-specific forwarding rule names. + +### **5\. Alternatives Considered** + +* **Continue with Dual-Service Workaround:** This is the current state. It is not a true implementation of the Kubernetes feature and places an unnecessary operational burden on users. +* **Rely on Gateway API:** The Gateway API is the long-term strategic direction for advanced Kubernetes networking. However, it is a separate, more complex API. This proposal aims to fix a specific deficiency in the existing and widely used + Service API, which should function as specified by the core Kubernetes project. Implementing this feature does not preclude or conflict with the ongoing development of the Gateway API. 
+ +By adopting this design, the GCP Service controller can provide a seamless and intuitive experience for users needing to expose mixed-protocol applications, directly translating the Kubernetes API's intent into the necessary underlying cloud infrastructure. + + From 9b33c10d9ea0a1157df89dd7cdb1a1a3835ba0d2 Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 24 Jul 2025 20:40:02 -0400 Subject: [PATCH 02/10] fixup! Add doc for MixedProtocolLBService support --- docs/design-docs/multi-protocol-cidr.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/design-docs/multi-protocol-cidr.md b/docs/design-docs/multi-protocol-cidr.md index 15458ccf05..ebbaf9ae38 100644 --- a/docs/design-docs/multi-protocol-cidr.md +++ b/docs/design-docs/multi-protocol-cidr.md @@ -66,6 +66,14 @@ The necessary changes would be localized within the GCP-specific implementation * gce.EnsureLoadBalancerDeleted: This function would need to be updated to iterate through all potential forwarding rules based on the new naming scheme and delete them. * **Resource Naming Functions:** Helper functions that generate names for GCP resources would need to be adapted to produce the protocol-specific forwarding rule names. +### 5. Production Readiness + +For ease of introduction, we will put this support behind a feature flag. If the feature flag is not set, +the existing behaviour will be used - specifying a service with multiple protocols will be an error. + +So that traffic is not interrupted, if a ForwardingRule exists with the "old" naming convention (`k8s-fw-[service-uid]`), +that name will be used for the matching desired ForwardingRule. + ### **5\. Alternatives Considered** * **Continue with Dual-Service Workaround:** This is the current state. It is not a true implementation of the Kubernetes feature and places an unnecessary operational burden on users. 
From 72ed8d105f224b8907d4548dd27470e8f1ff7fb7 Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 24 Jul 2025 20:50:32 -0400 Subject: [PATCH 03/10] controller support for mixed protocol LB services --- providers/gce/gce_alpha.go | 3 + providers/gce/gce_loadbalancer.go | 60 +-- providers/gce/gce_loadbalancer_external.go | 432 +++++++++++++++------ 3 files changed, 343 insertions(+), 152 deletions(-) diff --git a/providers/gce/gce_alpha.go b/providers/gce/gce_alpha.go index 64a4f1b236..ef365fb9a3 100644 --- a/providers/gce/gce_alpha.go +++ b/providers/gce/gce_alpha.go @@ -27,6 +27,9 @@ const ( // AlphaFeatureSkipIGsManagement enabled L4 Regional Backend Services and // disables instance group management in service controller AlphaFeatureSkipIGsManagement = "SkipIGsManagement" + + // AlphaFeatureMultiProtocolLB allows services to use multiple protocols in the same LoadBalancer. + AlphaFeatureMultiProtocolLB = "MultiProtocolLB" ) // AlphaFeatureGate contains a mapping of alpha features to whether they are enabled diff --git a/providers/gce/gce_loadbalancer.go b/providers/gce/gce_loadbalancer.go index 7123781236..d13297f943 100644 --- a/providers/gce/gce_loadbalancer.go +++ b/providers/gce/gce_loadbalancer.go @@ -148,23 +148,25 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc // Services with multiples protocols are not supported by this controller, warn the users and sets // the corresponding Service Status Condition. 
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb - if err := checkMixedProtocol(svc.Spec.Ports); err != nil { - if hasLoadBalancerPortsError(svc) { + if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + if err := checkMixedProtocol(svc.Spec.Ports); err != nil { + if hasLoadBalancerPortsError(svc) { + return nil, err + } + klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). + WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { + return nil, errApply + } return nil, err } - klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name) - g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.") - svcApplyStatus := corev1apply.ServiceStatus().WithConditions( - metav1apply.Condition(). - WithType(v1.LoadBalancerPortsError). - WithStatus(metav1.ConditionTrue). - WithReason(v1.LoadBalancerPortsErrorReason). 
- WithMessage("LoadBalancer with multiple protocols are not supported")) - svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) - if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { - return nil, errApply - } - return nil, err } klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, desiredScheme) @@ -231,19 +233,21 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc // Services with multiples protocols are not supported by this controller, warn the users and sets // the corresponding Service Status Condition, but keep processing the Update to not break upgrades. // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb - if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { - klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name) - g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.") - svcApplyStatus := corev1apply.ServiceStatus().WithConditions( - metav1apply.Condition(). - WithType(v1.LoadBalancerPortsError). - WithStatus(metav1.ConditionTrue). - WithReason(v1.LoadBalancerPortsErrorReason). 
- WithMessage("LoadBalancer with multiple protocols are not supported")) - svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) - if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { - // the error is retried by the controller loop - return errApply + if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { + klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). + WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { + // the error is retried by the controller loop + return errApply + } } } diff --git a/providers/gce/gce_loadbalancer_external.go b/providers/gce/gce_loadbalancer_external.go index 9c8daab905..c51a99b07e 100644 --- a/providers/gce/gce_loadbalancer_external.go +++ b/providers/gce/gce_loadbalancer_external.go @@ -112,45 +112,152 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, g.deleteWrongNetworkTieredResources(loadBalancerName, lbRefStr, netTier) } - // Check if the forwarding rule exists, and if so, what its IP is. 
- fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := g.forwardingRuleNeedsUpdate(loadBalancerName, g.region, requestedIP, ports) - if err != nil { - return nil, err - } - if !fwdRuleExists { - klog.V(2).Infof("ensureExternalLoadBalancer(%s): Forwarding rule %v doesn't exist.", lbRefStr, loadBalancerName) + // If the feature gate is not enabled, we check for multiple protocols and error out. + if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := g.forwardingRuleNeedsUpdate(loadBalancerName, g.region, requestedIP, ports) + if err != nil { + return nil, err + } + if !fwdRuleExists { + klog.V(2).Infof("ensureExternalLoadBalancer(%s): Forwarding rule %v doesn't exist.", lbRefStr, loadBalancerName) + } + // Single-protocol logic + ipAddressToUse, isSafeToReleaseIP, err := g.ensureIPAddress(loadBalancerName, lbRefStr, apiService.Spec.LoadBalancerIP, fwdRuleIP, netTier) + if err != nil { + return nil, err + } + defer func() { + if isSafeToReleaseIP { + if err := g.DeleteRegionAddress(loadBalancerName, g.region); err != nil && !isNotFound(err) { + klog.Errorf("ensureExternalLoadBalancer(%s): Failed to release static IP %s in region %v: %v.", lbRefStr, ipAddressToUse, g.region, err) + } else if isNotFound(err) { + klog.V(2).Infof("ensureExternalLoadBalancer(%s): IP address %s is not reserved.", lbRefStr, ipAddressToUse) + } else { + klog.Infof("ensureExternalLoadBalancer(%s): Released static IP %s.", lbRefStr, ipAddressToUse) + } + } else { + klog.Warningf("ensureExternalLoadBalancer(%s): Orphaning static IP %s in region %v: %v.", lbRefStr, ipAddressToUse, g.region, err) + } + }() + + sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService) + if err != nil { + return nil, err + } + + firewallExists, firewallNeedsUpdate, err := g.firewallNeedsUpdate(loadBalancerName, serviceName.String(), ipAddressToUse, ports, sourceRanges) + if err != nil { + return nil, err + } + + if firewallNeedsUpdate { 
+ desc := makeFirewallDescription(serviceName.String(), ipAddressToUse) + if firewallExists { + klog.Infof("ensureExternalLoadBalancer(%s): Updating firewall.", lbRefStr) + if err := g.updateFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil { + return nil, err + } + klog.Infof("ensureExternalLoadBalancer(%s): Updated firewall.", lbRefStr) + } else { + klog.Infof("ensureExternalLoadBalancer(%s): Creating firewall.", lbRefStr) + if err := g.createFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil { + return nil, err + } + klog.Infof("ensureExternalLoadBalancer(%s): Created firewall.", lbRefStr) + } + } + + tpExists, tpNeedsRecreation, err := g.targetPoolNeedsRecreation(loadBalancerName, g.region, apiService.Spec.SessionAffinity) + if err != nil { + return nil, err + } + if !tpExists { + klog.Infof("ensureExternalLoadBalancer(%s): Target pool for service doesn't exist.", lbRefStr) + } + + var hcToCreate, hcToDelete *compute.HttpHealthCheck + hcLocalTrafficExisting, err := g.GetHTTPHealthCheck(loadBalancerName) + if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { + return nil, fmt.Errorf("error checking HTTP health check for load balancer (%s): %v", lbRefStr, err) + } + if path, healthCheckNodePort := servicehelpers.GetServiceHealthCheckPathPort(apiService); path != "" { + klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs local traffic health checks on: %d%s.", lbRefStr, healthCheckNodePort, path) + if hcLocalTrafficExisting == nil { + klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from nodes health checks to local traffic health checks.", lbRefStr) + hcToDelete = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) + tpNeedsRecreation = true + } + hcToCreate = makeHTTPHealthCheck(loadBalancerName, path, healthCheckNodePort) + } else { + 
klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs nodes health checks.", lbRefStr) + if hcLocalTrafficExisting != nil { + klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from local traffic health checks to nodes health checks.", lbRefStr) + hcToDelete = hcLocalTrafficExisting + tpNeedsRecreation = true + } + hcToCreate = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) + } + + if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsRecreation) { + isSafeToReleaseIP = false + if err := g.DeleteRegionForwardingRule(loadBalancerName, g.region); err != nil && !isNotFound(err) { + return nil, fmt.Errorf("failed to delete existing forwarding rule for load balancer (%s) update: %v", lbRefStr, err) + } + klog.Infof("ensureExternalLoadBalancer(%s): Deleted forwarding rule.", lbRefStr) + } + + if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil { + return nil, err + } + + if tpNeedsRecreation || fwdRuleNeedsUpdate { + klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule, IP %s (tier: %s).", lbRefStr, ipAddressToUse, netTier) + if err := createForwardingRule(g, loadBalancerName, serviceName.String(), g.region, ipAddressToUse, g.targetPoolURL(loadBalancerName), ports, netTier, g.enableDiscretePortForwarding); err != nil { + return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) + } + isSafeToReleaseIP = true + klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule, IP %s.", lbRefStr, ipAddressToUse) + } + + status := &v1.LoadBalancerStatus{} + status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}} + + return status, nil } - // Make sure we know which IP address will be used and have properly reserved - // it as static before moving forward with the rest of our operations. 
- // - // We use static IP addresses when updating a load balancer to ensure that we - // can replace the load balancer's other components without changing the - // address its service is reachable on. We do it this way rather than always - // keeping the static IP around even though this is more complicated because - // it makes it less likely that we'll run into quota issues. Only 7 static - // IP addresses are allowed per region by default. - // - // We could let an IP be allocated for us when the forwarding rule is created, - // but we need the IP to set up the firewall rule, and we want to keep the - // forwarding rule creation as the last thing that needs to be done in this - // function in order to maintain the invariant that "if the forwarding rule - // exists, the LB has been fully created". - ipAddressToUse := "" + // Multi-protocol logic starts here + groupedPorts := groupPortsByProtocol(ports) + var fwdRuleIP string - // Through this process we try to keep track of whether it is safe to - // release the IP that was allocated. If the user specifically asked for - // an IP, we assume they are managing it themselves. Otherwise, we will - // release the IP in case of early-terminating failure or upon successful - // creating of the LB. - // TODO(#36535): boil this logic down into a set of component functions - // and key the flag values off of errors returned. - isUserOwnedIP := false // if this is set, we never release the IP - isSafeToReleaseIP := false - defer func() { - if isUserOwnedIP { - return + // We need to determine the IP address for all forwarding rules. + // We check if any of the forwarding rules already exist and use their IP. 
+ for protocol := range groupedPorts { + frName := g.getProtocolForwardingRuleName(loadBalancerName, protocol) + fwd, err := g.GetRegionForwardingRule(frName, g.region) + if err != nil && !isNotFound(err) { + return nil, err } + if fwd != nil { + fwdRuleIP = fwd.IPAddress + break + } + } + // If no forwarding rule exists, check for the old one. + if fwdRuleIP == "" { + fwd, err := g.GetRegionForwardingRule(loadBalancerName, g.region) + if err != nil && !isNotFound(err) { + return nil, err + } + if fwd != nil { + fwdRuleIP = fwd.IPAddress + } + } + + ipAddressToUse, isSafeToReleaseIP, err := g.ensureIPAddress(loadBalancerName, lbRefStr, requestedIP, fwdRuleIP, netTier) + if err != nil { + return nil, err + } + defer func() { if isSafeToReleaseIP { if err := g.DeleteRegionAddress(loadBalancerName, g.region); err != nil && !isNotFound(err) { klog.Errorf("ensureExternalLoadBalancer(%s): Failed to release static IP %s in region %v: %v.", lbRefStr, ipAddressToUse, g.region, err) @@ -164,36 +271,6 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } }() - if requestedIP != "" { - // If user requests a specific IP address, verify first. No mutation to - // the GCE resources will be performed in the verification process. - isUserOwnedIP, err = verifyUserRequestedIP(g, g.region, requestedIP, fwdRuleIP, lbRefStr, netTier) - if err != nil { - return nil, err - } - ipAddressToUse = requestedIP - } - - if !isUserOwnedIP { - // If we are not using the user-owned IP, either promote the - // emphemeral IP used by the fwd rule, or create a new static IP. 
- ipAddr, existed, err := ensureStaticIP(g, loadBalancerName, serviceName.String(), g.region, fwdRuleIP, netTier) - if err != nil { - return nil, fmt.Errorf("failed to ensure a static IP for load balancer (%s): %v", lbRefStr, err) - } - klog.Infof("ensureExternalLoadBalancer(%s): Ensured IP address %s (tier: %s).", lbRefStr, ipAddr, netTier) - // If the IP was not owned by the user, but it already existed, it - // could indicate that the previous update cycle failed. We can use - // this IP and try to run through the process again, but we should - // not release the IP unless it is explicitly flagged as OK. - isSafeToReleaseIP = !existed - ipAddressToUse = ipAddr - } - - // Deal with the firewall next. The reason we do this here rather than last - // is because the forwarding rule is used as the indicator that the load - // balancer is fully created - it's what getLoadBalancer checks for. - // Check if user specified the allow source range sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService) if err != nil { return nil, err @@ -231,8 +308,7 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, klog.Infof("ensureExternalLoadBalancer(%s): Target pool for service doesn't exist.", lbRefStr) } - // Check which health check needs to create and which health check needs to delete. - // Health check management is coupled with target pool operation to prevent leaking. + // Health check logic... 
var hcToCreate, hcToDelete *compute.HttpHealthCheck hcLocalTrafficExisting, err := g.GetHTTPHealthCheck(loadBalancerName) if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { @@ -241,9 +317,6 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, if path, healthCheckNodePort := servicehelpers.GetServiceHealthCheckPathPort(apiService); path != "" { klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs local traffic health checks on: %d%s.", lbRefStr, healthCheckNodePort, path) if hcLocalTrafficExisting == nil { - // This logic exists to detect a transition for non-OnlyLocal to OnlyLocal service - // turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the - // target pool to use local traffic health check. klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from nodes health checks to local traffic health checks.", lbRefStr) hcToDelete = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) tpNeedsRecreation = true @@ -252,56 +325,138 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } else { klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs nodes health checks.", lbRefStr) if hcLocalTrafficExisting != nil { - // This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service - // and turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the - // target pool to use nodes health check. klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from local traffic health checks to nodes health checks.", lbRefStr) hcToDelete = hcLocalTrafficExisting tpNeedsRecreation = true } hcToCreate = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) } - // Now we get to some slightly more interesting logic. 
- // First, neither target pools nor forwarding rules can be updated in place - - // they have to be deleted and recreated. - // Second, forwarding rules are layered on top of target pools in that you - // can't delete a target pool that's currently in use by a forwarding rule. - // Thus, we have to tear down the forwarding rule if either it or the target - // pool needs to be updated. - if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsRecreation) { - // Begin critical section. If we have to delete the forwarding rule, - // and something should fail before we recreate it, don't release the - // IP. That way we can come back to it later. - isSafeToReleaseIP = false - if err := g.DeleteRegionForwardingRule(loadBalancerName, g.region); err != nil && !isNotFound(err) { - return nil, fmt.Errorf("failed to delete existing forwarding rule for load balancer (%s) update: %v", lbRefStr, err) - } - klog.Infof("ensureExternalLoadBalancer(%s): Deleted forwarding rule.", lbRefStr) - } if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil { return nil, err } - if tpNeedsRecreation || fwdRuleNeedsUpdate { - klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule, IP %s (tier: %s).", lbRefStr, ipAddressToUse, netTier) - if err := createForwardingRule(g, loadBalancerName, serviceName.String(), g.region, ipAddressToUse, g.targetPoolURL(loadBalancerName), ports, netTier, g.enableDiscretePortForwarding); err != nil { - return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) + // Forwarding rule logic + // First, handle the old forwarding rule name for backward compatibility. + fwd, err := g.GetRegionForwardingRule(loadBalancerName, g.region) + if err != nil && !isNotFound(err) { + return nil, err + } + if fwd != nil { + // If the old forwarding rule exists, we need to check if it matches one of the protocols. 
+ // If so, we keep it. If not, we delete it. + protocol, err := getProtocol(fwd.IPProtocol) + if err != nil { + return nil, err + } + if _, ok := groupedPorts[protocol]; !ok { + // This forwarding rule's protocol is not in the service spec, so delete it. + if err := g.DeleteRegionForwardingRule(loadBalancerName, g.region); err != nil && !isNotFound(err) { + return nil, err + } + } + } + + // Then, iterate over the protocols and create/update forwarding rules. + for protocol, protocolPorts := range groupedPorts { + frName := g.getProtocolForwardingRuleName(loadBalancerName, protocol) + // If the old forwarding rule matches this protocol, use its name. + if fwd != nil && fwd.IPProtocol == string(protocol) { + frName = loadBalancerName + } + + exists, needsUpdate, _, err := g.forwardingRuleNeedsUpdate(frName, g.region, ipAddressToUse, protocolPorts) + if err != nil { + return nil, err + } + + if needsUpdate { + if exists { + if err := g.DeleteRegionForwardingRule(frName, g.region); err != nil && !isNotFound(err) { + return nil, err + } + } + klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, frName, protocol, ipAddressToUse, netTier) + if err := createForwardingRule(g, frName, serviceName.String(), g.region, ipAddressToUse, g.targetPoolURL(loadBalancerName), protocolPorts, netTier, g.enableDiscretePortForwarding); err != nil { + return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) + } + klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, frName) + } + } + + // Garbage collect old forwarding rules. + activeFRNames := sets.NewString() + for protocol := range groupedPorts { + activeFRNames.Insert(g.getProtocolForwardingRuleName(loadBalancerName, protocol)) + } + // Check for the old forwarding rule name. 
+ if fwd != nil { + protocol, err := getProtocol(fwd.IPProtocol) + if err != nil { + return nil, err + } + if _, ok := groupedPorts[protocol]; ok { + activeFRNames.Insert(loadBalancerName) + } + } + + // List all forwarding rules for this service and delete the ones that are not active. + frs, err := g.ListRegionForwardingRules(g.region) + if err != nil { + return nil, err + } + for _, fr := range frs { + if strings.HasPrefix(fr.Name, loadBalancerName) && !activeFRNames.Has(fr.Name) { + klog.Infof("ensureExternalLoadBalancer(%s): Deleting orphaned forwarding rule %s.", lbRefStr, fr.Name) + if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { + return nil, err + } } - // End critical section. It is safe to release the static IP (which - // just demotes it to ephemeral) now that it is attached. In the case - // of a user-requested IP, the "is user-owned" flag will be set, - // preventing it from actually being released. - isSafeToReleaseIP = true - klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule, IP %s.", lbRefStr, ipAddressToUse) } + isSafeToReleaseIP = true status := &v1.LoadBalancerStatus{} status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}} return status, nil } +func (g *Cloud) ensureIPAddress(loadBalancerName, lbRefStr, requestedIP, fwdRuleIP string, netTier cloud.NetworkTier) (ipAddress string, isSafeToReleaseIP bool, err error) { + // Make sure we know which IP address will be used and have properly reserved + // it as static before moving forward with the rest of our operations. + ipAddressToUse := "" + isUserOwnedIP := false + + if requestedIP != "" { + // If user requests a specific IP address, verify first. No mutation to + // the GCE resources will be performed in the verification process. 
+ isUserOwnedIP, err = verifyUserRequestedIP(g, g.region, requestedIP, fwdRuleIP, lbRefStr, netTier) + if err != nil { + return "", false, err + } + ipAddressToUse = requestedIP + } + + if !isUserOwnedIP { + // If we are not using the user-owned IP, either promote the + // emphemeral IP used by the fwd rule, or create a new static IP. + ipAddr, existed, err := ensureStaticIP(g, loadBalancerName, lbRefStr, g.region, fwdRuleIP, netTier) + if err != nil { + return "", false, fmt.Errorf("failed to ensure a static IP for load balancer (%s): %v", lbRefStr, err) + } + klog.Infof("ensureExternalLoadBalancer(%s): Ensured IP address %s (tier: %s).", lbRefStr, ipAddr, netTier) + // If the IP was not owned by the user, but it already existed, it + // could indicate that the previous update cycle failed. We can use + // this IP and try to run through the process again, but we should + // not release the IP unless it is explicitly flagged as OK. + isSafeToReleaseIP = !existed + ipAddressToUse = ipAddr + } + + return ipAddressToUse, isSafeToReleaseIP, nil +} + + // updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer. func (g *Cloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error { // Process services with LoadBalancerClass "networking.gke.io/l4-regional-external-legacy" used for this controller. @@ -378,12 +533,26 @@ func (g *Cloud) ensureExternalLoadBalancerDeleted(clusterName, clusterID string, return ignoreNotFound(g.DeleteRegionAddress(loadBalancerName, g.region)) }, func() error { - klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule.", lbRefStr) + klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rules.", lbRefStr) // The forwarding rule must be deleted before either the target pool can, // unfortunately, so we have to do these two serially. 
- if err := ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil { + frs, err := g.ListRegionForwardingRules(g.region) + if err != nil { return err } + var deleteErrs []error + for _, fr := range frs { + if strings.HasPrefix(fr.Name, loadBalancerName) { + klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule %s.", lbRefStr, fr.Name) + if err := ignoreNotFound(g.DeleteRegionForwardingRule(fr.Name, g.region)); err != nil { + deleteErrs = append(deleteErrs, err) + } + } + } + if len(deleteErrs) > 0 { + return utilerrors.NewAggregate(deleteErrs) + } + klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting target pool.", lbRefStr) if err := g.DeleteExternalTargetPoolAndChecks(service, loadBalancerName, g.region, clusterID, hcNames...); err != nil { return err @@ -685,6 +854,10 @@ func (g *Cloud) targetPoolURL(name string) string { return g.projectsBasePath + strings.Join([]string{g.projectID, "regions", g.region, "targetPools", name}, "/") } +func (g *Cloud) getProtocolForwardingRuleName(loadBalancerName string, protocol v1.Protocol) string { + return loadBalancerName + "-" + strings.ToLower(string(protocol)) +} + func makeHTTPHealthCheck(name, path string, port int32) *compute.HttpHealthCheck { return &compute.HttpHealthCheck{ Name: name, @@ -950,6 +1123,15 @@ func equalPorts(existingPorts, newPorts []string, existingPortRange, newPortRang return existingPortRange == newPortRange } +func groupPortsByProtocol(ports []v1.ServicePort) map[v1.Protocol][]v1.ServicePort { + grouped := make(map[v1.Protocol][]v1.ServicePort) + for _, p := range ports { + port := p + grouped[port.Protocol] = append(grouped[port.Protocol], port) + } + return grouped +} + // translate from what K8s supports to what the cloud provider supports for session affinity. 
func translateAffinityType(affinityType v1.ServiceAffinity) string { switch affinityType { @@ -974,15 +1156,20 @@ func (g *Cloud) firewallNeedsUpdate(name, serviceName, ipAddress string, ports [ if fw.Description != makeFirewallDescription(serviceName, ipAddress) { return true, true, nil } - if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") { - return true, true, nil + + groupedPorts := groupPortsByProtocol(ports) + expectedAllowed := make(map[string][]string) + for protocol, protocolPorts := range groupedPorts { + _, portRanges, _ := getPortsAndProtocol(protocolPorts) + expectedAllowed[strings.ToLower(string(protocol))] = portRanges + } + + actualAllowed := make(map[string][]string) + for _, allow := range fw.Allowed { + actualAllowed[strings.ToLower(allow.IPProtocol)] = allow.Ports } - // Make sure the allowed ports match. - portNums, portRanges, _ := getPortsAndProtocol(ports) - // This logic checks if the existing firewall rules contains either enumerated service ports or port ranges. - // This is to prevent unnecessary noop updates to the firewall rule when the existing firewall rule is - // set up via the previous pattern using enumerated ports instead of port ranges. - if !equalStringSets(portNums, fw.Allowed[0].Ports) && !equalStringSets(portRanges, fw.Allowed[0].Ports) { + + if !reflect.DeepEqual(expectedAllowed, actualAllowed) { return true, true, nil } @@ -1008,6 +1195,7 @@ func (g *Cloud) firewallNeedsUpdate(name, serviceName, ipAddress string, ports [ return true, false, nil } + func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error { // Prepare the firewall params for creating / checking. 
desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID) @@ -1122,9 +1310,15 @@ func (g *Cloud) updateFirewall(svc *v1.Service, name, desc, destinationIP string func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { // destinationIP can be empty string "" and this means that it is not set. // GCE considers empty destinationRanges as "all" for ingress firewall-rules. - // Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only - // 100 ports or port ranges can be used in a firewall rule. - _, portRanges, _ := getPortsAndProtocol(ports) + groupedPorts := groupPortsByProtocol(ports) + var allowed []*compute.FirewallAllowed + for protocol, protocolPorts := range groupedPorts { + _, portRanges, _ := getPortsAndProtocol(protocolPorts) + allowed = append(allowed, &compute.FirewallAllowed{ + IPProtocol: strings.ToLower(string(protocol)), + Ports: portRanges, + }) + } // If the node tags to be used for this cluster have been predefined in the // provider config, just use them. Otherwise, invoke computeHostTags method to get the tags. @@ -1142,17 +1336,7 @@ func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges ut Network: g.networkURL, SourceRanges: sourceRanges.StringSlice(), TargetTags: hostTags, - Allowed: []*compute.FirewallAllowed{ - { - // TODO: Make this more generic. Currently this method is only - // used to create firewall rules for loadbalancers, which have - // exactly one protocol, so we can never end up with a list of - // mixed TCP and UDP ports. It should be possible to use a - // single firewall rule for both a TCP and UDP lb. 
- IPProtocol: strings.ToLower(string(ports[0].Protocol)), - Ports: portRanges, - }, - }, + Allowed: allowed, } if destinationIP != "" { firewall.DestinationRanges = []string{destinationIP} From 55ca64d88864250294eb73c31f6d64e28f698484 Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 24 Jul 2025 22:31:28 -0400 Subject: [PATCH 04/10] fixup! controller support for mixed protocol LB services --- providers/gce/gce_loadbalancer.go | 70 ++++++++--------- providers/gce/gce_loadbalancer_external.go | 91 ++++++++++++++++++---- 2 files changed, 113 insertions(+), 48 deletions(-) diff --git a/providers/gce/gce_loadbalancer.go b/providers/gce/gce_loadbalancer.go index d13297f943..447463ef45 100644 --- a/providers/gce/gce_loadbalancer.go +++ b/providers/gce/gce_loadbalancer.go @@ -145,28 +145,28 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc return nil, err } - // Services with multiples protocols are not supported by this controller, warn the users and sets - // the corresponding Service Status Condition. + // Services with multiples protocols are not supported by this controller (without AlphaFeatureMultiProtocolLB), + // warn the users and set the corresponding Service Status Condition. // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb - if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { - if err := checkMixedProtocol(svc.Spec.Ports); err != nil { - if hasLoadBalancerPortsError(svc) { - return nil, err - } - klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name) - g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.") - svcApplyStatus := corev1apply.ServiceStatus().WithConditions( - metav1apply.Condition(). - WithType(v1.LoadBalancerPortsError). - WithStatus(metav1.ConditionTrue). - WithReason(v1.LoadBalancerPortsErrorReason). 
- WithMessage("LoadBalancer with multiple protocols are not supported")) - svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) - if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { - return nil, errApply - } + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + klog.Infof("AlphaFeatureMultiProtocolLB feature gate is enabled") + } else if err := checkMixedProtocol(svc.Spec.Ports); err != nil { + if hasLoadBalancerPortsError(svc) { return nil, err } + klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). 
+ WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { + return nil, errApply + } + return nil, err } klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, desiredScheme) @@ -230,24 +230,24 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc return err } - // Services with multiples protocols are not supported by this controller, warn the users and sets + // Services with multiples protocols are not supported by this controller (without AlphaFeatureMultiProtocolLB), warn the users and sets // the corresponding Service Status Condition, but keep processing the Update to not break upgrades. // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb - if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { - if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { - klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name) - g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.") - svcApplyStatus := corev1apply.ServiceStatus().WithConditions( - metav1apply.Condition(). - WithType(v1.LoadBalancerPortsError). - WithStatus(metav1.ConditionTrue). - WithReason(v1.LoadBalancerPortsErrorReason). 
- WithMessage("LoadBalancer with multiple protocols are not supported")) - svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) - if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { - // the error is retried by the controller loop - return errApply - } + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + klog.Infof("AlphaFeatureMultiProtocolLB feature gate is enabled") + } else if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { + klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). + WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil { + // the error is retried by the controller loop + return errApply } } diff --git a/providers/gce/gce_loadbalancer_external.go b/providers/gce/gce_loadbalancer_external.go index c51a99b07e..15d7978c38 100644 --- a/providers/gce/gce_loadbalancer_external.go +++ b/providers/gce/gce_loadbalancer_external.go @@ -52,9 +52,9 @@ const ( // IP address, a firewall rule, a target pool, and a forwarding rule. This // function has to manage all of them. 
// -// Due to an interesting series of design decisions, this handles both creating -// new load balancers and updating existing load balancers, recognizing when -// each is needed. +// This function handles both creating new load balancers and updating existing load balancers, +// recognizing when each is needed. +// This approach is resilient, for example if we are interrupted part-way during creation. func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, apiService *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { // Process services with LoadBalancerClass "networking.gke.io/l4-regional-external-legacy" used for this controller. // LoadBalancerClass can't be updated so we know this controller should process the NetLB. @@ -253,11 +253,37 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } } - ipAddressToUse, isSafeToReleaseIP, err := g.ensureIPAddress(loadBalancerName, lbRefStr, requestedIP, fwdRuleIP, netTier) - if err != nil { - return nil, err - } + // Make sure we know which IP address will be used and have properly reserved + // it as static before moving forward with the rest of our operations. + // + // We use static IP addresses when updating a load balancer to ensure that we + // can replace the load balancer's other components without changing the + // address its service is reachable on. We do it this way rather than always + // keeping the static IP around even though this is more complicated because + // it makes it less likely that we'll run into quota issues. Only 7 static + // IP addresses are allowed per region by default. 
+ // + // We could let an IP be allocated for us when the forwarding rule is created, + // but we need the IP to set up the firewall rule, and we want to keep the + // forwarding rule creation as the last thing that needs to be done in this + // function in order to maintain the invariant that "if the forwarding rule + // exists, the LB has been fully created". + ipAddressToUse := "" + + // Through this process we try to keep track of whether it is safe to + // release the IP that was allocated. If the user specifically asked for + // an IP, we assume they are managing it themselves. Otherwise, we will + // release the IP in case of early-terminating failure or upon successful + // creating of the LB. + // TODO(#36535): boil this logic down into a set of component functions + // and key the flag values off of errors returned. + isUserOwnedIP := false // if this is set, we never release the IP + isSafeToReleaseIP := false + defer func() { + if isUserOwnedIP { + return + } if isSafeToReleaseIP { if err := g.DeleteRegionAddress(loadBalancerName, g.region); err != nil && !isNotFound(err) { klog.Errorf("ensureExternalLoadBalancer(%s): Failed to release static IP %s in region %v: %v.", lbRefStr, ipAddressToUse, g.region, err) @@ -271,6 +297,36 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } }() + if requestedIP != "" { + // If user requests a specific IP address, verify first. No mutation to + // the GCE resources will be performed in the verification process. + isUserOwnedIP, err = verifyUserRequestedIP(g, g.region, requestedIP, fwdRuleIP, lbRefStr, netTier) + if err != nil { + return nil, err + } + ipAddressToUse = requestedIP + } + + if !isUserOwnedIP { + // If we are not using the user-owned IP, either promote the + // emphemeral IP used by the fwd rule, or create a new static IP. 
+ ipAddr, existed, err := ensureStaticIP(g, loadBalancerName, serviceName.String(), g.region, fwdRuleIP, netTier) + if err != nil { + return nil, fmt.Errorf("failed to ensure a static IP for load balancer (%s): %v", lbRefStr, err) + } + klog.Infof("ensureExternalLoadBalancer(%s): Ensured IP address %s (tier: %s).", lbRefStr, ipAddr, netTier) + // If the IP was not owned by the user, but it already existed, it + // could indicate that the previous update cycle failed. We can use + // this IP and try to run through the process again, but we should + // not release the IP unless it is explicitly flagged as OK. + isSafeToReleaseIP = !existed + ipAddressToUse = ipAddr + } + + // Deal with the firewall next. The reason we do this here rather than last + // is because the forwarding rule is used as the indicator that the load + // balancer is fully created - it's what getLoadBalancer checks for. + // Check if user specified the allow source range sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService) if err != nil { return nil, err @@ -308,7 +364,8 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, klog.Infof("ensureExternalLoadBalancer(%s): Target pool for service doesn't exist.", lbRefStr) } - // Health check logic... + // Check which health check needs to create and which health check needs to delete. + // Health check management is coupled with target pool operation to prevent leaking. 
var hcToCreate, hcToDelete *compute.HttpHealthCheck hcLocalTrafficExisting, err := g.GetHTTPHealthCheck(loadBalancerName) if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { @@ -317,6 +374,9 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, if path, healthCheckNodePort := servicehelpers.GetServiceHealthCheckPathPort(apiService); path != "" { klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs local traffic health checks on: %d%s.", lbRefStr, healthCheckNodePort, path) if hcLocalTrafficExisting == nil { + // This logic exists to detect a transition for non-OnlyLocal to OnlyLocal service + // turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the + // target pool to use local traffic health check. klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from nodes health checks to local traffic health checks.", lbRefStr) hcToDelete = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) tpNeedsRecreation = true @@ -534,8 +594,13 @@ func (g *Cloud) ensureExternalLoadBalancerDeleted(clusterName, clusterID string, }, func() error { klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rules.", lbRefStr) - // The forwarding rule must be deleted before either the target pool can, - // unfortunately, so we have to do these two serially. + // The forwarding rule must be deleted before the target pool can be deleted, + // unfortunately, so we have to delete forwarding rules then target pools serially. + if err := ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil { + return err + } + + // TODO: Always or just with alpha feature flag? 
frs, err := g.ListRegionForwardingRules(g.region) if err != nil { return err @@ -1125,8 +1190,7 @@ func equalPorts(existingPorts, newPorts []string, existingPortRange, newPortRang func groupPortsByProtocol(ports []v1.ServicePort) map[v1.Protocol][]v1.ServicePort { grouped := make(map[v1.Protocol][]v1.ServicePort) - for _, p := range ports { - port := p + for _, port := range ports { grouped[port.Protocol] = append(grouped[port.Protocol], port) } return grouped @@ -1195,7 +1259,6 @@ func (g *Cloud) firewallNeedsUpdate(name, serviceName, ipAddress string, ports [ return true, false, nil } - func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error { // Prepare the firewall params for creating / checking. desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID) @@ -1310,6 +1373,8 @@ func (g *Cloud) updateFirewall(svc *v1.Service, name, desc, destinationIP string func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { // destinationIP can be empty string "" and this means that it is not set. // GCE considers empty destinationRanges as "all" for ingress firewall-rules. + // Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only + // 100 ports or port ranges can be used in a firewall rule. 
groupedPorts := groupPortsByProtocol(ports) var allowed []*compute.FirewallAllowed for protocol, protocolPorts := range groupedPorts { From 3cda00c1eadb2aa03e06a5db692fbb6cc65ef773 Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 24 Jul 2025 22:38:01 -0400 Subject: [PATCH 05/10] refactor into one flow --- providers/gce/gce_loadbalancer_external.go | 236 ++++----------------- 1 file changed, 40 insertions(+), 196 deletions(-) diff --git a/providers/gce/gce_loadbalancer_external.go b/providers/gce/gce_loadbalancer_external.go index 15d7978c38..c3c276ee72 100644 --- a/providers/gce/gce_loadbalancer_external.go +++ b/providers/gce/gce_loadbalancer_external.go @@ -112,123 +112,16 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, g.deleteWrongNetworkTieredResources(loadBalancerName, lbRefStr, netTier) } - // If the feature gate is not enabled, we check for multiple protocols and error out. - if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { - fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := g.forwardingRuleNeedsUpdate(loadBalancerName, g.region, requestedIP, ports) - if err != nil { - return nil, err - } - if !fwdRuleExists { - klog.V(2).Infof("ensureExternalLoadBalancer(%s): Forwarding rule %v doesn't exist.", lbRefStr, loadBalancerName) - } - // Single-protocol logic - ipAddressToUse, isSafeToReleaseIP, err := g.ensureIPAddress(loadBalancerName, lbRefStr, apiService.Spec.LoadBalancerIP, fwdRuleIP, netTier) - if err != nil { - return nil, err - } - defer func() { - if isSafeToReleaseIP { - if err := g.DeleteRegionAddress(loadBalancerName, g.region); err != nil && !isNotFound(err) { - klog.Errorf("ensureExternalLoadBalancer(%s): Failed to release static IP %s in region %v: %v.", lbRefStr, ipAddressToUse, g.region, err) - } else if isNotFound(err) { - klog.V(2).Infof("ensureExternalLoadBalancer(%s): IP address %s is not reserved.", lbRefStr, ipAddressToUse) - } else { - klog.Infof("ensureExternalLoadBalancer(%s): 
Released static IP %s.", lbRefStr, ipAddressToUse) - } - } else { - klog.Warningf("ensureExternalLoadBalancer(%s): Orphaning static IP %s in region %v: %v.", lbRefStr, ipAddressToUse, g.region, err) - } - }() - - sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService) - if err != nil { - return nil, err - } - - firewallExists, firewallNeedsUpdate, err := g.firewallNeedsUpdate(loadBalancerName, serviceName.String(), ipAddressToUse, ports, sourceRanges) - if err != nil { - return nil, err - } - - if firewallNeedsUpdate { - desc := makeFirewallDescription(serviceName.String(), ipAddressToUse) - if firewallExists { - klog.Infof("ensureExternalLoadBalancer(%s): Updating firewall.", lbRefStr) - if err := g.updateFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil { - return nil, err - } - klog.Infof("ensureExternalLoadBalancer(%s): Updated firewall.", lbRefStr) - } else { - klog.Infof("ensureExternalLoadBalancer(%s): Creating firewall.", lbRefStr) - if err := g.createFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil { - return nil, err - } - klog.Infof("ensureExternalLoadBalancer(%s): Created firewall.", lbRefStr) - } - } - - tpExists, tpNeedsRecreation, err := g.targetPoolNeedsRecreation(loadBalancerName, g.region, apiService.Spec.SessionAffinity) - if err != nil { - return nil, err - } - if !tpExists { - klog.Infof("ensureExternalLoadBalancer(%s): Target pool for service doesn't exist.", lbRefStr) - } - - var hcToCreate, hcToDelete *compute.HttpHealthCheck - hcLocalTrafficExisting, err := g.GetHTTPHealthCheck(loadBalancerName) - if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { - return nil, fmt.Errorf("error checking HTTP health check for load balancer (%s): %v", lbRefStr, err) - } - if path, healthCheckNodePort := servicehelpers.GetServiceHealthCheckPathPort(apiService); path != "" { - 
klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs local traffic health checks on: %d%s.", lbRefStr, healthCheckNodePort, path) - if hcLocalTrafficExisting == nil { - klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from nodes health checks to local traffic health checks.", lbRefStr) - hcToDelete = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) - tpNeedsRecreation = true - } - hcToCreate = makeHTTPHealthCheck(loadBalancerName, path, healthCheckNodePort) - } else { - klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs nodes health checks.", lbRefStr) - if hcLocalTrafficExisting != nil { - klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from local traffic health checks to nodes health checks.", lbRefStr) - hcToDelete = hcLocalTrafficExisting - tpNeedsRecreation = true - } - hcToCreate = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) - } - - if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsRecreation) { - isSafeToReleaseIP = false - if err := g.DeleteRegionForwardingRule(loadBalancerName, g.region); err != nil && !isNotFound(err) { - return nil, fmt.Errorf("failed to delete existing forwarding rule for load balancer (%s) update: %v", lbRefStr, err) - } - klog.Infof("ensureExternalLoadBalancer(%s): Deleted forwarding rule.", lbRefStr) - } - - if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil { - return nil, err - } - - if tpNeedsRecreation || fwdRuleNeedsUpdate { - klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule, IP %s (tier: %s).", lbRefStr, ipAddressToUse, netTier) - if err := createForwardingRule(g, loadBalancerName, serviceName.String(), g.region, ipAddressToUse, g.targetPoolURL(loadBalancerName), ports, netTier, g.enableDiscretePortForwarding); err != nil 
{ - return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) - } - isSafeToReleaseIP = true - klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule, IP %s.", lbRefStr, ipAddressToUse) + groupedPorts := groupPortsByProtocol(ports) + if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) && len(groupedPorts) > 1 { + var protocols []string + for p := range groupedPorts { + protocols = append(protocols, string(p)) } - - status := &v1.LoadBalancerStatus{} - status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}} - - return status, nil + return nil, fmt.Errorf("load balancer with multiple protocols (%s) is not supported when AlphaFeatureMultiProtocolLB is disabled", strings.Join(protocols, ",")) } - // Multi-protocol logic starts here - groupedPorts := groupPortsByProtocol(ports) var fwdRuleIP string - // We need to determine the IP address for all forwarding rules. // We check if any of the forwarding rules already exist and use their IP. for protocol := range groupedPorts { @@ -255,29 +148,8 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, // Make sure we know which IP address will be used and have properly reserved // it as static before moving forward with the rest of our operations. - // - // We use static IP addresses when updating a load balancer to ensure that we - // can replace the load balancer's other components without changing the - // address its service is reachable on. We do it this way rather than always - // keeping the static IP around even though this is more complicated because - // it makes it less likely that we'll run into quota issues. Only 7 static - // IP addresses are allowed per region by default. 
- // - // We could let an IP be allocated for us when the forwarding rule is created, - // but we need the IP to set up the firewall rule, and we want to keep the - // forwarding rule creation as the last thing that needs to be done in this - // function in order to maintain the invariant that "if the forwarding rule - // exists, the LB has been fully created". ipAddressToUse := "" - - // Through this process we try to keep track of whether it is safe to - // release the IP that was allocated. If the user specifically asked for - // an IP, we assume they are managing it themselves. Otherwise, we will - // release the IP in case of early-terminating failure or upon successful - // creating of the LB. - // TODO(#36535): boil this logic down into a set of component functions - // and key the flag values off of errors returned. - isUserOwnedIP := false // if this is set, we never release the IP + isUserOwnedIP := false isSafeToReleaseIP := false defer func() { @@ -419,9 +291,14 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, // Then, iterate over the protocols and create/update forwarding rules. for protocol, protocolPorts := range groupedPorts { - frName := g.getProtocolForwardingRuleName(loadBalancerName, protocol) - // If the old forwarding rule matches this protocol, use its name. - if fwd != nil && fwd.IPProtocol == string(protocol) { + var frName string + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + frName = g.getProtocolForwardingRuleName(loadBalancerName, protocol) + // If the old forwarding rule matches this protocol, use its name. 
+ if fwd != nil && fwd.IPProtocol == string(protocol) { + frName = loadBalancerName + } + } else { frName = loadBalancerName } @@ -430,7 +307,7 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, return nil, err } - if needsUpdate { + if needsUpdate || tpNeedsRecreation { if exists { if err := g.DeleteRegionForwardingRule(frName, g.region); err != nil && !isNotFound(err) { return nil, err @@ -445,31 +322,33 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } // Garbage collect old forwarding rules. - activeFRNames := sets.NewString() - for protocol := range groupedPorts { - activeFRNames.Insert(g.getProtocolForwardingRuleName(loadBalancerName, protocol)) - } - // Check for the old forwarding rule name. - if fwd != nil { - protocol, err := getProtocol(fwd.IPProtocol) - if err != nil { - return nil, err + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + activeFRNames := sets.NewString() + for protocol := range groupedPorts { + activeFRNames.Insert(g.getProtocolForwardingRuleName(loadBalancerName, protocol)) } - if _, ok := groupedPorts[protocol]; ok { - activeFRNames.Insert(loadBalancerName) + // Check for the old forwarding rule name. + if fwd != nil { + protocol, err := getProtocol(fwd.IPProtocol) + if err != nil { + return nil, err + } + if _, ok := groupedPorts[protocol]; ok { + activeFRNames.Insert(loadBalancerName) + } } - } - // List all forwarding rules for this service and delete the ones that are not active. 
- frs, err := g.ListRegionForwardingRules(g.region) - if err != nil { - return nil, err - } - for _, fr := range frs { - if strings.HasPrefix(fr.Name, loadBalancerName) && !activeFRNames.Has(fr.Name) { - klog.Infof("ensureExternalLoadBalancer(%s): Deleting orphaned forwarding rule %s.", lbRefStr, fr.Name) - if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { - return nil, err + // List all forwarding rules for this service and delete the ones that are not active. + frs, err := g.ListRegionForwardingRules(g.region) + if err != nil { + return nil, err + } + for _, fr := range frs { + if strings.HasPrefix(fr.Name, loadBalancerName) && !activeFRNames.Has(fr.Name) { + klog.Infof("ensureExternalLoadBalancer(%s): Deleting orphaned forwarding rule %s.", lbRefStr, fr.Name) + if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { + return nil, err + } } } } @@ -481,41 +360,6 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, return status, nil } -func (g *Cloud) ensureIPAddress(loadBalancerName, lbRefStr, requestedIP, fwdRuleIP string, netTier cloud.NetworkTier) (ipAddress string, isSafeToReleaseIP bool, err error) { - // Make sure we know which IP address will be used and have properly reserved - // it as static before moving forward with the rest of our operations. - ipAddressToUse := "" - isUserOwnedIP := false - - if requestedIP != "" { - // If user requests a specific IP address, verify first. No mutation to - // the GCE resources will be performed in the verification process. - isUserOwnedIP, err = verifyUserRequestedIP(g, g.region, requestedIP, fwdRuleIP, lbRefStr, netTier) - if err != nil { - return "", false, err - } - ipAddressToUse = requestedIP - } - - if !isUserOwnedIP { - // If we are not using the user-owned IP, either promote the - // emphemeral IP used by the fwd rule, or create a new static IP. 
- ipAddr, existed, err := ensureStaticIP(g, loadBalancerName, lbRefStr, g.region, fwdRuleIP, netTier) - if err != nil { - return "", false, fmt.Errorf("failed to ensure a static IP for load balancer (%s): %v", lbRefStr, err) - } - klog.Infof("ensureExternalLoadBalancer(%s): Ensured IP address %s (tier: %s).", lbRefStr, ipAddr, netTier) - // If the IP was not owned by the user, but it already existed, it - // could indicate that the previous update cycle failed. We can use - // this IP and try to run through the process again, but we should - // not release the IP unless it is explicitly flagged as OK. - isSafeToReleaseIP = !existed - ipAddressToUse = ipAddr - } - - return ipAddressToUse, isSafeToReleaseIP, nil -} - // updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer. func (g *Cloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error { From 93275975b953d1d5cca3153740bab8fa96a8ee95 Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 24 Jul 2025 22:54:59 -0400 Subject: [PATCH 06/10] refactor into designed declarative approach --- providers/gce/gce_loadbalancer_external.go | 260 +++++++++------------ 1 file changed, 115 insertions(+), 145 deletions(-) diff --git a/providers/gce/gce_loadbalancer_external.go b/providers/gce/gce_loadbalancer_external.go index c3c276ee72..72f9893c0c 100644 --- a/providers/gce/gce_loadbalancer_external.go +++ b/providers/gce/gce_loadbalancer_external.go @@ -269,95 +269,150 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } // Forwarding rule logic - // First, handle the old forwarding rule name for backward compatibility. - fwd, err := g.GetRegionForwardingRule(loadBalancerName, g.region) - if err != nil && !isNotFound(err) { - return nil, err + // We build the desired forwarding rules and then sync them with what exists. 
+ existingFRs, err := g.getExistingForwardingRules(loadBalancerName) + if err != nil { + return nil, fmt.Errorf("error getting existing forwarding rules for %s: %v", loadBalancerName, err) } - if fwd != nil { - // If the old forwarding rule exists, we need to check if it matches one of the protocols. - // If so, we keep it. If not, we delete it. - protocol, err := getProtocol(fwd.IPProtocol) - if err != nil { - return nil, err - } - if _, ok := groupedPorts[protocol]; !ok { - // This forwarding rule's protocol is not in the service spec, so delete it. - if err := g.DeleteRegionForwardingRule(loadBalancerName, g.region); err != nil && !isNotFound(err) { + + // Get the old forwarding rule for backward compatibility. + var oldFwdRule *compute.ForwardingRule + if fwd, ok := existingFRs[loadBalancerName]; ok { + oldFwdRule = fwd + } + + desiredFRs, err := g.buildDesiredForwardingRules(loadBalancerName, serviceName.String(), ipAddressToUse, g.targetPoolURL(loadBalancerName), apiService, netTier, oldFwdRule) + if err != nil { + return nil, fmt.Errorf("error building desired forwarding rules for %s: %v", loadBalancerName, err) + } + + // Delete unwanted forwarding rules. + for _, fr := range existingFRs { + if _, ok := desiredFRs[fr.Name]; !ok { + klog.Infof("ensureExternalLoadBalancer(%s): Deleting orphaned forwarding rule %s.", lbRefStr, fr.Name) + if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { return nil, err } } } - // Then, iterate over the protocols and create/update forwarding rules. - for protocol, protocolPorts := range groupedPorts { - var frName string - if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { - frName = g.getProtocolForwardingRuleName(loadBalancerName, protocol) - // If the old forwarding rule matches this protocol, use its name. 
- if fwd != nil && fwd.IPProtocol == string(protocol) { - frName = loadBalancerName - } - } else { - frName = loadBalancerName - } + // Create or update forwarding rules. + for _, desiredFR := range desiredFRs { + existingFR, exists := existingFRs[desiredFR.Name] - exists, needsUpdate, _, err := g.forwardingRuleNeedsUpdate(frName, g.region, ipAddressToUse, protocolPorts) - if err != nil { - return nil, err + needsUpdate := false + if exists { + if !g.forwardingRulesEqual(existingFR, desiredFR) { + needsUpdate = true + } } - if needsUpdate || tpNeedsRecreation { + if !exists || needsUpdate || tpNeedsRecreation { if exists { - if err := g.DeleteRegionForwardingRule(frName, g.region); err != nil && !isNotFound(err) { + klog.Infof("ensureExternalLoadBalancer(%s): Deleting forwarding rule %s to update.", lbRefStr, desiredFR.Name) + if err := g.DeleteRegionForwardingRule(desiredFR.Name, g.region); err != nil && !isNotFound(err) { return nil, err } } - klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, frName, protocol, ipAddressToUse, netTier) - if err := createForwardingRule(g, frName, serviceName.String(), g.region, ipAddressToUse, g.targetPoolURL(loadBalancerName), protocolPorts, netTier, g.enableDiscretePortForwarding); err != nil { + klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, desiredFR.Name, desiredFR.IPProtocol, ipAddressToUse, netTier) + // The desiredFR is a complete spec, so we can just pass it to CreateRegionForwardingRule. 
+ if err := g.CreateRegionForwardingRule(desiredFR, g.region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) { return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) } - klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, frName) + klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, desiredFR.Name) } } - // Garbage collect old forwarding rules. - if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { - activeFRNames := sets.NewString() - for protocol := range groupedPorts { - activeFRNames.Insert(g.getProtocolForwardingRuleName(loadBalancerName, protocol)) + isSafeToReleaseIP = true + status := &v1.LoadBalancerStatus{} + status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}} + + return status, nil +} + +// getExistingForwardingRules returns a map of forwarding rules for the given load balancer. +func (g *Cloud) getExistingForwardingRules(loadBalancerName string) (map[string]*compute.ForwardingRule, error) { + frs, err := g.ListRegionForwardingRules(g.region) + if err != nil { + return nil, err + } + + existingFRs := make(map[string]*compute.ForwardingRule) + for _, fr := range frs { + if strings.HasPrefix(fr.Name, loadBalancerName) { + existingFRs[fr.Name] = fr } - // Check for the old forwarding rule name. - if fwd != nil { - protocol, err := getProtocol(fwd.IPProtocol) - if err != nil { - return nil, err - } - if _, ok := groupedPorts[protocol]; ok { - activeFRNames.Insert(loadBalancerName) + } + return existingFRs, nil +} + +// buildDesiredForwardingRules builds the desired forwarding rules for the given load balancer. 
+func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, oldFwdRule *compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) { + desiredFRs := make(map[string]*compute.ForwardingRule) + groupedPorts := groupPortsByProtocol(apiService.Spec.Ports) + desc := makeServiceDescription(serviceName) + + for protocol, protocolPorts := range groupedPorts { + var frName string + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + frName = g.getProtocolForwardingRuleName(loadBalancerName, protocol) + // If the old forwarding rule matches this protocol, use its name for backward compatibility. + if oldFwdRule != nil && oldFwdRule.IPProtocol == string(protocol) { + frName = loadBalancerName } + } else { + frName = loadBalancerName } - // List all forwarding rules for this service and delete the ones that are not active. - frs, err := g.ListRegionForwardingRules(g.region) + frPorts := getPorts(protocolPorts) + portRange, err := loadBalancerPortRange(protocolPorts) if err != nil { return nil, err } - for _, fr := range frs { - if strings.HasPrefix(fr.Name, loadBalancerName) && !activeFRNames.Has(fr.Name) { - klog.Infof("ensureExternalLoadBalancer(%s): Deleting orphaned forwarding rule %s.", lbRefStr, fr.Name) - if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { - return nil, err - } - } + + rule := &compute.ForwardingRule{ + Name: frName, + Description: desc, + IPAddress: ipAddress, + IPProtocol: string(protocol), + PortRange: portRange, + Target: targetPoolURL, + NetworkTier: netTier.ToGCEValue(), } + + if len(frPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding { + rule.Ports = frPorts + rule.PortRange = "" + } + + desiredFRs[frName] = rule } + return desiredFRs, nil +} - isSafeToReleaseIP = true - status := &v1.LoadBalancerStatus{} - status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}} +// 
forwardingRulesEqual checks if two forwarding rules are equal. +func (g *Cloud) forwardingRulesEqual(existing, desired *compute.ForwardingRule) bool { + if existing.IPAddress != desired.IPAddress { + klog.V(3).Infof("Forwarding rule %s IP changed from %s to %s", existing.Name, existing.IPAddress, desired.IPAddress) + return false + } + if existing.IPProtocol != desired.IPProtocol { + klog.V(3).Infof("Forwarding rule %s protocol changed from %s to %s", existing.Name, existing.IPProtocol, desired.IPProtocol) + return false + } - return status, nil + frEqualPorts := equalPorts(existing.Ports, desired.Ports, existing.PortRange, desired.PortRange, g.enableDiscretePortForwarding) + if !frEqualPorts { + klog.V(3).Infof("Forwarding rule %s ports changed from (range: %v, ports: %v) to (range: %v, ports: %v)", existing.Name, existing.PortRange, existing.Ports, desired.PortRange, desired.Ports) + return false + } + + if existing.NetworkTier != desired.NetworkTier { + klog.V(3).Infof("Forwarding rule %s network tier changed from %s to %s", existing.Name, existing.NetworkTier, desired.NetworkTier) + return false + } + + return true } @@ -853,58 +908,7 @@ func (g *Cloud) ensureHTTPHealthCheck(name, path string, port int32) (hc *comput return hc, nil } -// Passing nil for requested IP is perfectly fine - it just means that no specific -// IP is being requested. -// Returns whether the forwarding rule exists, whether it needs to be updated, -// what its IP address is (if it exists), and any error we encountered. -func (g *Cloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []v1.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) { - fwd, err := g.GetRegionForwardingRule(name, region) - if err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - return false, true, "", nil - } - // Err on the side of caution in case of errors. Caller should notice the error and retry. 
- // We never want to end up recreating resources because g api flaked. - return true, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err) - } - // If the user asks for a specific static ip through the Service spec, - // check that we're actually using it. - // TODO: we report loadbalancer IP through status, so we want to verify if - // that matches the forwarding rule as well. - if loadBalancerIP != "" && loadBalancerIP != fwd.IPAddress { - klog.Infof("LoadBalancer ip for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPAddress, loadBalancerIP) - return true, true, fwd.IPAddress, nil - } - protocol, err := getProtocol(ports) - if err != nil { - return true, false, "", err - } - - newPortRange, err := loadBalancerPortRange(ports) - if err != nil { - // Err on the side of caution in case of errors. Caller should notice the error and retry. - // We never want to end up recreating resources because g api flaked. - return true, false, "", err - } - newPorts := []string{} - if frPorts := getPorts(ports); len(frPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding { - newPorts = frPorts - newPortRange = "" - } - frEqualPorts := equalPorts(fwd.Ports, newPorts, fwd.PortRange, newPortRange, g.enableDiscretePortForwarding) - if !frEqualPorts { - klog.Infof("Forwarding rule port range / ports are not equal, old (port range: %v, ports: %v), new (port range: %v, ports: %v)", fwd.PortRange, fwd.Ports, newPortRange, newPorts) - return true, true, fwd.IPAddress, nil - } - - if string(protocol) != fwd.IPProtocol { - klog.Infof("LoadBalancer protocol for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPProtocol, string(protocol)) - return true, true, fwd.IPAddress, nil - } - - return true, false, fwd.IPAddress, nil -} // Doesn't check whether the hosts have changed, since host updating is handled // separately. 
@@ -1141,41 +1145,7 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd return nil } -func createForwardingRule(s CloudForwardingRuleService, name, serviceName, region, ipAddress, target string, ports []v1.ServicePort, netTier cloud.NetworkTier, enableDiscretePortForwarding bool) error { - frPorts := getPorts(ports) - protocol, err := getProtocol(ports) - if err != nil { - return err - } - portRange, err := loadBalancerPortRange(ports) - if err != nil { - return err - } - desc := makeServiceDescription(serviceName) - rule := &compute.ForwardingRule{ - Name: name, - Description: desc, - IPAddress: ipAddress, - IPProtocol: string(protocol), - PortRange: portRange, - Target: target, - NetworkTier: netTier.ToGCEValue(), - } - - if len(frPorts) <= maxForwardedPorts && enableDiscretePortForwarding { - rule.Ports = frPorts - rule.PortRange = "" - } - - err = s.CreateRegionForwardingRule(rule, region) - - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return err - } - - return nil -} func (g *Cloud) createFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error { firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts) From 322427574306a0e5e100afcb9fc204fb570d6c54 Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 24 Jul 2025 23:00:11 -0400 Subject: [PATCH 07/10] fixup! 
refactor into designed declarative approach --- providers/gce/gce_loadbalancer_external.go | 223 ++++++++++++--------- 1 file changed, 124 insertions(+), 99 deletions(-) diff --git a/providers/gce/gce_loadbalancer_external.go b/providers/gce/gce_loadbalancer_external.go index 72f9893c0c..506daaa4f0 100644 --- a/providers/gce/gce_loadbalancer_external.go +++ b/providers/gce/gce_loadbalancer_external.go @@ -112,46 +112,49 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, g.deleteWrongNetworkTieredResources(loadBalancerName, lbRefStr, netTier) } - groupedPorts := groupPortsByProtocol(ports) - if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) && len(groupedPorts) > 1 { - var protocols []string - for p := range groupedPorts { - protocols = append(protocols, string(p)) - } - return nil, fmt.Errorf("load balancer with multiple protocols (%s) is not supported when AlphaFeatureMultiProtocolLB is disabled", strings.Join(protocols, ",")) + existingFRs, err := g.getExistingForwardingRules(loadBalancerName) + if err != nil { + return nil, fmt.Errorf("error getting existing forwarding rules for %s: %v", loadBalancerName, err) } + // Check if a forwarding rule exists, and if so, what the IP is. var fwdRuleIP string - // We need to determine the IP address for all forwarding rules. - // We check if any of the forwarding rules already exist and use their IP. - for protocol := range groupedPorts { - frName := g.getProtocolForwardingRuleName(loadBalancerName, protocol) - fwd, err := g.GetRegionForwardingRule(frName, g.region) - if err != nil && !isNotFound(err) { - return nil, err - } - if fwd != nil { - fwdRuleIP = fwd.IPAddress - break - } - } - // If no forwarding rule exists, check for the old one. 
- if fwdRuleIP == "" { - fwd, err := g.GetRegionForwardingRule(loadBalancerName, g.region) - if err != nil && !isNotFound(err) { - return nil, err - } - if fwd != nil { - fwdRuleIP = fwd.IPAddress + for _, fr := range existingFRs { + if fr.IPAddress != "" { + if fwdRuleIP == "" { + fwdRuleIP = fr.IPAddress + } else if fwdRuleIP != fr.IPAddress { + return nil, fmt.Errorf("found multiple forwarding rules with different IP addresses (%s and %s) for load balancer %s", fwdRuleIP, fr.IPAddress, loadBalancerName) + } } } // Make sure we know which IP address will be used and have properly reserved // it as static before moving forward with the rest of our operations. + // + // We use static IP addresses when updating a load balancer to ensure that we + // can replace the load balancer's other components without changing the + // address its service is reachable on. We do it this way rather than always + // keeping the static IP around even though this is more complicated because + // it makes it less likely that we'll run into quota issues. Only 7 static + // IP addresses are allowed per region by default. + // + // We could let an IP be allocated for us when the forwarding rule is created, + // but we need the IP to set up the firewall rule, and we want to keep the + // forwarding rule creation as the last thing that needs to be done in this + // function in order to maintain the invariant that "if the forwarding rule + // exists, the LB has been fully created". ipAddressToUse := "" - isUserOwnedIP := false - isSafeToReleaseIP := false + // Through this process we try to keep track of whether it is safe to + // release the IP that was allocated. If the user specifically asked for + // an IP, we assume they are managing it themselves. Otherwise, we will + // release the IP in case of early-terminating failure or upon successful + // creating of the LB. 
+ // TODO(#36535): boil this logic down into a set of component functions + // and key the flag values off of errors returned. + isUserOwnedIP := false // if this is set, we never release the IP + isSafeToReleaseIP := false defer func() { if isUserOwnedIP { return @@ -195,6 +198,12 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, ipAddressToUse = ipAddr } + // Now we have the IP address we can build the desired forwarding rules. + desiredFRs, err := g.buildDesiredForwardingRules(loadBalancerName, serviceName.String(), ipAddressToUse, g.targetPoolURL(loadBalancerName), apiService, netTier, existingFRs) + if err != nil { + return nil, fmt.Errorf("error building desired forwarding rules for %s: %v", loadBalancerName, err) + } + // Deal with the firewall next. The reason we do this here rather than last // is because the forwarding rule is used as the indicator that the load // balancer is fully created - it's what getLoadBalancer checks for. @@ -257,6 +266,9 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } else { klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs nodes health checks.", lbRefStr) if hcLocalTrafficExisting != nil { + // This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service + // and turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the + // target pool to use nodes health check. 
klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from local traffic health checks to nodes health checks.", lbRefStr) hcToDelete = hcLocalTrafficExisting tpNeedsRecreation = true @@ -264,26 +276,32 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, hcToCreate = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) } - if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil { - return nil, err - } - - // Forwarding rule logic - // We build the desired forwarding rules and then sync them with what exists. - existingFRs, err := g.getExistingForwardingRules(loadBalancerName) - if err != nil { - return nil, fmt.Errorf("error getting existing forwarding rules for %s: %v", loadBalancerName, err) - } + // Now we get to some slightly more interesting logic. + // First, neither target pools nor forwarding rules can be updated in place - + // they have to be deleted and recreated. + // Second, forwarding rules are layered on top of target pools in that you + // can't delete a target pool that's currently in use by a forwarding rule. + // Thus, we have to tear down a forwarding rule if either it or the target + // pool needs to be updated. + if len(existingFRs) > 0 && tpNeedsRecreation { + // Begin critical section. If we have to delete the forwarding rule, + // and something should fail before we recreate it, don't release the + // IP. That way we can come back to it later. 
+ isSafeToReleaseIP = false + + for _, fr := range existingFRs { + if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { + return nil, fmt.Errorf("failed to delete existing forwarding rule for load balancer (%s) update: %v", lbRefStr, err) + } + } + klog.Infof("ensureExternalLoadBalancer(%s): Deleted forwarding rule(s).", lbRefStr) - // Get the old forwarding rule for backward compatibility. - var oldFwdRule *compute.ForwardingRule - if fwd, ok := existingFRs[loadBalancerName]; ok { - oldFwdRule = fwd + // Clear the existing forwarding rules so we don't try to delete them again. + existingFRs = nil } - desiredFRs, err := g.buildDesiredForwardingRules(loadBalancerName, serviceName.String(), ipAddressToUse, g.targetPoolURL(loadBalancerName), apiService, netTier, oldFwdRule) - if err != nil { - return nil, fmt.Errorf("error building desired forwarding rules for %s: %v", loadBalancerName, err) + if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil { + return nil, err } // Delete unwanted forwarding rules. @@ -299,31 +317,34 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, // Create or update forwarding rules. 
for _, desiredFR := range desiredFRs { existingFR, exists := existingFRs[desiredFR.Name] - - needsUpdate := false if exists { - if !g.forwardingRulesEqual(existingFR, desiredFR) { - needsUpdate = true + if g.forwardingRulesEqual(existingFR, desiredFR) { + continue } } - if !exists || needsUpdate || tpNeedsRecreation { - if exists { - klog.Infof("ensureExternalLoadBalancer(%s): Deleting forwarding rule %s to update.", lbRefStr, desiredFR.Name) - if err := g.DeleteRegionForwardingRule(desiredFR.Name, g.region); err != nil && !isNotFound(err) { - return nil, err - } - } - klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, desiredFR.Name, desiredFR.IPProtocol, ipAddressToUse, netTier) - // The desiredFR is a complete spec, so we can just pass it to CreateRegionForwardingRule. - if err := g.CreateRegionForwardingRule(desiredFR, g.region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) + // We can't update a forwarding rule in place, so we have to delete it and recreate it. + if exists { + klog.Infof("ensureExternalLoadBalancer(%s): Deleting forwarding rule %s to update.", lbRefStr, desiredFR.Name) + if err := g.DeleteRegionForwardingRule(desiredFR.Name, g.region); err != nil && !isNotFound(err) { + return nil, err } - klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, desiredFR.Name) } + klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, desiredFR.Name, desiredFR.IPProtocol, ipAddressToUse, netTier) + // The desiredFR is a complete spec, so we can just pass it to CreateRegionForwardingRule. + // TODO: Why do we ignore the conflict error? 
+ if err := g.CreateRegionForwardingRule(desiredFR, g.region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) + } + klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, desiredFR.Name) } + // End critical section. It is safe to release the static IP (which + // just demotes it to ephemeral) now that it is attached. In the case + // of a user-requested IP, the "is user-owned" flag will be set, + // preventing it from actually being released. isSafeToReleaseIP = true + status := &v1.LoadBalancerStatus{} status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}} @@ -339,7 +360,14 @@ func (g *Cloud) getExistingForwardingRules(loadBalancerName string) (map[string] existingFRs := make(map[string]*compute.ForwardingRule) for _, fr := range frs { - if strings.HasPrefix(fr.Name, loadBalancerName) { + isMatch := false + if fr.Name == loadBalancerName { + isMatch = true + } else if fr.Name == g.getProtocolForwardingRuleName(loadBalancerName, v1.Protocol(fr.IPProtocol)) { + isMatch = true + } + + if isMatch { existingFRs[fr.Name] = fr } } @@ -347,11 +375,17 @@ func (g *Cloud) getExistingForwardingRules(loadBalancerName string) (map[string] } // buildDesiredForwardingRules builds the desired forwarding rules for the given load balancer. 
-func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, oldFwdRule *compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) { +func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, existingFRs map[string]*compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) { desiredFRs := make(map[string]*compute.ForwardingRule) groupedPorts := groupPortsByProtocol(apiService.Spec.Ports) desc := makeServiceDescription(serviceName) + // Find the legacy forwarding rule to minimize changes. + var oldFwdRule *compute.ForwardingRule + if fwd, ok := existingFRs[loadBalancerName]; ok { + oldFwdRule = fwd + } + for protocol, protocolPorts := range groupedPorts { var frName string if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { @@ -364,7 +398,6 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd frName = loadBalancerName } - frPorts := getPorts(protocolPorts) portRange, err := loadBalancerPortRange(protocolPorts) if err != nil { return nil, err @@ -380,8 +413,10 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd NetworkTier: netTier.ToGCEValue(), } - if len(frPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding { - rule.Ports = frPorts + if len(protocolPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding { + for _, p := range protocolPorts { + rule.Ports = append(rule.Ports, strconv.Itoa(int(p.Port))) + } rule.PortRange = "" } @@ -392,6 +427,14 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd // forwardingRulesEqual checks if two forwarding rules are equal. 
func (g *Cloud) forwardingRulesEqual(existing, desired *compute.ForwardingRule) bool { + if existing.Name != desired.Name { + klog.V(3).Infof("Forwarding rule %s name changed from %s to %s", existing.Name, existing.Name, desired.Name) + return false + } + if existing.Description != desired.Description { + klog.V(3).Infof("Forwarding rule %s description changed from %s to %s", existing.Name, existing.Description, desired.Description) + return false + } if existing.IPAddress != desired.IPAddress { klog.V(3).Infof("Forwarding rule %s IP changed from %s to %s", existing.Name, existing.IPAddress, desired.IPAddress) return false @@ -415,7 +458,6 @@ func (g *Cloud) forwardingRulesEqual(existing, desired *compute.ForwardingRule) return true } - // updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer. func (g *Cloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error { // Process services with LoadBalancerClass "networking.gke.io/l4-regional-external-legacy" used for this controller. @@ -500,17 +542,16 @@ func (g *Cloud) ensureExternalLoadBalancerDeleted(clusterName, clusterID string, } // TODO: Always or just with alpha feature flag? 
- frs, err := g.ListRegionForwardingRules(g.region) + existingFRs, err := g.getExistingForwardingRules(loadBalancerName) if err != nil { return err } + var deleteErrs []error - for _, fr := range frs { - if strings.HasPrefix(fr.Name, loadBalancerName) { - klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule %s.", lbRefStr, fr.Name) - if err := ignoreNotFound(g.DeleteRegionForwardingRule(fr.Name, g.region)); err != nil { - deleteErrs = append(deleteErrs, err) - } + for _, fr := range existingFRs { + klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule %s.", lbRefStr, fr.Name) + if err := ignoreNotFound(g.DeleteRegionForwardingRule(fr.Name, g.region)); err != nil { + deleteErrs = append(deleteErrs, err) } } if len(deleteErrs) > 0 { @@ -908,8 +949,6 @@ func (g *Cloud) ensureHTTPHealthCheck(name, path string, port int32) (hc *comput return hc, nil } - - // Doesn't check whether the hosts have changed, since host updating is handled // separately. 
func (g *Cloud) targetPoolNeedsRecreation(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsRecreation bool, err error) { @@ -964,26 +1003,14 @@ func hostURLToComparablePath(hostURL string) string { return hostURL[idx:] } -func getProtocol(svcPorts []v1.ServicePort) (v1.Protocol, error) { - if len(svcPorts) == 0 { - return v1.ProtocolTCP, nil - } - // The service controller verified all the protocols match on the ports, just check and use the first one - protocol := svcPorts[0].Protocol - if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP { - return v1.ProtocolTCP, fmt.Errorf("invalid protocol %s, only TCP and UDP are supported", string(protocol)) - } - return protocol, nil -} - -func getPorts(svcPorts []v1.ServicePort) []string { - ports := []string{} - for _, p := range svcPorts { - ports = append(ports, strconv.Itoa(int(p.Port))) - } +// func getPorts(svcPorts []v1.ServicePort) []string { +// ports := []string{} +// for _, p := range svcPorts { +// ports = append(ports, strconv.Itoa(int(p.Port))) +// } - return ports -} +// return ports +// } func minMaxPort[T v1.ServicePort | string](svcPorts []T) (int32, int32) { minPort := int32(65536) @@ -1145,8 +1172,6 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd return nil } - - func (g *Cloud) createFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error { firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts) if err != nil { From 4d49b9ac0665266eb337dd344b51828aa0c4ca2c Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 25 Jul 2025 00:06:08 -0400 Subject: [PATCH 08/10] Update protocol ports --- providers/gce/gce_loadbalancer_external.go | 80 +++++++------------- providers/gce/gce_loadbalancer_internal.go | 62 +++++++++------ providers/gce/gce_loadbalancer_naming.go | 20 ++++- providers/gce/gce_loadbalancer_utils_test.go | 3 +- 4 
files changed, 83 insertions(+), 82 deletions(-) diff --git a/providers/gce/gce_loadbalancer_external.go b/providers/gce/gce_loadbalancer_external.go index 506daaa4f0..9555081271 100644 --- a/providers/gce/gce_loadbalancer_external.go +++ b/providers/gce/gce_loadbalancer_external.go @@ -377,7 +377,7 @@ func (g *Cloud) getExistingForwardingRules(loadBalancerName string) (map[string] // buildDesiredForwardingRules builds the desired forwarding rules for the given load balancer. func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, existingFRs map[string]*compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) { desiredFRs := make(map[string]*compute.ForwardingRule) - groupedPorts := groupPortsByProtocol(apiService.Spec.Ports) + groupedPorts := getPortsAndProtocols(apiService.Spec.Ports) desc := makeServiceDescription(serviceName) // Find the legacy forwarding rule to minimize changes. 
@@ -398,7 +398,7 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd frName = loadBalancerName } - portRange, err := loadBalancerPortRange(protocolPorts) + portRange, err := loadBalancerPortRange(protocolPorts.ports) if err != nil { return nil, err } @@ -413,9 +413,9 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd NetworkTier: netTier.ToGCEValue(), } - if len(protocolPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding { - for _, p := range protocolPorts { - rule.Ports = append(rule.Ports, strconv.Itoa(int(p.Port))) + if len(protocolPorts.ports) <= maxForwardedPorts && g.enableDiscretePortForwarding { + for _, p := range protocolPorts.ports { + rule.Ports = append(rule.Ports, strconv.Itoa(p)) } rule.PortRange = "" } @@ -1003,30 +1003,15 @@ func hostURLToComparablePath(hostURL string) string { return hostURL[idx:] } -// func getPorts(svcPorts []v1.ServicePort) []string { -// ports := []string{} -// for _, p := range svcPorts { -// ports = append(ports, strconv.Itoa(int(p.Port))) -// } - -// return ports -// } - -func minMaxPort[T v1.ServicePort | string](svcPorts []T) (int32, int32) { - minPort := int32(65536) - maxPort := int32(0) - for _, svcPort := range svcPorts { - port := func(value any) int32 { - switch value.(type) { - case v1.ServicePort: - return value.(v1.ServicePort).Port - case string: - i, _ := strconv.ParseInt(value.(string), 10, 32) - return int32(i) - default: - return 0 - } - }(svcPort) +func loadBalancerPortRange(ports []int) (string, error) { + if len(ports) == 0 { + return "", fmt.Errorf("no ports specified for GCE load balancer") + } + + minPort := 65536 + maxPort := 0 + + for _, port := range ports { if port < minPort { minPort = port } @@ -1034,15 +1019,6 @@ func minMaxPort[T v1.ServicePort | string](svcPorts []T) (int32, int32) { maxPort = port } } - return minPort, maxPort -} - -func loadBalancerPortRange[T v1.ServicePort | string](svcPorts []T) (string, error) { - 
if len(svcPorts) == 0 { - return "", fmt.Errorf("no ports specified for GCE load balancer") - } - - minPort, maxPort := minMaxPort(svcPorts) return fmt.Sprintf("%d-%d", minPort, maxPort), nil } @@ -1058,19 +1034,19 @@ func equalPorts(existingPorts, newPorts []string, existingPortRange, newPortRang // Existing forwarding rule contains a port range. To keep it that way, // compare new list of ports as if it was a port range, too. if len(newPorts) != 0 { - newPortRange, _ = loadBalancerPortRange(newPorts) + var portInts []int + for _, port := range newPorts { + portInt, err := strconv.Atoi(port) + if err != nil { + klog.Errorf("invalid port %s: %v", port, err) + } + portInts = append(portInts, portInt) + } + newPortRange, _ = loadBalancerPortRange(portInts) } return existingPortRange == newPortRange } -func groupPortsByProtocol(ports []v1.ServicePort) map[v1.Protocol][]v1.ServicePort { - grouped := make(map[v1.Protocol][]v1.ServicePort) - for _, port := range ports { - grouped[port.Protocol] = append(grouped[port.Protocol], port) - } - return grouped -} - // translate from what K8s supports to what the cloud provider supports for session affinity. 
func translateAffinityType(affinityType v1.ServiceAffinity) string { switch affinityType { @@ -1096,11 +1072,10 @@ func (g *Cloud) firewallNeedsUpdate(name, serviceName, ipAddress string, ports [ return true, true, nil } - groupedPorts := groupPortsByProtocol(ports) + groupedPorts := getPortsAndProtocols(ports) expectedAllowed := make(map[string][]string) for protocol, protocolPorts := range groupedPorts { - _, portRanges, _ := getPortsAndProtocol(protocolPorts) - expectedAllowed[strings.ToLower(string(protocol))] = portRanges + expectedAllowed[strings.ToLower(string(protocol))] = protocolPorts.portRanges } actualAllowed := make(map[string][]string) @@ -1214,13 +1189,12 @@ func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges ut // GCE considers empty destinationRanges as "all" for ingress firewall-rules. // Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only // 100 ports or port ranges can be used in a firewall rule. 
- groupedPorts := groupPortsByProtocol(ports) + groupedPorts := getPortsAndProtocols(ports) var allowed []*compute.FirewallAllowed for protocol, protocolPorts := range groupedPorts { - _, portRanges, _ := getPortsAndProtocol(protocolPorts) allowed = append(allowed, &compute.FirewallAllowed{ IPProtocol: strings.ToLower(string(protocol)), - Ports: portRanges, + Ports: protocolPorts.portRanges, }) } diff --git a/providers/gce/gce_loadbalancer_internal.go b/providers/gce/gce_loadbalancer_internal.go index 562d417127..1c5bb7f355 100644 --- a/providers/gce/gce_loadbalancer_internal.go +++ b/providers/gce/gce_loadbalancer_internal.go @@ -94,10 +94,13 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v return nil, err } - ports, _, protocol := getPortsAndProtocol(svc.Spec.Ports) - if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP { - return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol)) + groupedPorts := getPortsAndProtocols(svc.Spec.Ports) + for protocol := range groupedPorts { + if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP { + return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol)) + } } + scheme := cloud.SchemeInternal options := getILBOptions(svc) if g.IsLegacyNetwork() { @@ -106,7 +109,7 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v } sharedBackend := shareBackendService(svc) - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity) + backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity) backendServiceLink := g.getBackendServiceLink(backendServiceName) // Ensure instance groups exist and nodes are assigned to groups @@ -359,10 +362,10 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v } // Generate the 
backend service name - _, _, protocol := getPortsAndProtocol(svc.Spec.Ports) + groupedPorts := getPortsAndProtocols(svc.Spec.Ports) scheme := cloud.SchemeInternal loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc) - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shareBackendService(svc), scheme, protocol, svc.Spec.SessionAffinity) + backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shareBackendService(svc), scheme, groupedPorts, svc.Spec.SessionAffinity) // Ensure the backend service has the proper backend/instance-group links return g.ensureInternalBackendServiceGroups(backendServiceName, igLinks) } @@ -383,7 +386,7 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc) svcNamespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} - _, _, protocol := getPortsAndProtocol(svc.Spec.Ports) + groupedPorts := getPortsAndProtocols(svc.Spec.Ports) scheme := cloud.SchemeInternal sharedBackend := shareBackendService(svc) sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc) @@ -399,7 +402,7 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, return err } - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity) + backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity) klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName) if err := g.teardownInternalBackendService(backendServiceName); err != nil { return err @@ -495,7 +498,7 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s return nil } -func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, 
destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error { +func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, groupedPorts map[v1.Protocol]ProtocolPorts, nodes []*v1.Node, legacyFwName string) error { klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName) targetTags, err := g.GetNodeTags(nodeNames(nodes)) if err != nil { @@ -536,12 +539,13 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinat Network: g.networkURL, SourceRanges: sourceRanges, TargetTags: targetTags, - Allowed: []*compute.FirewallAllowed{ - { - IPProtocol: strings.ToLower(string(protocol)), - Ports: portRanges, - }, - }, + } + + for protocol, protocolPorts := range groupedPorts { + expectedFirewall.Allowed = append(expectedFirewall.Allowed, &compute.FirewallAllowed{ + IPProtocol: strings.ToLower(string(protocol)), + Ports: protocolPorts.portRanges, + }) } if destinationIP != "" { @@ -576,12 +580,12 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinat func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, sharedHealthCheck bool, nodes []*v1.Node) error { // First firewall is for ingress traffic fwDesc := makeFirewallDescription(nm.String(), ipAddress) - _, portRanges, protocol := getPortsAndProtocol(svc.Spec.Ports) + groupedPorts := getPortsAndProtocols(svc.Spec.Ports) sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(svc) if err != nil { return err } - err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName) + err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), groupedPorts, nodes, 
loadBalancerName) if err != nil { return err } @@ -957,20 +961,28 @@ func backendSvcEqual(a, b *compute.BackendService) bool { backendsListEqual(a.Backends, b.Backends) } -func getPortsAndProtocol(svcPorts []v1.ServicePort) (ports []string, portRanges []string, protocol v1.Protocol) { +type ProtocolPorts struct { + ports []int + portRanges []string +} + +func getPortsAndProtocols(svcPorts []v1.ServicePort) map[v1.Protocol]ProtocolPorts { if len(svcPorts) == 0 { - return []string{}, []string{}, v1.ProtocolUDP + return nil } - // GCP doesn't support multiple protocols for a single load balancer - protocol = svcPorts[0].Protocol - portInts := []int{} + m := make(map[v1.Protocol]ProtocolPorts) for _, p := range svcPorts { - ports = append(ports, strconv.Itoa(int(p.Port))) - portInts = append(portInts, int(p.Port)) + ports := m[p.Protocol] + ports.ports = append(ports.ports, int(p.Port)) + } + + for protocol, ports := range m { + ports.portRanges = getPortRanges(ports.ports) + m[protocol] = ports } - return ports, getPortRanges(portInts), protocol + return m } func getPortRanges(ports []int) (ranges []string) { diff --git a/providers/gce/gce_loadbalancer_naming.go b/providers/gce/gce_loadbalancer_naming.go index 01c8765e94..c3906e983f 100644 --- a/providers/gce/gce_loadbalancer_naming.go +++ b/providers/gce/gce_loadbalancer_naming.go @@ -23,10 +23,11 @@ import ( "crypto/sha1" "encoding/hex" "fmt" + "sort" "strings" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" ) @@ -42,7 +43,7 @@ func makeInstanceGroupName(clusterID string) string { return fmt.Sprintf("%s--%s", prefix, clusterID) } -func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme cloud.LbScheme, protocol v1.Protocol, svcAffinity v1.ServiceAffinity) string { +func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme cloud.LbScheme, protocols 
map[v1.Protocol]ProtocolPorts, svcAffinity v1.ServiceAffinity) string { if shared { hash := sha1.New() @@ -52,6 +53,19 @@ func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, sch hashed := hex.EncodeToString(hash.Sum(nil)) hashed = hashed[:16] + // We pick TCP as the default, otherwise we pick the first protocol alphabetically. + chosenProtocol := "" + if _, found := protocols[v1.ProtocolTCP]; found { + chosenProtocol = "tcp" + } else { + var keys []string + for protocol := range protocols { + keys = append(keys, strings.ToLower(string(protocol))) + } + sort.Strings(keys) + chosenProtocol = keys[0] + } + // k8s- 4 // {clusterid}- 17 // {scheme}- 9 (internal/external) @@ -60,7 +74,7 @@ func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, sch // {suffix} 16 (hash of settings) // ----------------- // 55 characters used - return fmt.Sprintf("k8s-%s-%s-%s-nmv1-%s", clusterID, strings.ToLower(string(scheme)), strings.ToLower(string(protocol)), hashed) + return fmt.Sprintf("k8s-%s-%s-%s-nmv1-%s", clusterID, strings.ToLower(string(scheme)), chosenProtocol, hashed) } return loadBalancerName } diff --git a/providers/gce/gce_loadbalancer_utils_test.go b/providers/gce/gce_loadbalancer_utils_test.go index cf06b3b1d0..507a44bb4d 100644 --- a/providers/gce/gce_loadbalancer_utils_test.go +++ b/providers/gce/gce_loadbalancer_utils_test.go @@ -248,7 +248,8 @@ func assertInternalLbResources(t *testing.T, gce *Cloud, apiService *v1.Service, // Check that BackendService exists sharedBackend := shareBackendService(apiService) - backendServiceName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, "TCP", apiService.Spec.SessionAffinity) + groupedPorts := getPortsAndProtocols(apiService.Spec.Ports) + backendServiceName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, groupedPorts, apiService.Spec.SessionAffinity) backendServiceLink := 
gce.getBackendServiceLink(backendServiceName) bs, err := gce.GetRegionBackendService(backendServiceName, gce.region) From a068f95a47a3fac77dc076df07e75ca521e7bef5 Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 25 Jul 2025 07:54:15 -0400 Subject: [PATCH 09/10] Implement multi forwardingRule in interanal case --- providers/gce/gce_loadbalancer_internal.go | 208 +++++++++++++++------ 1 file changed, 154 insertions(+), 54 deletions(-) diff --git a/providers/gce/gce_loadbalancer_internal.go b/providers/gce/gce_loadbalancer_internal.go index 1c5bb7f355..45d2d664cb 100644 --- a/providers/gce/gce_loadbalancer_internal.go +++ b/providers/gce/gce_loadbalancer_internal.go @@ -182,57 +182,133 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v if err != nil { return nil, err } - newFwdRule := &compute.ForwardingRule{ - Name: loadBalancerName, - Description: fwdRuleDescriptionString, - IPAddress: ipToUse, - BackendService: backendServiceLink, - Ports: ports, - IPProtocol: string(protocol), - LoadBalancingScheme: string(scheme), - // Given that CreateGCECloud will attempt to determine the subnet based off the network, - // the subnetwork should rarely be unknown. - Subnetwork: subnetworkURL, - Network: g.networkURL, - } - if options.AllowGlobalAccess { - newFwdRule.AllowGlobalAccess = options.AllowGlobalAccess - } - if len(ports) > maxL4ILBPorts { - newFwdRule.Ports = nil - newFwdRule.AllPorts = true - } - fwdRuleDeleted := false - if existingFwdRule != nil && !forwardingRulesEqual(existingFwdRule, newFwdRule) { - // Delete existing forwarding rule before making changes to the backend service. For example - changing protocol - // of backend service without first deleting forwarding rule will throw an error since the linked forwarding - // rule would show the old protocol. 
- if klogV := klog.V(2); klogV.Enabled() { - frDiff := cmp.Diff(existingFwdRule, newFwdRule) - klogV.Infof("ensureInternalLoadBalancer(%v): forwarding rule changed - Existing - %+v\n, New - %+v\n, Diff(-existing, +new) - %s\n. Deleting existing forwarding rule.", loadBalancerName, existingFwdRule, newFwdRule, frDiff) + // Logic to handle multiple forwarding rules, one per protocol. + // Based on the logic for external load balancers. + + // Get all existing forwarding rules for this service. + // A service can have a forwarding rule with the base name, or with a protocol suffix. + existingFwdRules := make(map[string]*compute.ForwardingRule) + // The `existingFwdRule` is the one with the base name, passed into this function. + if existingFwdRule != nil { + existingFwdRules[existingFwdRule.Name] = existingFwdRule + } + // Check for forwarding rules with protocol suffixes. + if len(groupedPorts) > 1 { + for protocol := range groupedPorts { + frName := fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol))) + if _, ok := existingFwdRules[frName]; ok { + continue + } + fr, err := g.GetRegionForwardingRule(frName, g.region) + if err != nil && !isNotFound(err) { + return nil, err + } + if fr != nil { + existingFwdRules[fr.Name] = fr + } } - if err = ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil { + } + + desiredFwdRuleNames := sets.NewString() + var desiredFwdRuleProtocols []v1.Protocol + for protocol := range groupedPorts { + desiredFwdRuleProtocols = append(desiredFwdRuleProtocols, protocol) + } + // Sort protocols to have a stable order for naming and processing. 
+ sort.Slice(desiredFwdRuleProtocols, func(i, j int) bool { + return desiredFwdRuleProtocols[i] < desiredFwdRuleProtocols[j] + }) + + var createdFwdRules []*compute.ForwardingRule + var desiredBackendServices = make(map[string]bool) + + for _, protocol := range desiredFwdRuleProtocols { + portStruct := groupedPorts[protocol] + ports := portStruct.portRanges + + // Each protocol gets its own backend service. + // The backend service name must be unique per protocol. + // Pass a single-protocol map to makeBackendServiceName. + singleProtocolGroupedPorts := map[v1.Protocol]ProtocolPorts{protocol: portStruct} + backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity) + desiredBackendServices[backendServiceName] = true + backendServiceLink := g.getBackendServiceLink(backendServiceName) + + bsDescription := makeBackendServiceDescription(nm, sharedBackend) + err = g.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink) + if err != nil { return nil, err } - fwdRuleDeleted = true - } - bsDescription := makeBackendServiceDescription(nm, sharedBackend) - err = g.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink) - if err != nil { - return nil, err + // Each protocol gets its own forwarding rule. + // If there's only one protocol, the forwarding rule name is the load balancer name. + // Otherwise, it's load-balancer-name-<protocol>.
+ frName := loadBalancerName + if len(groupedPorts) > 1 { + frName = fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol))) + } + desiredFwdRuleNames.Insert(frName) + + newFwdRule := &compute.ForwardingRule{ + Name: frName, + Description: fwdRuleDescriptionString, + IPAddress: ipToUse, + BackendService: backendServiceLink, + Ports: ports, + IPProtocol: string(protocol), + LoadBalancingScheme: string(scheme), + Subnetwork: subnetworkURL, + Network: g.networkURL, + } + if options.AllowGlobalAccess { + newFwdRule.AllowGlobalAccess = options.AllowGlobalAccess + } + if len(ports) > maxL4ILBPorts { + newFwdRule.Ports = nil + newFwdRule.AllPorts = true + } + + // Check if a forwarding rule for this protocol already exists. + var existingFwdRuleForProtocol *compute.ForwardingRule + if fr, ok := existingFwdRules[frName]; ok { + existingFwdRuleForProtocol = fr + } + + if err := g.ensureInternalForwardingRule(existingFwdRuleForProtocol, newFwdRule); err != nil { + return nil, err + } + createdFwdRules = append(createdFwdRules, newFwdRule) } - if fwdRuleDeleted || existingFwdRule == nil { - // existing rule has been deleted, pass in nil - if err := g.ensureInternalForwardingRule(nil, newFwdRule); err != nil { + // Delete any forwarding rules that are no longer needed. + for frName, fr := range existingFwdRules { + if desiredFwdRuleNames.Has(frName) { + continue + } + klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting stale forwarding rule %s", loadBalancerName, frName) + if err := ignoreNotFound(g.DeleteRegionForwardingRule(frName, g.region)); err != nil { return nil, err } + // Also delete the associated backend service if it's not used by other forwarding rules. 
+ if fr.BackendService != "" { + bsName := getNameFromLink(fr.BackendService) + if !desiredBackendServices[bsName] { + klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting stale backend service %s", loadBalancerName, bsName) + if err := g.teardownInternalBackendService(bsName); err != nil { + klog.Warningf("ensureInternalLoadBalancer: could not delete old backend service %s: %v", bsName, err) + } + } + } + } + + if len(createdFwdRules) == 0 { + klog.V(2).Infof("ensureInternalLoadBalancer(%v): no forwarding rules needed, all deleted.", loadBalancerName) + return &v1.LoadBalancerStatus{}, nil } // Get the most recent forwarding rule for the address. - updatedFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region) + updatedFwdRule, err := g.GetRegionForwardingRule(createdFwdRules[0].Name, g.region) if err != nil { return nil, err } @@ -243,11 +319,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v return nil, err } - // Delete the previous internal load balancer resources if necessary - if existingBackendService != nil { - g.clearPreviousInternalResources(svc, loadBalancerName, existingBackendService, backendServiceName, hcName) - } - serviceState.InSuccess = true if options.AllowGlobalAccess { serviceState.EnabledGlobalAccess = true @@ -365,9 +436,17 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v groupedPorts := getPortsAndProtocols(svc.Spec.Ports) scheme := cloud.SchemeInternal loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc) - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shareBackendService(svc), scheme, groupedPorts, svc.Spec.SessionAffinity) - // Ensure the backend service has the proper backend/instance-group links - return g.ensureInternalBackendServiceGroups(backendServiceName, igLinks) + sharedBackend := shareBackendService(svc) + + for protocol, portStruct := range groupedPorts { + singleProtocolGroupedPorts := 
map[v1.Protocol]ProtocolPorts{protocol: portStruct} + backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity) + // Ensure the backend service has the proper backend/instance-group links + if err := g.ensureInternalBackendServiceGroups(backendServiceName, igLinks); err != nil { + return err + } + } + return nil } func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, svc *v1.Service) error { @@ -397,15 +476,36 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName) ensureAddressDeleted(g, loadBalancerName, g.region) - klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule", loadBalancerName) - if err := ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil { - return err + frNames := sets.NewString(loadBalancerName) + if len(groupedPorts) > 1 { + for protocol := range groupedPorts { + frNames.Insert(fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol)))) + } + } + // Sort for deterministic deletion order. 
+ sortedFrNames := frNames.List() + sort.Strings(sortedFrNames) + for _, frName := range sortedFrNames { + klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule %s", loadBalancerName, frName) + if err := ignoreNotFound(g.DeleteRegionForwardingRule(frName, g.region)); err != nil { + return err + } } - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity) - klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName) - if err := g.teardownInternalBackendService(backendServiceName); err != nil { - return err + var protocols []v1.Protocol + for p := range groupedPorts { + protocols = append(protocols, p) + } + sort.Slice(protocols, func(i, j int) bool { return protocols[i] < protocols[j] }) + + for _, protocol := range protocols { + portStruct := groupedPorts[protocol] + singleProtocolGroupedPorts := map[v1.Protocol]ProtocolPorts{protocol: portStruct} + backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity) + klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName) + if err := g.teardownInternalBackendService(backendServiceName); err != nil { + return err + } } deleteFunc := func(fwName string) error { From 796dc40f9e53008286c4d7707488b84107740645 Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 25 Jul 2025 08:24:14 -0400 Subject: [PATCH 10/10] fixup! 
Implement multi forwardingRule in interanal case --- providers/gce/gce_loadbalancer_internal.go | 239 +++++++++++---------- 1 file changed, 121 insertions(+), 118 deletions(-) diff --git a/providers/gce/gce_loadbalancer_internal.go b/providers/gce/gce_loadbalancer_internal.go index 45d2d664cb..28c8e2d6d1 100644 --- a/providers/gce/gce_loadbalancer_internal.go +++ b/providers/gce/gce_loadbalancer_internal.go @@ -29,7 +29,6 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - "github.com/google/go-cmp/cmp" compute "google.golang.org/api/compute/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -188,59 +187,21 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v // Get all existing forwarding rules for this service. // A service can have a forwarding rule with the base name, or with a protocol suffix. - existingFwdRules := make(map[string]*compute.ForwardingRule) - // The `existingFwdRule` is the one with the base name, passed into this function. - if existingFwdRule != nil { - existingFwdRules[existingFwdRule.Name] = existingFwdRule - } + // TODO: Remove existingFwdRules and use the one from the function. // Check for forwarding rules with protocol suffixes. 
- if len(groupedPorts) > 1 { - for protocol := range groupedPorts { - frName := fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol))) - if _, ok := existingFwdRules[frName]; ok { - continue - } - fr, err := g.GetRegionForwardingRule(frName, g.region) - if err != nil && !isNotFound(err) { - return nil, err - } - if fr != nil { - existingFwdRules[fr.Name] = fr - } - } - } + op := &ensureOperation{} + g.findActualBackendServiceInternal(backendServiceName, op) - desiredFwdRuleNames := sets.NewString() - var desiredFwdRuleProtocols []v1.Protocol - for protocol := range groupedPorts { - desiredFwdRuleProtocols = append(desiredFwdRuleProtocols, protocol) + existingForwardingRules, err := g.getExistingForwardingRules(loadBalancerName) + if err != nil { + return nil, err } - // Sort protocols to have a stable order for naming and processing. - sort.Slice(desiredFwdRuleProtocols, func(i, j int) bool { - return desiredFwdRuleProtocols[i] < desiredFwdRuleProtocols[j] - }) + op.forwardingRulesActual = existingForwardingRules var createdFwdRules []*compute.ForwardingRule - var desiredBackendServices = make(map[string]bool) - - for _, protocol := range desiredFwdRuleProtocols { - portStruct := groupedPorts[protocol] - ports := portStruct.portRanges - - // Each protocol gets its own backend service. - // The backend service name must be unique per protocol. - // Pass a single-protocol map to makeBackendServiceName. 
- singleProtocolGroupedPorts := map[v1.Protocol]ProtocolPorts{protocol: portStruct} - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity) - desiredBackendServices[backendServiceName] = true - backendServiceLink := g.getBackendServiceLink(backendServiceName) - - bsDescription := makeBackendServiceDescription(nm, sharedBackend) - err = g.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink) - if err != nil { - return nil, err - } + var desiredForwardingRules []*compute.ForwardingRule + for protocol, protocolPorts := range groupedPorts { // Each protocol gets its own forwarding rule. // If there's only one protocol, the forwarding rule name is the load balancer name. // Otherwise, it's load-balancer-name-. @@ -248,61 +209,87 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v if len(groupedPorts) > 1 { frName = fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol))) } - desiredFwdRuleNames.Insert(frName) - newFwdRule := &compute.ForwardingRule{ + forwardingRule := &compute.ForwardingRule{ Name: frName, Description: fwdRuleDescriptionString, IPAddress: ipToUse, BackendService: backendServiceLink, - Ports: ports, IPProtocol: string(protocol), LoadBalancingScheme: string(scheme), Subnetwork: subnetworkURL, Network: g.networkURL, } if options.AllowGlobalAccess { - newFwdRule.AllowGlobalAccess = options.AllowGlobalAccess + forwardingRule.AllowGlobalAccess = options.AllowGlobalAccess } - if len(ports) > maxL4ILBPorts { - newFwdRule.Ports = nil - newFwdRule.AllPorts = true + if len(protocolPorts.ports) > maxL4ILBPorts { + forwardingRule.Ports = nil + forwardingRule.AllPorts = true + } else { + for _, port := range protocolPorts.ports { + forwardingRule.Ports = append(forwardingRule.Ports, strconv.Itoa(port)) + } } - // Check if a forwarding rule for this 
protocol already exists. + // Check if a forwarding rule for this protocol already exists; reuse the same name if possible. + for _, existingFwdRule := range existingForwardingRules { + if existingFwdRule.IPProtocol == forwardingRule.IPProtocol { + forwardingRule.Name = existingFwdRule.Name + break + } + } + + desiredForwardingRules = append(desiredForwardingRules, forwardingRule) + } + op.forwardingRulesDesired = desiredForwardingRules + + bsDescription := makeBackendServiceDescription(nm, sharedBackend) + backendServiceProtocol := "UNSPECIFIED" + if len(desiredForwardingRules) == 1 { + backendServiceProtocol = desiredForwardingRules[0].IPProtocol + } + g.buildDesiredBackendServiceInternal(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, backendServiceProtocol, igLinks, hc.SelfLink, op) + + if err := g.ensureInternalBackendService(backendServiceName, op); err != nil { + return nil, err + } + + for _, desiredForwardingRule := range desiredForwardingRules { var existingFwdRuleForProtocol *compute.ForwardingRule - if fr, ok := existingFwdRules[frName]; ok { - existingFwdRuleForProtocol = fr + for _, existingFwdRule := range existingForwardingRules { + if existingFwdRule.IPProtocol == desiredForwardingRule.IPProtocol { + existingFwdRuleForProtocol = existingFwdRule + break + } } - if err := g.ensureInternalForwardingRule(existingFwdRuleForProtocol, newFwdRule); err != nil { + if err := g.ensureInternalForwardingRule(existingFwdRuleForProtocol, desiredForwardingRule); err != nil { return nil, err } - createdFwdRules = append(createdFwdRules, newFwdRule) + createdFwdRules = append(createdFwdRules, desiredForwardingRule) } // Delete any forwarding rules that are no longer needed. 
- for frName, fr := range existingFwdRules { - if desiredFwdRuleNames.Has(frName) { + for frName, fr := range existingForwardingRules { + var matching *compute.ForwardingRule + for _, desiredForwardingRule := range desiredForwardingRules { + if desiredForwardingRule.Name == fr.Name { + matching = desiredForwardingRule + continue + } + } + if matching != nil { continue } + klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting stale forwarding rule %s", loadBalancerName, frName) if err := ignoreNotFound(g.DeleteRegionForwardingRule(frName, g.region)); err != nil { return nil, err } - // Also delete the associated backend service if it's not used by other forwarding rules. - if fr.BackendService != "" { - bsName := getNameFromLink(fr.BackendService) - if !desiredBackendServices[bsName] { - klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting stale backend service %s", loadBalancerName, bsName) - if err := g.teardownInternalBackendService(bsName); err != nil { - klog.Warningf("ensureInternalLoadBalancer: could not delete old backend service %s: %v", bsName, err) - } - } - } } - if len(createdFwdRules) == 0 { + if len(desiredForwardingRules) == 0 { klog.V(2).Infof("ensureInternalLoadBalancer(%v): no forwarding rules needed, all deleted.", loadBalancerName) return &v1.LoadBalancerStatus{}, nil } @@ -438,14 +425,12 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc) sharedBackend := shareBackendService(svc) - for protocol, portStruct := range groupedPorts { - singleProtocolGroupedPorts := map[v1.Protocol]ProtocolPorts{protocol: portStruct} - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity) - // Ensure the backend service has the proper backend/instance-group links - if err := g.ensureInternalBackendServiceGroups(backendServiceName, igLinks); err != nil { - 
return err - } + backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity) + // Ensure the backend service has the proper backend/instance-group links + if err := g.ensureInternalBackendServiceGroups(backendServiceName, igLinks); err != nil { + return err } + return nil } @@ -476,36 +461,25 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName) ensureAddressDeleted(g, loadBalancerName, g.region) - frNames := sets.NewString(loadBalancerName) - if len(groupedPorts) > 1 { - for protocol := range groupedPorts { - frNames.Insert(fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol)))) - } + // Get existing forwarding rules. + existingForwardingRules, err := g.getExistingForwardingRules(loadBalancerName) + if err != nil { + return err } - // Sort for deterministic deletion order. - sortedFrNames := frNames.List() - sort.Strings(sortedFrNames) - for _, frName := range sortedFrNames { - klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule %s", loadBalancerName, frName) - if err := ignoreNotFound(g.DeleteRegionForwardingRule(frName, g.region)); err != nil { + + // Delete existing forwarding rules. 
+ for _, existingForwardingRule := range existingForwardingRules { + klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule %s", loadBalancerName, existingForwardingRule.Name) + if err := ignoreNotFound(g.DeleteRegionForwardingRule(existingForwardingRule.Name, g.region)); err != nil { return err } } - var protocols []v1.Protocol - for p := range groupedPorts { - protocols = append(protocols, p) - } - sort.Slice(protocols, func(i, j int) bool { return protocols[i] < protocols[j] }) - - for _, protocol := range protocols { - portStruct := groupedPorts[protocol] - singleProtocolGroupedPorts := map[v1.Protocol]ProtocolPorts{protocol: portStruct} - backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity) - klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName) - if err := g.teardownInternalBackendService(backendServiceName); err != nil { - return err - } + // Delete existing backend service. 
+ backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity) + klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName) + if err := g.teardownInternalBackendService(backendServiceName); err != nil { + return err } deleteFunc := func(fwName string) error { @@ -881,17 +855,18 @@ func (g *Cloud) ensureInternalInstanceGroupsDeleted(name string) error { return nil } -func (g *Cloud) ensureInternalBackendService(name, description string, affinityType v1.ServiceAffinity, scheme cloud.LbScheme, protocol v1.Protocol, igLinks []string, hcLink string) error { - klog.V(2).Infof("ensureInternalBackendService(%v, %v, %v): checking existing backend service with %d groups", name, scheme, protocol, len(igLinks)) - bs, err := g.GetRegionBackendService(name, g.region) - if err != nil && !isNotFound(err) { - return err - } +type ensureOperation struct { + backendServiceDesired *compute.BackendService + backendServiceActual *compute.BackendService + forwardingRulesDesired []*compute.ForwardingRule + forwardingRulesActual map[string]*compute.ForwardingRule +} +func (g *Cloud) buildDesiredBackendServiceInternal(name, description string, affinityType v1.ServiceAffinity, scheme cloud.LbScheme, protocol string, igLinks []string, hcLink string, op *ensureOperation) error { backends := backendsFromGroupLinks(igLinks) - expectedBS := &compute.BackendService{ + bs := &compute.BackendService{ Name: name, - Protocol: string(protocol), + Protocol: protocol, Description: description, HealthChecks: []string{hcLink}, Backends: backends, @@ -899,10 +874,27 @@ func (g *Cloud) ensureInternalBackendService(name, description string, affinityT LoadBalancingScheme: string(scheme), } + op.backendServiceDesired = bs + return nil +} + +func (g *Cloud) findActualBackendServiceInternal(name string, op *ensureOperation) error { + 
klog.V(2).Infof("findActualBackendServiceInternal(%v): checking existing backend services", name) + + bs, err := g.GetRegionBackendService(name, g.region) + if err != nil && !isNotFound(err) { + return err + } + + op.backendServiceActual = bs + return nil +} + +func (g *Cloud) ensureInternalBackendService(name string, op *ensureOperation) error { // Create backend service if none was found - if bs == nil { + if op.backendServiceActual == nil { klog.V(2).Infof("ensureInternalBackendService: creating backend service %v", name) - err := g.CreateRegionBackendService(expectedBS, g.region) + err := g.CreateRegionBackendService(op.backendServiceDesired, g.region) if err != nil { return err } @@ -910,14 +902,25 @@ func (g *Cloud) ensureInternalBackendService(name, description string, affinityT return nil } - if backendSvcEqual(expectedBS, bs) { + if backendSvcEqual(op.backendServiceActual, op.backendServiceDesired) { return nil } + // Delete existing forwarding rule before making changes to the backend service. For example - changing protocol + // of backend service without first deleting forwarding rule will throw an error since the linked forwarding + // rule would show the old protocol. 
+ for _, fr := range op.forwardingRulesActual { + klog.V(2).Infof("ensureInternalBackendService: deleting forwarding rule %v", fr.Name) + if err := g.DeleteRegionForwardingRule(fr.Name, g.region); ignoreNotFound(err) != nil { + return err + } + } + op.forwardingRulesActual = nil + klog.V(2).Infof("ensureInternalBackendService: updating backend service %v", name) // Set fingerprint for optimistic locking - expectedBS.Fingerprint = bs.Fingerprint - if err := g.UpdateRegionBackendService(expectedBS, g.region); err != nil { + op.backendServiceDesired.Fingerprint = op.backendServiceActual.Fingerprint + if err := g.UpdateRegionBackendService(op.backendServiceDesired, g.region); err != nil { return err } klog.V(2).Infof("ensureInternalBackendService: updated backend service %v successfully", name)