Skip to content

Commit c5b3dba

Browse files
authored
Merge pull request kubernetes#92109 from kishorj/nlb_udp_support
Allow UDP for AWS NLB
2 parents 3c31a00 + f76c21c commit c5b3dba

File tree

3 files changed

+95
-22
lines changed

3 files changed

+95
-22
lines changed

staging/src/k8s.io/legacy-cloud-providers/aws/aws.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3691,9 +3691,10 @@ func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiS
36913691

36923692
sslPorts := getPortSets(annotations[ServiceAnnotationLoadBalancerSSLPorts])
36933693
for _, port := range apiService.Spec.Ports {
3694-
if port.Protocol != v1.ProtocolTCP {
3695-
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB")
3694+
if err := checkProtocol(port, annotations); err != nil {
3695+
return nil, err
36963696
}
3697+
36973698
if port.NodePort == 0 {
36983699
klog.Errorf("Ignoring port without NodePort defined: %v", port)
36993700
continue
@@ -3713,7 +3714,7 @@ func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiS
37133714
}
37143715

37153716
certificateARN := annotations[ServiceAnnotationLoadBalancerCertificate]
3716-
if certificateARN != "" && (sslPorts == nil || sslPorts.numbers.Has(int64(port.Port)) || sslPorts.names.Has(port.Name)) {
3717+
if port.Protocol != v1.ProtocolUDP && certificateARN != "" && (sslPorts == nil || sslPorts.numbers.Has(int64(port.Port)) || sslPorts.names.Has(port.Name)) {
37173718
portMapping.FrontendProtocol = elbv2.ProtocolEnumTls
37183719
portMapping.SSLCertificateARN = certificateARN
37193720
portMapping.SSLPolicy = annotations[ServiceAnnotationLoadBalancerSSLNegotiationPolicy]
@@ -3724,12 +3725,13 @@ func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiS
37243725
}
37253726

37263727
v2Mappings = append(v2Mappings, portMapping)
3728+
} else {
3729+
listener, err := buildListener(port, annotations, sslPorts)
3730+
if err != nil {
3731+
return nil, err
3732+
}
3733+
listeners = append(listeners, listener)
37273734
}
3728-
listener, err := buildListener(port, annotations, sslPorts)
3729-
if err != nil {
3730-
return nil, err
3731-
}
3732-
listeners = append(listeners, listener)
37333735
}
37343736

37353737
if apiService.Spec.LoadBalancerIP != "" {
@@ -4762,6 +4764,18 @@ func (c *Cloud) nodeNameToProviderID(nodeName types.NodeName) (InstanceID, error
47624764
return KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
47634765
}
47644766

4767+
func checkProtocol(port v1.ServicePort, annotations map[string]string) error {
4768+
// nlb supports tcp, udp
4769+
if isNLB(annotations) && (port.Protocol == v1.ProtocolTCP || port.Protocol == v1.ProtocolUDP) {
4770+
return nil
4771+
}
4772+
// elb only supports tcp
4773+
if !isNLB(annotations) && port.Protocol == v1.ProtocolTCP {
4774+
return nil
4775+
}
4776+
return fmt.Errorf("Protocol %s not supported by LoadBalancer", port.Protocol)
4777+
}
4778+
47654779
func setNodeDisk(
47664780
nodeDiskMap map[types.NodeName]map[KubernetesVolumeID]bool,
47674781
volumeID KubernetesVolumeID,

staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,12 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
185185
}
186186

187187
// actual maps FrontendPort to an elbv2.Listener
188-
actual := map[int64]*elbv2.Listener{}
188+
actual := map[int64]map[string]*elbv2.Listener{}
189189
for _, listener := range listenerDescriptions.Listeners {
190-
actual[*listener.Port] = listener
190+
if actual[*listener.Port] == nil {
191+
actual[*listener.Port] = map[string]*elbv2.Listener{}
192+
}
193+
actual[*listener.Port][*listener.Protocol] = listener
191194
}
192195

193196
actualTargetGroups, err := c.elbv2.DescribeTargetGroups(
@@ -207,10 +210,11 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
207210
// Handle additions/modifications
208211
for _, mapping := range mappings {
209212
frontendPort := mapping.FrontendPort
213+
frontendProtocol := mapping.FrontendProtocol
210214
nodePort := mapping.TrafficPort
211215

212216
// modifications
213-
if listener, ok := actual[frontendPort]; ok {
217+
if listener, ok := actual[frontendPort][frontendProtocol]; ok {
214218
listenerNeedsModification := false
215219

216220
if aws.StringValue(listener.Protocol) != mapping.FrontendProtocol {
@@ -315,23 +319,27 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
315319
dirty = true
316320
}
317321

318-
frontEndPorts := map[int64]bool{}
322+
frontEndPorts := map[int64]map[string]bool{}
319323
for i := range mappings {
320-
frontEndPorts[mappings[i].FrontendPort] = true
324+
if frontEndPorts[mappings[i].FrontendPort] == nil {
325+
frontEndPorts[mappings[i].FrontendPort] = map[string]bool{}
326+
}
327+
frontEndPorts[mappings[i].FrontendPort][mappings[i].FrontendProtocol] = true
321328
}
322329

323330
// handle deletions
324-
for port, listener := range actual {
325-
if _, ok := frontEndPorts[port]; !ok {
326-
err := c.deleteListenerV2(listener)
327-
if err != nil {
328-
return nil, err
331+
for port := range actual {
332+
for protocol := range actual[port] {
333+
if _, ok := frontEndPorts[port][protocol]; !ok {
334+
err := c.deleteListenerV2(actual[port][protocol])
335+
if err != nil {
336+
return nil, err
337+
}
338+
dirty = true
329339
}
330-
dirty = true
331340
}
332341
}
333342
}
334-
335343
if err := c.reconcileLBAttributes(aws.StringValue(loadBalancer.LoadBalancerArn), annotations); err != nil {
336344
return nil, err
337345
}
@@ -768,10 +776,14 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[
768776

769777
{
770778
clientPorts := sets.Int64{}
779+
clientProtocol := "tcp"
771780
healthCheckPorts := sets.Int64{}
772781
for _, port := range portMappings {
773782
clientPorts.Insert(port.TrafficPort)
774783
healthCheckPorts.Insert(port.HealthCheckPort)
784+
if port.TrafficProtocol == string(v1.ProtocolUDP) {
785+
clientProtocol = "udp"
786+
}
775787
}
776788
clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
777789
healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
@@ -785,14 +797,14 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[
785797
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", healthCheckPorts, vpcCIDRs); err != nil {
786798
return err
787799
}
788-
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, "tcp", clientPorts, clientCIDRs); err != nil {
800+
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, clientPorts, clientCIDRs); err != nil {
789801
return err
790802
}
791803
} else {
792804
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", nil, nil); err != nil {
793805
return err
794806
}
795-
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, "tcp", nil, nil); err != nil {
807+
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, nil, nil); err != nil {
796808
return err
797809
}
798810
}

staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,6 +1371,53 @@ func TestDescribeLoadBalancerOnEnsure(t *testing.T) {
13711371
c.EnsureLoadBalancer(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{})
13721372
}
13731373

1374+
func TestCheckProtocol(t *testing.T) {
1375+
tests := []struct {
1376+
name string
1377+
annotations map[string]string
1378+
port v1.ServicePort
1379+
wantErr error
1380+
}{
1381+
{
1382+
name: "TCP with ELB",
1383+
annotations: make(map[string]string),
1384+
port: v1.ServicePort{Protocol: v1.ProtocolTCP, Port: int32(8080)},
1385+
wantErr: nil,
1386+
},
1387+
{
1388+
name: "TCP with NLB",
1389+
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
1390+
port: v1.ServicePort{Protocol: v1.ProtocolTCP, Port: int32(8080)},
1391+
wantErr: nil,
1392+
},
1393+
{
1394+
name: "UDP with ELB",
1395+
annotations: make(map[string]string),
1396+
port: v1.ServicePort{Protocol: v1.ProtocolUDP, Port: int32(8080)},
1397+
wantErr: fmt.Errorf("Protocol UDP not supported by load balancer"),
1398+
},
1399+
{
1400+
name: "UDP with NLB",
1401+
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
1402+
port: v1.ServicePort{Protocol: v1.ProtocolUDP, Port: int32(8080)},
1403+
wantErr: nil,
1404+
},
1405+
}
1406+
for _, test := range tests {
1407+
tt := test
1408+
t.Run(tt.name, func(t *testing.T) {
1409+
t.Parallel()
1410+
err := checkProtocol(tt.port, tt.annotations)
1411+
if tt.wantErr != nil && err == nil {
1412+
t.Errorf("Expected error: want=%s got =%s", tt.wantErr, err)
1413+
}
1414+
if tt.wantErr == nil && err != nil {
1415+
t.Errorf("Unexpected error: want=%s got =%s", tt.wantErr, err)
1416+
}
1417+
})
1418+
}
1419+
}
1420+
13741421
func TestBuildListener(t *testing.T) {
13751422
tests := []struct {
13761423
name string

0 commit comments

Comments
 (0)