diff --git a/docs/design-docs/multi-protocol-cidr.md b/docs/design-docs/multi-protocol-cidr.md new file mode 100644 index 0000000000..ebbaf9ae38 --- /dev/null +++ b/docs/design-docs/multi-protocol-cidr.md @@ -0,0 +1,85 @@ +## Native MixedProtocolLBService Support in the GKE Service Controller + +### **1\. Abstract** + +This document proposes a design to implement support for Kubernetes Service objects of `type: LoadBalancer` +with mixed protocols (e.g., TCP and UDP) in the cloud-provider-gcp controller. +The core of this proposal is to evolve the controller's logic from managing a +single GCP Forwarding Rule per Kubernetes Service to managing a set of forwarding rules, +one for each protocol specified in the Service manifest. +This change will align GCP's behavior with the generally available `MixedProtocolLBService` +feature in upstream Kubernetes, eliminating the need for the current "dual-service, single-IP" workaround. + +### **2\. Motivation** + +The `MixedProtocolLBService` feature gate has been stable in Kubernetes since v1.26, allowing users to define a single LoadBalancer Service with ports for both TCP and UDP. +However, the GCP Service controller does not currently support this. +The controller's logic is based on a one-to-one mapping between a Service object +and a single GCP Forwarding Rule. +Since a GCP Forwarding Rule is inherently bound to a single protocol (TCP or UDP), the controller cannot provision the necessary infrastructure for a mixed-protocol service. + +This forces users to adopt a less intuitive workaround: +deploying two separate Service objects (one for TCP, one for UDP) +and manually assigning them to the same reserved static IP address. +While functional, this approach is cumbersome, increases configuration complexity, +and is not aligned with the declarative intent of the Kubernetes API. 
+ Implementing `MixedProtocolLBService` support would provide a superior user experience, reduce configuration errors, and make GCP's networking capabilities consistent with the core Kubernetes feature set. ### **3\. Proposed Design** The proposed design modifies the reconciliation loop within the GCP Service controller to manage a collection of forwarding rules for each LoadBalancer Service, rather than just one. #### **3.1. Core Controller Logic Modification** The primary changes will be within the `EnsureLoadBalancer` method in the GCP cloud provider's Service controller. The existing logic creates a single load balancer configuration. The new logic will perform the following steps during its reconciliation loop: 1. **Protocol Grouping:** Upon receiving a Service object, the controller will first inspect the `spec.ports` array and group the ports by their declared protocol (e.g., create a map of `corev1.Protocol` -> `[]corev1.ServicePort`). 2. **IP Address Management:** * If a static IP is specified in `spec.loadBalancerIP`, it will be used for all forwarding rules. * If no IP is specified, the controller will reserve a new static IP address upon the first reconciliation. This address will be used for all forwarding rules created for this Service. The controller must ensure this IP is retained across updates and released only upon Service deletion. 3. **Per-Protocol Reconciliation Loop:** The controller will iterate through the grouped protocols. For each protocol (e.g., TCP, UDP): * **Generate Desired State:** It will construct the desired GCP Forwarding Rule object for that protocol. * **Naming Convention:** To avoid collisions and maintain a clear association, forwarding rules will be named using a convention that includes the service UID and the protocol. A proposed convention is `k8s-fw-\[service-uid\]-\[protocol\]`, for example, `k8s-fw-a8b4f12-tcp`. 
+ * **Configuration:** Each forwarding rule will be configured with the shared static IP, the specific protocol (TCP or UDP), and the list of ports for that protocol. + * **Reconcile with Actual State:** The controller will check if a forwarding rule with the generated name already exists in GCP. + * **Create:** If the rule does not exist, it will be created. + * **Update:** If the rule exists, its configuration will be compared against the desired state and updated if necessary. + * **No-Op:** If the existing rule matches the desired state, no action is taken. +4. **Resource Garbage Collection:** After the reconciliation loop for all protocols present in the Service spec, the controller must clean up any orphaned resources. It will list all GCP Forwarding Rules that match the Service's UID pattern (`k8s-fw-\[service-uid\]-\*`). If any of these existing rules correspond to a protocol that has been removed from the Service spec, the controller will delete that now-orphaned forwarding rule. +5. **Backend Resource Management:** The underlying GCP Backend Service or Target Pool can typically be shared across the different forwarding rules, as it defines the set of backend nodes, not the frontend protocol. The controller logic for managing the backend service will largely remain the same, ensuring it points to the correct set of cluster nodes. + +#### **3.2. Deletion Logic Modification** + +The `EnsureLoadBalancerDeleted` method must also be updated. When a Service is deleted, the controller will use the naming convention (`k8s-fw-\[service-uid\]-\*`) to find and delete *all* associated forwarding rules, in addition to the backend service and the reserved static IP address (if it was provisioned by the controller). + +### **4\. 
Code Implementation Pointers** The necessary changes would be localized within the GCP-specific implementation of the `cloudprovider.LoadBalancer` interface. * **Primary Files for Modification:** The core logic for L4 load balancer reconciliation is located in the gce package. The files managing Service objects of `type: LoadBalancer` would be the main focus. * **Key Functions:** * `gce.EnsureLoadBalancer`: This function would need to be refactored to contain the protocol-grouping and per-protocol reconciliation loop described above. * `gce.EnsureLoadBalancerDeleted`: This function would need to be updated to iterate through all potential forwarding rules based on the new naming scheme and delete them. * **Resource Naming Functions:** Helper functions that generate names for GCP resources would need to be adapted to produce the protocol-specific forwarding rule names. ### **5\. Production Readiness** For ease of introduction, we will gate the new behavior behind a feature flag. If the feature flag is not set, the existing behavior will be used: specifying a Service with multiple protocols will be an error. So that traffic is not interrupted, if a ForwardingRule exists with the "old" naming convention (`k8s-fw-\[service-uid\]`), that name will be used for the matching desired ForwardingRule. ### **6\. Alternatives Considered** * **Continue with Dual-Service Workaround:** This is the current state. It is not a true implementation of the Kubernetes feature and places an unnecessary operational burden on users. * **Rely on Gateway API:** The Gateway API is the long-term strategic direction for advanced Kubernetes networking. However, it is a separate, more complex API. This proposal aims to fix a specific deficiency in the existing and widely used Service API, which should function as specified by the core Kubernetes project. Implementing this feature does not preclude or conflict with the ongoing development of the Gateway API. 
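As a minimal, dependency-free sketch of the protocol grouping (step 1) and naming convention (step 3) from section 3.1: the `servicePort` type, `groupPortsByProtocol`, and `forwardingRuleName` below are illustrative stand-ins, not the proposed implementation; the real controller would operate on `corev1.ServicePort` and the service UID.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// servicePort is a stripped-down stand-in for corev1.ServicePort,
// used here only to keep the sketch self-contained.
type servicePort struct {
	Port     int
	Protocol string // "TCP" or "UDP"
}

// groupPortsByProtocol buckets a Service's ports by declared protocol,
// as in step 1 of section 3.1.
func groupPortsByProtocol(ports []servicePort) map[string][]int {
	grouped := make(map[string][]int)
	for _, p := range ports {
		grouped[p.Protocol] = append(grouped[p.Protocol], p.Port)
	}
	return grouped
}

// forwardingRuleName applies the proposed k8s-fw-[service-uid]-[protocol]
// convention from step 3 of section 3.1.
func forwardingRuleName(serviceUID, protocol string) string {
	return fmt.Sprintf("k8s-fw-%s-%s", serviceUID, strings.ToLower(protocol))
}

func main() {
	ports := []servicePort{
		{Port: 53, Protocol: "TCP"},
		{Port: 53, Protocol: "UDP"},
		{Port: 9153, Protocol: "TCP"},
	}
	grouped := groupPortsByProtocol(ports)

	// Sort protocols for deterministic output.
	protocols := make([]string, 0, len(grouped))
	for proto := range grouped {
		protocols = append(protocols, proto)
	}
	sort.Strings(protocols)

	for _, proto := range protocols {
		fmt.Printf("%s -> ports %v\n", forwardingRuleName("a8b4f12", proto), grouped[proto])
	}
	// Prints:
	// k8s-fw-a8b4f12-tcp -> ports [53 9153]
	// k8s-fw-a8b4f12-udp -> ports [53]
}
```

One forwarding rule per protocol, all sharing the Service's IP, is exactly what the per-protocol reconciliation loop then creates or updates.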
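The garbage-collection pass in step 4 of section 3.1 reduces to a set difference over rule names. The `orphanedRules` helper below is hypothetical and works on plain strings rather than GCP API objects; it only illustrates the selection logic (match this Service's name prefix, keep anything still desired).

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// orphanedRules returns the forwarding rules that match this Service's
// k8s-fw-[service-uid]- prefix but are no longer desired, e.g. the UDP
// rule after all UDP ports were removed from the Service spec. Rules
// belonging to other Services are left untouched.
func orphanedRules(existing []string, desired map[string]bool, serviceUID string) []string {
	prefix := "k8s-fw-" + serviceUID + "-"
	var orphans []string
	for _, name := range existing {
		if strings.HasPrefix(name, prefix) && !desired[name] {
			orphans = append(orphans, name)
		}
	}
	sort.Strings(orphans)
	return orphans
}

func main() {
	existing := []string{"k8s-fw-a8b4f12-tcp", "k8s-fw-a8b4f12-udp", "k8s-fw-other-tcp"}
	// UDP ports were removed from the Service spec, so only the TCP rule is desired.
	desired := map[string]bool{"k8s-fw-a8b4f12-tcp": true}
	fmt.Println(orphanedRules(existing, desired, "a8b4f12"))
	// Prints: [k8s-fw-a8b4f12-udp]
}
```

The same prefix match drives `EnsureLoadBalancerDeleted`, where the desired set is simply empty.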
+ By adopting this design, the GCP Service controller can provide a seamless and intuitive experience for users needing to expose mixed-protocol applications, directly translating the Kubernetes API's intent into the necessary underlying cloud infrastructure. + + diff --git a/providers/gce/gce_alpha.go b/providers/gce/gce_alpha.go index 64a4f1b236..ef365fb9a3 100644 --- a/providers/gce/gce_alpha.go +++ b/providers/gce/gce_alpha.go @@ -27,6 +27,9 @@ const ( // AlphaFeatureSkipIGsManagement enabled L4 Regional Backend Services and // disables instance group management in service controller AlphaFeatureSkipIGsManagement = "SkipIGsManagement" + + // AlphaFeatureMultiProtocolLB allows services to use multiple protocols in the same LoadBalancer. + AlphaFeatureMultiProtocolLB = "MultiProtocolLB" ) // AlphaFeatureGate contains a mapping of alpha features to whether they are enabled diff --git a/providers/gce/gce_loadbalancer.go b/providers/gce/gce_loadbalancer.go index 7123781236..447463ef45 100644 --- a/providers/gce/gce_loadbalancer.go +++ b/providers/gce/gce_loadbalancer.go @@ -145,10 +145,12 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc return nil, err } - // Services with multiples protocols are not supported by this controller, warn the users and sets - // the corresponding Service Status Condition. + // Services with multiple protocols are not supported by this controller (without AlphaFeatureMultiProtocolLB), + // warn the users and set the corresponding Service Status Condition. 
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb - if err := checkMixedProtocol(svc.Spec.Ports); err != nil { + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + klog.Infof("AlphaFeatureMultiProtocolLB feature gate is enabled") + } else if err := checkMixedProtocol(svc.Spec.Ports); err != nil { if hasLoadBalancerPortsError(svc) { return nil, err } @@ -228,10 +230,12 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc return err } - // Services with multiples protocols are not supported by this controller, warn the users and sets + // Services with multiple protocols are not supported by this controller (without AlphaFeatureMultiProtocolLB), warn the users and set + // the corresponding Service Status Condition, but keep processing the Update to not break upgrades. // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb - if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + klog.Infof("AlphaFeatureMultiProtocolLB feature gate is enabled") + } else if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name) g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.") svcApplyStatus := corev1apply.ServiceStatus().WithConditions( diff --git a/providers/gce/gce_loadbalancer_external.go b/providers/gce/gce_loadbalancer_external.go index 9c8daab905..9555081271 100644 --- a/providers/gce/gce_loadbalancer_external.go +++ b/providers/gce/gce_loadbalancer_external.go @@ -52,9 +52,9 @@ const ( // IP address, a firewall rule, a target pool, and a forwarding rule. This // function has to manage all of them. 
// - // Due to an interesting series of design decisions, this handles both creating - // new load balancers and updating existing load balancers, recognizing when - // each is needed. + // This function handles both creating new load balancers and updating existing load balancers, + // recognizing when each is needed. + // This approach is resilient; for example, if creation is interrupted part-way, a later reconciliation can pick up where it left off. func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, apiService *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { // Process services with LoadBalancerClass "networking.gke.io/l4-regional-external-legacy" used for this controller. // LoadBalancerClass can't be updated so we know this controller should process the NetLB. @@ -112,13 +112,21 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, g.deleteWrongNetworkTieredResources(loadBalancerName, lbRefStr, netTier) } - // Check if the forwarding rule exists, and if so, what its IP is. - fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := g.forwardingRuleNeedsUpdate(loadBalancerName, g.region, requestedIP, ports) + existingFRs, err := g.getExistingForwardingRules(loadBalancerName) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting existing forwarding rules for %s: %v", loadBalancerName, err) } - if !fwdRuleExists { - klog.V(2).Infof("ensureExternalLoadBalancer(%s): Forwarding rule %v doesn't exist.", lbRefStr, loadBalancerName) + + // Check if a forwarding rule exists, and if so, what the IP is. 
+ var fwdRuleIP string + for _, fr := range existingFRs { + if fr.IPAddress != "" { + if fwdRuleIP == "" { + fwdRuleIP = fr.IPAddress + } else if fwdRuleIP != fr.IPAddress { + return nil, fmt.Errorf("found multiple forwarding rules with different IP addresses (%s and %s) for load balancer %s", fwdRuleIP, fr.IPAddress, loadBalancerName) + } + } } // Make sure we know which IP address will be used and have properly reserved @@ -190,6 +198,12 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, ipAddressToUse = ipAddr } + // Now we have the IP address we can build the desired forwarding rules. + desiredFRs, err := g.buildDesiredForwardingRules(loadBalancerName, serviceName.String(), ipAddressToUse, g.targetPoolURL(loadBalancerName), apiService, netTier, existingFRs) + if err != nil { + return nil, fmt.Errorf("error building desired forwarding rules for %s: %v", loadBalancerName, err) + } + // Deal with the firewall next. The reason we do this here rather than last // is because the forwarding rule is used as the indicator that the load // balancer is fully created - it's what getLoadBalancer checks for. @@ -261,47 +275,189 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, } hcToCreate = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort()) } + // Now we get to some slightly more interesting logic. // First, neither target pools nor forwarding rules can be updated in place - // they have to be deleted and recreated. // Second, forwarding rules are layered on top of target pools in that you // can't delete a target pool that's currently in use by a forwarding rule. - // Thus, we have to tear down the forwarding rule if either it or the target + // Thus, we have to tear down a forwarding rule if either it or the target // pool needs to be updated. 
- if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsRecreation) { + if len(existingFRs) > 0 && tpNeedsRecreation { // Begin critical section. If we have to delete the forwarding rule, // and something should fail before we recreate it, don't release the // IP. That way we can come back to it later. isSafeToReleaseIP = false - if err := g.DeleteRegionForwardingRule(loadBalancerName, g.region); err != nil && !isNotFound(err) { - return nil, fmt.Errorf("failed to delete existing forwarding rule for load balancer (%s) update: %v", lbRefStr, err) + + for _, fr := range existingFRs { + if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { + return nil, fmt.Errorf("failed to delete existing forwarding rule for load balancer (%s) update: %v", lbRefStr, err) + } } - klog.Infof("ensureExternalLoadBalancer(%s): Deleted forwarding rule.", lbRefStr) + klog.Infof("ensureExternalLoadBalancer(%s): Deleted forwarding rule(s).", lbRefStr) + + // Clear the existing forwarding rules so we don't try to delete them again. + existingFRs = nil } if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil { return nil, err } - if tpNeedsRecreation || fwdRuleNeedsUpdate { - klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule, IP %s (tier: %s).", lbRefStr, ipAddressToUse, netTier) - if err := createForwardingRule(g, loadBalancerName, serviceName.String(), g.region, ipAddressToUse, g.targetPoolURL(loadBalancerName), ports, netTier, g.enableDiscretePortForwarding); err != nil { + // Delete unwanted forwarding rules. 
+ for _, fr := range existingFRs { + if _, ok := desiredFRs[fr.Name]; !ok { + klog.Infof("ensureExternalLoadBalancer(%s): Deleting orphaned forwarding rule %s.", lbRefStr, fr.Name) + if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) { + return nil, err + } + } + } + + // Create or update forwarding rules. + for _, desiredFR := range desiredFRs { + existingFR, exists := existingFRs[desiredFR.Name] + if exists { + if g.forwardingRulesEqual(existingFR, desiredFR) { + continue + } + } + + // We can't update a forwarding rule in place, so we have to delete it and recreate it. + if exists { + klog.Infof("ensureExternalLoadBalancer(%s): Deleting forwarding rule %s to update.", lbRefStr, desiredFR.Name) + if err := g.DeleteRegionForwardingRule(desiredFR.Name, g.region); err != nil && !isNotFound(err) { + return nil, err + } + } + klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, desiredFR.Name, desiredFR.IPProtocol, ipAddressToUse, netTier) + // The desiredFR is a complete spec, so we can just pass it to CreateRegionForwardingRule. + // TODO: Why do we ignore the conflict error? + if err := g.CreateRegionForwardingRule(desiredFR, g.region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) { return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err) } - // End critical section. It is safe to release the static IP (which - // just demotes it to ephemeral) now that it is attached. In the case - // of a user-requested IP, the "is user-owned" flag will be set, - // preventing it from actually being released. - isSafeToReleaseIP = true - klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule, IP %s.", lbRefStr, ipAddressToUse) + klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, desiredFR.Name) } + // End critical section. 
It is safe to release the static IP (which + // just demotes it to ephemeral) now that it is attached. In the case + // of a user-requested IP, the "is user-owned" flag will be set, + // preventing it from actually being released. + isSafeToReleaseIP = true + status := &v1.LoadBalancerStatus{} status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}} return status, nil } +// getExistingForwardingRules returns a map of forwarding rules for the given load balancer. +func (g *Cloud) getExistingForwardingRules(loadBalancerName string) (map[string]*compute.ForwardingRule, error) { + frs, err := g.ListRegionForwardingRules(g.region) + if err != nil { + return nil, err + } + + existingFRs := make(map[string]*compute.ForwardingRule) + for _, fr := range frs { + isMatch := false + if fr.Name == loadBalancerName { + isMatch = true + } else if fr.Name == g.getProtocolForwardingRuleName(loadBalancerName, v1.Protocol(fr.IPProtocol)) { + isMatch = true + } + + if isMatch { + existingFRs[fr.Name] = fr + } + } + return existingFRs, nil +} + +// buildDesiredForwardingRules builds the desired forwarding rules for the given load balancer. +func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, existingFRs map[string]*compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) { + desiredFRs := make(map[string]*compute.ForwardingRule) + groupedPorts := getPortsAndProtocols(apiService.Spec.Ports) + desc := makeServiceDescription(serviceName) + + // Find the legacy forwarding rule to minimize changes. 
+ var oldFwdRule *compute.ForwardingRule + if fwd, ok := existingFRs[loadBalancerName]; ok { + oldFwdRule = fwd + } + + for protocol, protocolPorts := range groupedPorts { + var frName string + if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) { + frName = g.getProtocolForwardingRuleName(loadBalancerName, protocol) + // If the old forwarding rule matches this protocol, use its name for backward compatibility. + if oldFwdRule != nil && oldFwdRule.IPProtocol == string(protocol) { + frName = loadBalancerName + } + } else { + frName = loadBalancerName + } + + portRange, err := loadBalancerPortRange(protocolPorts.ports) + if err != nil { + return nil, err + } + + rule := &compute.ForwardingRule{ + Name: frName, + Description: desc, + IPAddress: ipAddress, + IPProtocol: string(protocol), + PortRange: portRange, + Target: targetPoolURL, + NetworkTier: netTier.ToGCEValue(), + } + + if len(protocolPorts.ports) <= maxForwardedPorts && g.enableDiscretePortForwarding { + for _, p := range protocolPorts.ports { + rule.Ports = append(rule.Ports, strconv.Itoa(p)) + } + rule.PortRange = "" + } + + desiredFRs[frName] = rule + } + return desiredFRs, nil +} + +// forwardingRulesEqual checks if two forwarding rules are equal. 
+func (g *Cloud) forwardingRulesEqual(existing, desired *compute.ForwardingRule) bool { + if existing.Name != desired.Name { + klog.V(3).Infof("Forwarding rule %s name changed from %s to %s", existing.Name, existing.Name, desired.Name) + return false + } + if existing.Description != desired.Description { + klog.V(3).Infof("Forwarding rule %s description changed from %s to %s", existing.Name, existing.Description, desired.Description) + return false + } + if existing.IPAddress != desired.IPAddress { + klog.V(3).Infof("Forwarding rule %s IP changed from %s to %s", existing.Name, existing.IPAddress, desired.IPAddress) + return false + } + if existing.IPProtocol != desired.IPProtocol { + klog.V(3).Infof("Forwarding rule %s protocol changed from %s to %s", existing.Name, existing.IPProtocol, desired.IPProtocol) + return false + } + + frEqualPorts := equalPorts(existing.Ports, desired.Ports, existing.PortRange, desired.PortRange, g.enableDiscretePortForwarding) + if !frEqualPorts { + klog.V(3).Infof("Forwarding rule %s ports changed from (range: %v, ports: %v) to (range: %v, ports: %v)", existing.Name, existing.PortRange, existing.Ports, desired.PortRange, desired.Ports) + return false + } + + if existing.NetworkTier != desired.NetworkTier { + klog.V(3).Infof("Forwarding rule %s network tier changed from %s to %s", existing.Name, existing.NetworkTier, desired.NetworkTier) + return false + } + + return true +} + // updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer. func (g *Cloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error { // Process services with LoadBalancerClass "networking.gke.io/l4-regional-external-legacy" used for this controller. 
@@ -378,12 +534,30 @@ func (g *Cloud) ensureExternalLoadBalancerDeleted(clusterName, clusterID string, return ignoreNotFound(g.DeleteRegionAddress(loadBalancerName, g.region)) }, func() error { - klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule.", lbRefStr) - // The forwarding rule must be deleted before either the target pool can, - // unfortunately, so we have to do these two serially. + klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rules.", lbRefStr) + // The forwarding rule must be deleted before the target pool can be deleted, + // unfortunately, so we have to delete forwarding rules then target pools serially. if err := ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil { return err } + + // TODO: Always or just with alpha feature flag? + existingFRs, err := g.getExistingForwardingRules(loadBalancerName) + if err != nil { + return err + } + + var deleteErrs []error + for _, fr := range existingFRs { + klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule %s.", lbRefStr, fr.Name) + if err := ignoreNotFound(g.DeleteRegionForwardingRule(fr.Name, g.region)); err != nil { + deleteErrs = append(deleteErrs, err) + } + } + if len(deleteErrs) > 0 { + return utilerrors.NewAggregate(deleteErrs) + } + klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting target pool.", lbRefStr) if err := g.DeleteExternalTargetPoolAndChecks(service, loadBalancerName, g.region, clusterID, hcNames...); err != nil { return err @@ -685,6 +859,10 @@ func (g *Cloud) targetPoolURL(name string) string { return g.projectsBasePath + strings.Join([]string{g.projectID, "regions", g.region, "targetPools", name}, "/") } +func (g *Cloud) getProtocolForwardingRuleName(loadBalancerName string, protocol v1.Protocol) string { + return loadBalancerName + "-" + strings.ToLower(string(protocol)) +} + func makeHTTPHealthCheck(name, path string, port int32) *compute.HttpHealthCheck { return 
&compute.HttpHealthCheck{ Name: name, @@ -771,59 +949,6 @@ func (g *Cloud) ensureHTTPHealthCheck(name, path string, port int32) (hc *comput return hc, nil } -// Passing nil for requested IP is perfectly fine - it just means that no specific -// IP is being requested. -// Returns whether the forwarding rule exists, whether it needs to be updated, -// what its IP address is (if it exists), and any error we encountered. -func (g *Cloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []v1.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) { - fwd, err := g.GetRegionForwardingRule(name, region) - if err != nil { - if isHTTPErrorCode(err, http.StatusNotFound) { - return false, true, "", nil - } - // Err on the side of caution in case of errors. Caller should notice the error and retry. - // We never want to end up recreating resources because g api flaked. - return true, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err) - } - // If the user asks for a specific static ip through the Service spec, - // check that we're actually using it. - // TODO: we report loadbalancer IP through status, so we want to verify if - // that matches the forwarding rule as well. - if loadBalancerIP != "" && loadBalancerIP != fwd.IPAddress { - klog.Infof("LoadBalancer ip for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPAddress, loadBalancerIP) - return true, true, fwd.IPAddress, nil - } - - protocol, err := getProtocol(ports) - if err != nil { - return true, false, "", err - } - - newPortRange, err := loadBalancerPortRange(ports) - if err != nil { - // Err on the side of caution in case of errors. Caller should notice the error and retry. - // We never want to end up recreating resources because g api flaked. 
- return true, false, "", err - } - newPorts := []string{} - if frPorts := getPorts(ports); len(frPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding { - newPorts = frPorts - newPortRange = "" - } - frEqualPorts := equalPorts(fwd.Ports, newPorts, fwd.PortRange, newPortRange, g.enableDiscretePortForwarding) - if !frEqualPorts { - klog.Infof("Forwarding rule port range / ports are not equal, old (port range: %v, ports: %v), new (port range: %v, ports: %v)", fwd.PortRange, fwd.Ports, newPortRange, newPorts) - return true, true, fwd.IPAddress, nil - } - - if string(protocol) != fwd.IPProtocol { - klog.Infof("LoadBalancer protocol for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPProtocol, string(protocol)) - return true, true, fwd.IPAddress, nil - } - - return true, false, fwd.IPAddress, nil -} - // Doesn't check whether the hosts have changed, since host updating is handled // separately. func (g *Cloud) targetPoolNeedsRecreation(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsRecreation bool, err error) { @@ -878,42 +1003,15 @@ func hostURLToComparablePath(hostURL string) string { return hostURL[idx:] } -func getProtocol(svcPorts []v1.ServicePort) (v1.Protocol, error) { - if len(svcPorts) == 0 { - return v1.ProtocolTCP, nil - } - // The service controller verified all the protocols match on the ports, just check and use the first one - protocol := svcPorts[0].Protocol - if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP { - return v1.ProtocolTCP, fmt.Errorf("invalid protocol %s, only TCP and UDP are supported", string(protocol)) - } - return protocol, nil -} - -func getPorts(svcPorts []v1.ServicePort) []string { - ports := []string{} - for _, p := range svcPorts { - ports = append(ports, strconv.Itoa(int(p.Port))) +func loadBalancerPortRange(ports []int) (string, error) { + if len(ports) == 0 { + return "", fmt.Errorf("no ports specified for GCE load balancer") } - return ports -} + minPort 
:= 65536 + maxPort := 0 -func minMaxPort[T v1.ServicePort | string](svcPorts []T) (int32, int32) { - minPort := int32(65536) - maxPort := int32(0) - for _, svcPort := range svcPorts { - port := func(value any) int32 { - switch value.(type) { - case v1.ServicePort: - return value.(v1.ServicePort).Port - case string: - i, _ := strconv.ParseInt(value.(string), 10, 32) - return int32(i) - default: - return 0 - } - }(svcPort) + for _, port := range ports { if port < minPort { minPort = port } @@ -921,15 +1019,6 @@ func minMaxPort[T v1.ServicePort | string](svcPorts []T) (int32, int32) { maxPort = port } } - return minPort, maxPort -} - -func loadBalancerPortRange[T v1.ServicePort | string](svcPorts []T) (string, error) { - if len(svcPorts) == 0 { - return "", fmt.Errorf("no ports specified for GCE load balancer") - } - - minPort, maxPort := minMaxPort(svcPorts) return fmt.Sprintf("%d-%d", minPort, maxPort), nil } @@ -945,7 +1034,15 @@ func equalPorts(existingPorts, newPorts []string, existingPortRange, newPortRang // Existing forwarding rule contains a port range. To keep it that way, // compare new list of ports as if it was a port range, too. 
if len(newPorts) != 0 { - newPortRange, _ = loadBalancerPortRange(newPorts) + var portInts []int + for _, port := range newPorts { + portInt, err := strconv.Atoi(port) + if err != nil { + klog.Errorf("invalid port %s: %v", port, err) + } + portInts = append(portInts, portInt) + } + newPortRange, _ = loadBalancerPortRange(portInts) } return existingPortRange == newPortRange } @@ -974,15 +1071,19 @@ func (g *Cloud) firewallNeedsUpdate(name, serviceName, ipAddress string, ports [ if fw.Description != makeFirewallDescription(serviceName, ipAddress) { return true, true, nil } - if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") { - return true, true, nil + + groupedPorts := getPortsAndProtocols(ports) + expectedAllowed := make(map[string][]string) + for protocol, protocolPorts := range groupedPorts { + expectedAllowed[strings.ToLower(string(protocol))] = protocolPorts.portRanges + } + + actualAllowed := make(map[string][]string) + for _, allow := range fw.Allowed { + actualAllowed[strings.ToLower(allow.IPProtocol)] = allow.Ports } - // Make sure the allowed ports match. - portNums, portRanges, _ := getPortsAndProtocol(ports) - // This logic checks if the existing firewall rules contains either enumerated service ports or port ranges. - // This is to prevent unnecessary noop updates to the firewall rule when the existing firewall rule is - // set up via the previous pattern using enumerated ports instead of port ranges. 
- if !equalStringSets(portNums, fw.Allowed[0].Ports) && !equalStringSets(portRanges, fw.Allowed[0].Ports) { + + if !reflect.DeepEqual(expectedAllowed, actualAllowed) { return true, true, nil } @@ -1046,42 +1147,6 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd return nil } -func createForwardingRule(s CloudForwardingRuleService, name, serviceName, region, ipAddress, target string, ports []v1.ServicePort, netTier cloud.NetworkTier, enableDiscretePortForwarding bool) error { - frPorts := getPorts(ports) - protocol, err := getProtocol(ports) - if err != nil { - return err - } - portRange, err := loadBalancerPortRange(ports) - if err != nil { - return err - } - desc := makeServiceDescription(serviceName) - - rule := &compute.ForwardingRule{ - Name: name, - Description: desc, - IPAddress: ipAddress, - IPProtocol: string(protocol), - PortRange: portRange, - Target: target, - NetworkTier: netTier.ToGCEValue(), - } - - if len(frPorts) <= maxForwardedPorts && enableDiscretePortForwarding { - rule.Ports = frPorts - rule.PortRange = "" - } - - err = s.CreateRegionForwardingRule(rule, region) - - if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { - return err - } - - return nil -} - func (g *Cloud) createFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error { firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts) if err != nil { @@ -1124,7 +1189,14 @@ func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges ut // GCE considers empty destinationRanges as "all" for ingress firewall-rules. // Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only // 100 ports or port ranges can be used in a firewall rule. 
-	_, portRanges, _ := getPortsAndProtocol(ports)
+	groupedPorts := getPortsAndProtocols(ports)
+	var allowed []*compute.FirewallAllowed
+	for protocol, protocolPorts := range groupedPorts {
+		allowed = append(allowed, &compute.FirewallAllowed{
+			IPProtocol: strings.ToLower(string(protocol)),
+			Ports:      protocolPorts.portRanges,
+		})
+	}
 
 	// If the node tags to be used for this cluster have been predefined in the
 	// provider config, just use them. Otherwise, invoke computeHostTags method to get the tags.
@@ -1142,17 +1214,7 @@ func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges ut
 		Network:      g.networkURL,
 		SourceRanges: sourceRanges.StringSlice(),
 		TargetTags:   hostTags,
-		Allowed: []*compute.FirewallAllowed{
-			{
-				// TODO: Make this more generic. Currently this method is only
-				// used to create firewall rules for loadbalancers, which have
-				// exactly one protocol, so we can never end up with a list of
-				// mixed TCP and UDP ports. It should be possible to use a
-				// single firewall rule for both a TCP and UDP lb.
-				IPProtocol: strings.ToLower(string(ports[0].Protocol)),
-				Ports:      portRanges,
-			},
-		},
+		Allowed: allowed,
 	}
 	if destinationIP != "" {
 		firewall.DestinationRanges = []string{destinationIP}
diff --git a/providers/gce/gce_loadbalancer_internal.go b/providers/gce/gce_loadbalancer_internal.go
index 562d417127..28c8e2d6d1 100644
--- a/providers/gce/gce_loadbalancer_internal.go
+++ b/providers/gce/gce_loadbalancer_internal.go
@@ -29,7 +29,6 @@ import (
 
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
-	"github.com/google/go-cmp/cmp"
 	compute "google.golang.org/api/compute/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -94,10 +93,13 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
 		return nil, err
 	}
 
-	ports, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
-	if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP {
-		return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol))
+	groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
+	for protocol := range groupedPorts {
+		if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP {
+			return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol))
+		}
 	}
+
 	scheme := cloud.SchemeInternal
 	options := getILBOptions(svc)
 	if g.IsLegacyNetwork() {
@@ -106,7 +108,7 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
 	}
 
 	sharedBackend := shareBackendService(svc)
-	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity)
+	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity)
 	backendServiceLink := g.getBackendServiceLink(backendServiceName)
 
 	// Ensure instance groups exist and nodes are assigned to groups
@@ -179,57 +181,121 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
 	if err != nil {
 		return nil, err
 	}
 
-	newFwdRule := &compute.ForwardingRule{
-		Name:                loadBalancerName,
-		Description:         fwdRuleDescriptionString,
-		IPAddress:           ipToUse,
-		BackendService:      backendServiceLink,
-		Ports:               ports,
-		IPProtocol:          string(protocol),
-		LoadBalancingScheme: string(scheme),
-		// Given that CreateGCECloud will attempt to determine the subnet based off the network,
-		// the subnetwork should rarely be unknown.
-		Subnetwork: subnetworkURL,
-		Network:    g.networkURL,
-	}
-	if options.AllowGlobalAccess {
-		newFwdRule.AllowGlobalAccess = options.AllowGlobalAccess
-	}
-	if len(ports) > maxL4ILBPorts {
-		newFwdRule.Ports = nil
-		newFwdRule.AllPorts = true
+
+	// Logic to handle multiple forwarding rules, one per protocol.
+	// Based on the logic for external load balancers.
+
+	// Get all existing forwarding rules for this service.
+	// A service can have a forwarding rule with the base name, or with a protocol suffix.
+	// TODO: Remove existingFwdRules and use the one from the function.
+	// Check for forwarding rules with protocol suffixes.
+	op := &ensureOperation{}
+	if err := g.findActualBackendServiceInternal(backendServiceName, op); err != nil {
+		return nil, err
+	}
+
+	existingForwardingRules, err := g.getExistingForwardingRules(loadBalancerName)
+	if err != nil {
+		return nil, err
 	}
+	op.forwardingRulesActual = existingForwardingRules
 
-	fwdRuleDeleted := false
-	if existingFwdRule != nil && !forwardingRulesEqual(existingFwdRule, newFwdRule) {
-		// Delete existing forwarding rule before making changes to the backend service. For example - changing protocol
-		// of backend service without first deleting forwarding rule will throw an error since the linked forwarding
-		// rule would show the old protocol.
-		if klogV := klog.V(2); klogV.Enabled() {
-			frDiff := cmp.Diff(existingFwdRule, newFwdRule)
-			klogV.Infof("ensureInternalLoadBalancer(%v): forwarding rule changed - Existing - %+v\n, New - %+v\n, Diff(-existing, +new) - %s\n. Deleting existing forwarding rule.", loadBalancerName, existingFwdRule, newFwdRule, frDiff)
+	var createdFwdRules []*compute.ForwardingRule
+
+	var desiredForwardingRules []*compute.ForwardingRule
+	for protocol, protocolPorts := range groupedPorts {
+		// Each protocol gets its own forwarding rule.
+		// If there's only one protocol, the forwarding rule name is the load balancer name.
+		// Otherwise, it's <load-balancer-name>-<protocol>.
+		frName := loadBalancerName
+		if len(groupedPorts) > 1 {
+			frName = fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol)))
 		}
-		if err = ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil {
-			return nil, err
+
+		forwardingRule := &compute.ForwardingRule{
+			Name:                frName,
+			Description:         fwdRuleDescriptionString,
+			IPAddress:           ipToUse,
+			BackendService:      backendServiceLink,
+			IPProtocol:          string(protocol),
+			LoadBalancingScheme: string(scheme),
+			Subnetwork:          subnetworkURL,
+			Network:             g.networkURL,
+		}
+		if options.AllowGlobalAccess {
+			forwardingRule.AllowGlobalAccess = options.AllowGlobalAccess
+		}
+		if len(protocolPorts.ports) > maxL4ILBPorts {
+			forwardingRule.Ports = nil
+			forwardingRule.AllPorts = true
+		} else {
+			for _, port := range protocolPorts.ports {
+				forwardingRule.Ports = append(forwardingRule.Ports, strconv.Itoa(port))
+			}
+		}
+
+		// Check if a forwarding rule for this protocol already exists; reuse the same name if possible.
+		for _, existingFwdRule := range existingForwardingRules {
+			if existingFwdRule.IPProtocol == forwardingRule.IPProtocol {
+				forwardingRule.Name = existingFwdRule.Name
+				break
+			}
 		}
-		fwdRuleDeleted = true
+
+		desiredForwardingRules = append(desiredForwardingRules, forwardingRule)
 	}
+	op.forwardingRulesDesired = desiredForwardingRules
 
 	bsDescription := makeBackendServiceDescription(nm, sharedBackend)
-	err = g.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink)
-	if err != nil {
+	backendServiceProtocol := "UNSPECIFIED"
+	if len(desiredForwardingRules) == 1 {
+		backendServiceProtocol = desiredForwardingRules[0].IPProtocol
+	}
+	if err := g.buildDesiredBackendServiceInternal(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, backendServiceProtocol, igLinks, hc.SelfLink, op); err != nil {
+		return nil, err
+	}
+
+	if err := g.ensureInternalBackendService(backendServiceName, op); err != nil {
 		return nil, err
 	}
 
-	if fwdRuleDeleted || existingFwdRule == nil {
-		// existing rule has been deleted, pass in nil
-		if err := g.ensureInternalForwardingRule(nil, newFwdRule); err != nil {
+	for _, desiredForwardingRule := range desiredForwardingRules {
+		var existingFwdRuleForProtocol *compute.ForwardingRule
+		for _, existingFwdRule := range existingForwardingRules {
+			if existingFwdRule.IPProtocol == desiredForwardingRule.IPProtocol {
+				existingFwdRuleForProtocol = existingFwdRule
+				break
+			}
+		}
+
+		if err := g.ensureInternalForwardingRule(existingFwdRuleForProtocol, desiredForwardingRule); err != nil {
+			return nil, err
+		}
+		createdFwdRules = append(createdFwdRules, desiredForwardingRule)
+	}
+
+	// Delete any forwarding rules that are no longer needed.
+	for frName, fr := range existingForwardingRules {
+		var matching *compute.ForwardingRule
+		for _, desiredForwardingRule := range desiredForwardingRules {
+			if desiredForwardingRule.Name == fr.Name {
+				matching = desiredForwardingRule
+				break
+			}
+		}
+		if matching != nil {
+			continue
+		}
+
+		klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting stale forwarding rule %s", loadBalancerName, frName)
+		if err := ignoreNotFound(g.DeleteRegionForwardingRule(frName, g.region)); err != nil {
 			return nil, err
 		}
 	}
+	if len(desiredForwardingRules) == 0 {
+		klog.V(2).Infof("ensureInternalLoadBalancer(%v): no forwarding rules needed, all deleted.", loadBalancerName)
+		return &v1.LoadBalancerStatus{}, nil
+	}
+
 	// Get the most recent forwarding rule for the address.
-	updatedFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region)
+	updatedFwdRule, err := g.GetRegionForwardingRule(createdFwdRules[0].Name, g.region)
 	if err != nil {
 		return nil, err
 	}
@@ -240,11 +306,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
 		return nil, err
 	}
 
-	// Delete the previous internal load balancer resources if necessary
-	if existingBackendService != nil {
-		g.clearPreviousInternalResources(svc, loadBalancerName, existingBackendService, backendServiceName, hcName)
-	}
-
 	serviceState.InSuccess = true
 	if options.AllowGlobalAccess {
 		serviceState.EnabledGlobalAccess = true
 	}
@@ -359,12 +420,18 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v
 	}
 
 	// Generate the backend service name
-	_, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
+	groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
 	scheme := cloud.SchemeInternal
 	loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc)
-	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shareBackendService(svc), scheme, protocol, svc.Spec.SessionAffinity)
+	sharedBackend := shareBackendService(svc)
+
+	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity)
 
 	// Ensure the backend service has the proper backend/instance-group links
-	return g.ensureInternalBackendServiceGroups(backendServiceName, igLinks)
+	if err := g.ensureInternalBackendServiceGroups(backendServiceName, igLinks); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, svc *v1.Service) error {
@@ -383,7 +450,7 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
 	loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc)
 	svcNamespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
 
-	_, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
+	groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
 	scheme := cloud.SchemeInternal
 	sharedBackend := shareBackendService(svc)
 	sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
@@ -394,12 +461,22 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
 	klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName)
 	ensureAddressDeleted(g, loadBalancerName, g.region)
 
-	klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule", loadBalancerName)
-	if err := ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil {
+	// Get existing forwarding rules.
+	existingForwardingRules, err := g.getExistingForwardingRules(loadBalancerName)
+	if err != nil {
 		return err
 	}
 
-	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity)
+	// Delete existing forwarding rules.
+	for _, existingForwardingRule := range existingForwardingRules {
+		klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule %s", loadBalancerName, existingForwardingRule.Name)
+		if err := ignoreNotFound(g.DeleteRegionForwardingRule(existingForwardingRule.Name, g.region)); err != nil {
+			return err
+		}
+	}
+
+	// Delete existing backend service.
+	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity)
 	klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName)
 	if err := g.teardownInternalBackendService(backendServiceName); err != nil {
 		return err
@@ -495,7 +572,7 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s
 	return nil
 }
 
-func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error {
+func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, groupedPorts map[v1.Protocol]ProtocolPorts, nodes []*v1.Node, legacyFwName string) error {
 	klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
 	targetTags, err := g.GetNodeTags(nodeNames(nodes))
 	if err != nil {
@@ -536,12 +613,13 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinat
 		Network:      g.networkURL,
 		SourceRanges: sourceRanges,
 		TargetTags:   targetTags,
-		Allowed: []*compute.FirewallAllowed{
-			{
-				IPProtocol: strings.ToLower(string(protocol)),
-				Ports:      portRanges,
-			},
-		},
+	}
+
+	for protocol, protocolPorts := range groupedPorts {
+		expectedFirewall.Allowed = append(expectedFirewall.Allowed, &compute.FirewallAllowed{
+			IPProtocol: strings.ToLower(string(protocol)),
+			Ports:      protocolPorts.portRanges,
+		})
 	}
 
 	if destinationIP != "" {
@@ -576,12 +654,12 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinat
 func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, sharedHealthCheck bool, nodes []*v1.Node) error {
 	// First firewall is for ingress traffic
 	fwDesc := makeFirewallDescription(nm.String(), ipAddress)
-	_, portRanges, protocol := getPortsAndProtocol(svc.Spec.Ports)
+	groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
 	sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(svc)
 	if err != nil {
 		return err
 	}
-	err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName)
+	err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), groupedPorts, nodes, loadBalancerName)
 	if err != nil {
 		return err
 	}
@@ -777,17 +855,18 @@ func (g *Cloud) ensureInternalInstanceGroupsDeleted(name string) error {
 	return nil
 }
 
-func (g *Cloud) ensureInternalBackendService(name, description string, affinityType v1.ServiceAffinity, scheme cloud.LbScheme, protocol v1.Protocol, igLinks []string, hcLink string) error {
-	klog.V(2).Infof("ensureInternalBackendService(%v, %v, %v): checking existing backend service with %d groups", name, scheme, protocol, len(igLinks))
-	bs, err := g.GetRegionBackendService(name, g.region)
-	if err != nil && !isNotFound(err) {
-		return err
-	}
+type ensureOperation struct {
+	backendServiceDesired *compute.BackendService
+	backendServiceActual  *compute.BackendService
+	forwardingRulesDesired []*compute.ForwardingRule
+	forwardingRulesActual  map[string]*compute.ForwardingRule
+}
 
+func (g *Cloud) buildDesiredBackendServiceInternal(name, description string, affinityType v1.ServiceAffinity, scheme cloud.LbScheme, protocol string, igLinks []string, hcLink string, op *ensureOperation) error {
 	backends := backendsFromGroupLinks(igLinks)
-	expectedBS := &compute.BackendService{
+	bs := &compute.BackendService{
 		Name:         name,
-		Protocol:     string(protocol),
+		Protocol:     protocol,
 		Description:  description,
 		HealthChecks: []string{hcLink},
 		Backends:     backends,
@@ -795,10 +874,27 @@ func (g *Cloud) ensureInternalBackendService(name, description string, affinityT
 		LoadBalancingScheme: string(scheme),
 	}
 
+	op.backendServiceDesired = bs
+	return nil
+}
+
+func (g *Cloud) findActualBackendServiceInternal(name string, op *ensureOperation) error {
+	klog.V(2).Infof("findActualBackendServiceInternal(%v): checking existing backend services", name)
+
+	bs, err := g.GetRegionBackendService(name, g.region)
+	if err != nil && !isNotFound(err) {
+		return err
+	}
+
+	op.backendServiceActual = bs
+	return nil
+}
+
+func (g *Cloud) ensureInternalBackendService(name string, op *ensureOperation) error {
 	// Create backend service if none was found
-	if bs == nil {
+	if op.backendServiceActual == nil {
 		klog.V(2).Infof("ensureInternalBackendService: creating backend service %v", name)
-		err := g.CreateRegionBackendService(expectedBS, g.region)
+		err := g.CreateRegionBackendService(op.backendServiceDesired, g.region)
 		if err != nil {
 			return err
 		}
@@ -806,14 +902,25 @@ func (g *Cloud) ensureInternalBackendService(name, description string, affinityT
 		return nil
 	}
 
-	if backendSvcEqual(expectedBS, bs) {
+	if backendSvcEqual(op.backendServiceActual, op.backendServiceDesired) {
 		return nil
 	}
 
+	// Delete existing forwarding rule before making changes to the backend service. For example - changing protocol
+	// of backend service without first deleting forwarding rule will throw an error since the linked forwarding
+	// rule would show the old protocol.
+	for _, fr := range op.forwardingRulesActual {
+		klog.V(2).Infof("ensureInternalBackendService: deleting forwarding rule %v", fr.Name)
+		if err := g.DeleteRegionForwardingRule(fr.Name, g.region); ignoreNotFound(err) != nil {
+			return err
+		}
+	}
+	op.forwardingRulesActual = nil
+
 	klog.V(2).Infof("ensureInternalBackendService: updating backend service %v", name)
 	// Set fingerprint for optimistic locking
-	expectedBS.Fingerprint = bs.Fingerprint
-	if err := g.UpdateRegionBackendService(expectedBS, g.region); err != nil {
+	op.backendServiceDesired.Fingerprint = op.backendServiceActual.Fingerprint
+	if err := g.UpdateRegionBackendService(op.backendServiceDesired, g.region); err != nil {
 		return err
 	}
 	klog.V(2).Infof("ensureInternalBackendService: updated backend service %v successfully", name)
@@ -957,20 +1064,28 @@ func backendSvcEqual(a, b *compute.BackendService) bool {
 		backendsListEqual(a.Backends, b.Backends)
 }
 
-func getPortsAndProtocol(svcPorts []v1.ServicePort) (ports []string, portRanges []string, protocol v1.Protocol) {
+type ProtocolPorts struct {
+	ports      []int
+	portRanges []string
+}
+
+func getPortsAndProtocols(svcPorts []v1.ServicePort) map[v1.Protocol]ProtocolPorts {
 	if len(svcPorts) == 0 {
-		return []string{}, []string{}, v1.ProtocolUDP
+		return nil
 	}
 
-	// GCP doesn't support multiple protocols for a single load balancer
-	protocol = svcPorts[0].Protocol
-	portInts := []int{}
+	m := make(map[v1.Protocol]ProtocolPorts)
 	for _, p := range svcPorts {
-		ports = append(ports, strconv.Itoa(int(p.Port)))
-		portInts = append(portInts, int(p.Port))
+		ports := m[p.Protocol]
+		ports.ports = append(ports.ports, int(p.Port))
+		// ProtocolPorts is a struct value, so the updated copy must be stored back into the map.
+		m[p.Protocol] = ports
+	}
+
+	for protocol, ports := range m {
+		ports.portRanges = getPortRanges(ports.ports)
+		m[protocol] = ports
 	}
 
-	return ports, getPortRanges(portInts), protocol
+	return m
 }
 
 func getPortRanges(ports []int) (ranges []string) {
diff --git a/providers/gce/gce_loadbalancer_naming.go b/providers/gce/gce_loadbalancer_naming.go
index 01c8765e94..c3906e983f 100644
--- a/providers/gce/gce_loadbalancer_naming.go
+++ b/providers/gce/gce_loadbalancer_naming.go
@@ -23,10 +23,11 @@ import (
 	"crypto/sha1"
 	"encoding/hex"
 	"fmt"
+	"sort"
 	"strings"
 
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 )
 
@@ -42,7 +43,7 @@ func makeInstanceGroupName(clusterID string) string {
 	return fmt.Sprintf("%s--%s", prefix, clusterID)
 }
 
-func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme cloud.LbScheme, protocol v1.Protocol, svcAffinity v1.ServiceAffinity) string {
+func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme cloud.LbScheme, protocols map[v1.Protocol]ProtocolPorts, svcAffinity v1.ServiceAffinity) string {
 	if shared {
 		hash := sha1.New()
@@ -52,6 +53,19 @@ func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, sch
 		hashed := hex.EncodeToString(hash.Sum(nil))
 		hashed = hashed[:16]
 
+		// We pick TCP as the default, otherwise we pick the first protocol alphabetically.
+		chosenProtocol := ""
+		if _, found := protocols[v1.ProtocolTCP]; found {
+			chosenProtocol = "tcp"
+		} else if len(protocols) > 0 {
+			var keys []string
+			for protocol := range protocols {
+				keys = append(keys, strings.ToLower(string(protocol)))
+			}
+			sort.Strings(keys)
+			chosenProtocol = keys[0]
+		}
+
 		// k8s-           4
 		// {clusterid}-   17
 		// {scheme}-      9 (internal/external)
@@ -60,7 +74,7 @@ func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, sch
 		// {suffix}       16 (hash of settings)
 		// -----------------
 		// 55 characters used
-		return fmt.Sprintf("k8s-%s-%s-%s-nmv1-%s", clusterID, strings.ToLower(string(scheme)), strings.ToLower(string(protocol)), hashed)
+		return fmt.Sprintf("k8s-%s-%s-%s-nmv1-%s", clusterID, strings.ToLower(string(scheme)), chosenProtocol, hashed)
 	}
 	return loadBalancerName
 }
diff --git a/providers/gce/gce_loadbalancer_utils_test.go b/providers/gce/gce_loadbalancer_utils_test.go
index cf06b3b1d0..507a44bb4d 100644
--- a/providers/gce/gce_loadbalancer_utils_test.go
+++ b/providers/gce/gce_loadbalancer_utils_test.go
@@ -248,7 +248,8 @@ func assertInternalLbResources(t *testing.T, gce *Cloud, apiService *v1.Service,
 
 	// Check that BackendService exists
 	sharedBackend := shareBackendService(apiService)
-	backendServiceName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, "TCP", apiService.Spec.SessionAffinity)
+	groupedPorts := getPortsAndProtocols(apiService.Spec.Ports)
+	backendServiceName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, groupedPorts, apiService.Spec.SessionAffinity)
 	backendServiceLink := gce.getBackendServiceLink(backendServiceName)
 
 	bs, err := gce.GetRegionBackendService(backendServiceName, gce.region)
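The `getPortsAndProtocols` change above is the crux of the patch: every downstream decision (forwarding-rule fan-out, firewall `Allowed` entries, backend-service naming) keys off the per-protocol buckets it produces. A minimal standalone sketch of the same bucketing, using hypothetical simplified stand-ins for `v1.ServicePort`/`v1.Protocol` rather than the real Kubernetes types, illustrates both the grouping and the write-back subtlety when the map holds struct values:

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical stand-ins for v1.Protocol / v1.ServicePort, so the
// grouping logic can be exercised without the k8s.io dependencies.
type Protocol string

type ServicePort struct {
	Protocol Protocol
	Port     int
}

// groupByProtocol buckets ports per protocol, mirroring the intent of
// getPortsAndProtocols: one bucket per protocol, later one forwarding
// rule (and one firewall Allowed entry) per bucket.
func groupByProtocol(svcPorts []ServicePort) map[Protocol][]int {
	if len(svcPorts) == 0 {
		return nil
	}
	m := make(map[Protocol][]int)
	for _, p := range svcPorts {
		// append's result must be stored back into the map; in the real
		// patch the map holds ProtocolPorts struct values, where mutating
		// a copy without writing it back silently loses the ports.
		m[p.Protocol] = append(m[p.Protocol], p.Port)
	}
	for _, ports := range m {
		sort.Ints(ports) // sort in place for deterministic output
	}
	return m
}

func main() {
	ports := []ServicePort{
		{Protocol: "TCP", Port: 443},
		{Protocol: "UDP", Port: 53},
		{Protocol: "TCP", Port: 80},
	}
	grouped := groupByProtocol(ports)
	fmt.Println(len(grouped), grouped["TCP"], grouped["UDP"])
	// Two protocols → two forwarding rules, e.g. "<lb-name>-tcp" and "<lb-name>-udp".
}
```

With a mixed TCP/UDP Service this yields two buckets, which is exactly the case the single-protocol controller could not express with one forwarding rule.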