Skip to content

Commit 4d49b9a

Browse files
committed
Update protocol ports
1 parent 3224275 commit 4d49b9a

File tree

4 files changed

+83
-82
lines changed

4 files changed

+83
-82
lines changed

providers/gce/gce_loadbalancer_external.go

Lines changed: 27 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ func (g *Cloud) getExistingForwardingRules(loadBalancerName string) (map[string]
377377
// buildDesiredForwardingRules builds the desired forwarding rules for the given load balancer.
378378
func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, existingFRs map[string]*compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) {
379379
desiredFRs := make(map[string]*compute.ForwardingRule)
380-
groupedPorts := groupPortsByProtocol(apiService.Spec.Ports)
380+
groupedPorts := getPortsAndProtocols(apiService.Spec.Ports)
381381
desc := makeServiceDescription(serviceName)
382382

383383
// Find the legacy forwarding rule to minimize changes.
@@ -398,7 +398,7 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd
398398
frName = loadBalancerName
399399
}
400400

401-
portRange, err := loadBalancerPortRange(protocolPorts)
401+
portRange, err := loadBalancerPortRange(protocolPorts.ports)
402402
if err != nil {
403403
return nil, err
404404
}
@@ -413,9 +413,9 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd
413413
NetworkTier: netTier.ToGCEValue(),
414414
}
415415

416-
if len(protocolPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding {
417-
for _, p := range protocolPorts {
418-
rule.Ports = append(rule.Ports, strconv.Itoa(int(p.Port)))
416+
if len(protocolPorts.ports) <= maxForwardedPorts && g.enableDiscretePortForwarding {
417+
for _, p := range protocolPorts.ports {
418+
rule.Ports = append(rule.Ports, strconv.Itoa(p))
419419
}
420420
rule.PortRange = ""
421421
}
@@ -1003,46 +1003,22 @@ func hostURLToComparablePath(hostURL string) string {
10031003
return hostURL[idx:]
10041004
}
10051005

1006-
// func getPorts(svcPorts []v1.ServicePort) []string {
1007-
// ports := []string{}
1008-
// for _, p := range svcPorts {
1009-
// ports = append(ports, strconv.Itoa(int(p.Port)))
1010-
// }
1011-
1012-
// return ports
1013-
// }
1014-
1015-
func minMaxPort[T v1.ServicePort | string](svcPorts []T) (int32, int32) {
1016-
minPort := int32(65536)
1017-
maxPort := int32(0)
1018-
for _, svcPort := range svcPorts {
1019-
port := func(value any) int32 {
1020-
switch value.(type) {
1021-
case v1.ServicePort:
1022-
return value.(v1.ServicePort).Port
1023-
case string:
1024-
i, _ := strconv.ParseInt(value.(string), 10, 32)
1025-
return int32(i)
1026-
default:
1027-
return 0
1028-
}
1029-
}(svcPort)
1006+
func loadBalancerPortRange(ports []int) (string, error) {
1007+
if len(ports) == 0 {
1008+
return "", fmt.Errorf("no ports specified for GCE load balancer")
1009+
}
1010+
1011+
minPort := 65536
1012+
maxPort := 0
1013+
1014+
for _, port := range ports {
10301015
if port < minPort {
10311016
minPort = port
10321017
}
10331018
if port > maxPort {
10341019
maxPort = port
10351020
}
10361021
}
1037-
return minPort, maxPort
1038-
}
1039-
1040-
func loadBalancerPortRange[T v1.ServicePort | string](svcPorts []T) (string, error) {
1041-
if len(svcPorts) == 0 {
1042-
return "", fmt.Errorf("no ports specified for GCE load balancer")
1043-
}
1044-
1045-
minPort, maxPort := minMaxPort(svcPorts)
10461022
return fmt.Sprintf("%d-%d", minPort, maxPort), nil
10471023
}
10481024

@@ -1058,19 +1034,19 @@ func equalPorts(existingPorts, newPorts []string, existingPortRange, newPortRang
10581034
// Existing forwarding rule contains a port range. To keep it that way,
10591035
// compare new list of ports as if it was a port range, too.
10601036
if len(newPorts) != 0 {
1061-
newPortRange, _ = loadBalancerPortRange(newPorts)
1037+
var portInts []int
1038+
for _, port := range newPorts {
1039+
portInt, err := strconv.Atoi(port)
1040+
if err != nil {
1041+
klog.Errorf("invalid port %s: %v", port, err)
1042+
}
1043+
portInts = append(portInts, portInt)
1044+
}
1045+
newPortRange, _ = loadBalancerPortRange(portInts)
10621046
}
10631047
return existingPortRange == newPortRange
10641048
}
10651049

1066-
func groupPortsByProtocol(ports []v1.ServicePort) map[v1.Protocol][]v1.ServicePort {
1067-
grouped := make(map[v1.Protocol][]v1.ServicePort)
1068-
for _, port := range ports {
1069-
grouped[port.Protocol] = append(grouped[port.Protocol], port)
1070-
}
1071-
return grouped
1072-
}
1073-
10741050
// translate from what K8s supports to what the cloud provider supports for session affinity.
10751051
func translateAffinityType(affinityType v1.ServiceAffinity) string {
10761052
switch affinityType {
@@ -1096,11 +1072,10 @@ func (g *Cloud) firewallNeedsUpdate(name, serviceName, ipAddress string, ports [
10961072
return true, true, nil
10971073
}
10981074

1099-
groupedPorts := groupPortsByProtocol(ports)
1075+
groupedPorts := getPortsAndProtocols(ports)
11001076
expectedAllowed := make(map[string][]string)
11011077
for protocol, protocolPorts := range groupedPorts {
1102-
_, portRanges, _ := getPortsAndProtocol(protocolPorts)
1103-
expectedAllowed[strings.ToLower(string(protocol))] = portRanges
1078+
expectedAllowed[strings.ToLower(string(protocol))] = protocolPorts.portRanges
11041079
}
11051080

11061081
actualAllowed := make(map[string][]string)
@@ -1214,13 +1189,12 @@ func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges ut
12141189
// GCE considers empty destinationRanges as "all" for ingress firewall-rules.
12151190
// Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only
12161191
// 100 ports or port ranges can be used in a firewall rule.
1217-
groupedPorts := groupPortsByProtocol(ports)
1192+
groupedPorts := getPortsAndProtocols(ports)
12181193
var allowed []*compute.FirewallAllowed
12191194
for protocol, protocolPorts := range groupedPorts {
1220-
_, portRanges, _ := getPortsAndProtocol(protocolPorts)
12211195
allowed = append(allowed, &compute.FirewallAllowed{
12221196
IPProtocol: strings.ToLower(string(protocol)),
1223-
Ports: portRanges,
1197+
Ports: protocolPorts.portRanges,
12241198
})
12251199
}
12261200

providers/gce/gce_loadbalancer_internal.go

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,13 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
9494
return nil, err
9595
}
9696

97-
ports, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
98-
if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP {
99-
return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol))
97+
groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
98+
for protocol := range groupedPorts {
99+
if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP {
100+
return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol))
101+
}
100102
}
103+
101104
scheme := cloud.SchemeInternal
102105
options := getILBOptions(svc)
103106
if g.IsLegacyNetwork() {
@@ -106,7 +109,7 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
106109
}
107110

108111
sharedBackend := shareBackendService(svc)
109-
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity)
112+
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity)
110113
backendServiceLink := g.getBackendServiceLink(backendServiceName)
111114

112115
// Ensure instance groups exist and nodes are assigned to groups
@@ -359,10 +362,10 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v
359362
}
360363

361364
// Generate the backend service name
362-
_, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
365+
groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
363366
scheme := cloud.SchemeInternal
364367
loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc)
365-
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shareBackendService(svc), scheme, protocol, svc.Spec.SessionAffinity)
368+
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, shareBackendService(svc), scheme, groupedPorts, svc.Spec.SessionAffinity)
366369
// Ensure the backend service has the proper backend/instance-group links
367370
return g.ensureInternalBackendServiceGroups(backendServiceName, igLinks)
368371
}
@@ -383,7 +386,7 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
383386

384387
loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc)
385388
svcNamespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
386-
_, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
389+
groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
387390
scheme := cloud.SchemeInternal
388391
sharedBackend := shareBackendService(svc)
389392
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
@@ -399,7 +402,7 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
399402
return err
400403
}
401404

402-
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity)
405+
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity)
403406
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName)
404407
if err := g.teardownInternalBackendService(backendServiceName); err != nil {
405408
return err
@@ -495,7 +498,7 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s
495498
return nil
496499
}
497500

498-
func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error {
501+
func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, groupedPorts map[v1.Protocol]ProtocolPorts, nodes []*v1.Node, legacyFwName string) error {
499502
klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
500503
targetTags, err := g.GetNodeTags(nodeNames(nodes))
501504
if err != nil {
@@ -536,12 +539,13 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinat
536539
Network: g.networkURL,
537540
SourceRanges: sourceRanges,
538541
TargetTags: targetTags,
539-
Allowed: []*compute.FirewallAllowed{
540-
{
541-
IPProtocol: strings.ToLower(string(protocol)),
542-
Ports: portRanges,
543-
},
544-
},
542+
}
543+
544+
for protocol, protocolPorts := range groupedPorts {
545+
expectedFirewall.Allowed = append(expectedFirewall.Allowed, &compute.FirewallAllowed{
546+
IPProtocol: strings.ToLower(string(protocol)),
547+
Ports: protocolPorts.portRanges,
548+
})
545549
}
546550

547551
if destinationIP != "" {
@@ -576,12 +580,12 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinat
576580
func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, sharedHealthCheck bool, nodes []*v1.Node) error {
577581
// First firewall is for ingress traffic
578582
fwDesc := makeFirewallDescription(nm.String(), ipAddress)
579-
_, portRanges, protocol := getPortsAndProtocol(svc.Spec.Ports)
583+
groupedPorts := getPortsAndProtocols(svc.Spec.Ports)
580584
sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(svc)
581585
if err != nil {
582586
return err
583587
}
584-
err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName)
588+
err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), groupedPorts, nodes, loadBalancerName)
585589
if err != nil {
586590
return err
587591
}
@@ -957,20 +961,28 @@ func backendSvcEqual(a, b *compute.BackendService) bool {
957961
backendsListEqual(a.Backends, b.Backends)
958962
}
959963

960-
func getPortsAndProtocol(svcPorts []v1.ServicePort) (ports []string, portRanges []string, protocol v1.Protocol) {
964+
type ProtocolPorts struct {
965+
ports []int
966+
portRanges []string
967+
}
968+
969+
func getPortsAndProtocols(svcPorts []v1.ServicePort) map[v1.Protocol]ProtocolPorts {
961970
if len(svcPorts) == 0 {
962-
return []string{}, []string{}, v1.ProtocolUDP
971+
return nil
963972
}
964973

965-
// GCP doesn't support multiple protocols for a single load balancer
966-
protocol = svcPorts[0].Protocol
967-
portInts := []int{}
974+
m := make(map[v1.Protocol]ProtocolPorts)
968975
for _, p := range svcPorts {
969-
ports = append(ports, strconv.Itoa(int(p.Port)))
970-
portInts = append(portInts, int(p.Port))
976+
ports := m[p.Protocol]
977+
ports.ports = append(ports.ports, int(p.Port))
978+
}
979+
980+
for protocol, ports := range m {
981+
ports.portRanges = getPortRanges(ports.ports)
982+
m[protocol] = ports
971983
}
972984

973-
return ports, getPortRanges(portInts), protocol
985+
return m
974986
}
975987

976988
func getPortRanges(ports []int) (ranges []string) {

providers/gce/gce_loadbalancer_naming.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ import (
2323
"crypto/sha1"
2424
"encoding/hex"
2525
"fmt"
26+
"sort"
2627
"strings"
2728

2829
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
29-
"k8s.io/api/core/v1"
30+
v1 "k8s.io/api/core/v1"
3031
"k8s.io/apimachinery/pkg/types"
3132
)
3233

@@ -42,7 +43,7 @@ func makeInstanceGroupName(clusterID string) string {
4243
return fmt.Sprintf("%s--%s", prefix, clusterID)
4344
}
4445

45-
func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme cloud.LbScheme, protocol v1.Protocol, svcAffinity v1.ServiceAffinity) string {
46+
func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, scheme cloud.LbScheme, protocols map[v1.Protocol]ProtocolPorts, svcAffinity v1.ServiceAffinity) string {
4647
if shared {
4748
hash := sha1.New()
4849

@@ -52,6 +53,19 @@ func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, sch
5253
hashed := hex.EncodeToString(hash.Sum(nil))
5354
hashed = hashed[:16]
5455

56+
// We pick TCP as the default, otherwise we pick the first protocol alphabetically.
57+
chosenProtocol := ""
58+
if _, found := protocols[v1.ProtocolTCP]; found {
59+
chosenProtocol = "tcp"
60+
} else {
61+
var keys []string
62+
for protocol := range protocols {
63+
keys = append(keys, strings.ToLower(string(protocol)))
64+
}
65+
sort.Strings(keys)
66+
chosenProtocol = keys[0]
67+
}
68+
5569
// k8s- 4
5670
// {clusterid}- 17
5771
// {scheme}- 9 (internal/external)
@@ -60,7 +74,7 @@ func makeBackendServiceName(loadBalancerName, clusterID string, shared bool, sch
6074
// {suffix} 16 (hash of settings)
6175
// -----------------
6276
// 55 characters used
63-
return fmt.Sprintf("k8s-%s-%s-%s-nmv1-%s", clusterID, strings.ToLower(string(scheme)), strings.ToLower(string(protocol)), hashed)
77+
return fmt.Sprintf("k8s-%s-%s-%s-nmv1-%s", clusterID, strings.ToLower(string(scheme)), chosenProtocol, hashed)
6478
}
6579
return loadBalancerName
6680
}

providers/gce/gce_loadbalancer_utils_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ func assertInternalLbResources(t *testing.T, gce *Cloud, apiService *v1.Service,
248248

249249
// Check that BackendService exists
250250
sharedBackend := shareBackendService(apiService)
251-
backendServiceName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, "TCP", apiService.Spec.SessionAffinity)
251+
groupedPorts := getPortsAndProtocols(apiService.Spec.Ports)
252+
backendServiceName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, groupedPorts, apiService.Spec.SessionAffinity)
252253
backendServiceLink := gce.getBackendServiceLink(backendServiceName)
253254

254255
bs, err := gce.GetRegionBackendService(backendServiceName, gce.region)

0 commit comments

Comments
 (0)