Skip to content

Commit bd8a8db

Browse files
authored
Merge pull request kubernetes#81477 from paulsubrata55/kube-proxy-sctp-ipset-fix
Fix in kube-proxy for sctp ipset entries
2 parents c4ccb62 + 138b8b8 commit bd8a8db

File tree

2 files changed

+277
-20
lines changed

2 files changed

+277
-20
lines changed

pkg/proxy/ipvs/proxier.go

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,42 +1204,57 @@ func (proxier *Proxier) syncProxyRules() {
12041204
// Nodeports need SNAT, unless they're local.
12051205
// ipset call
12061206

1207-
var nodePortSet *IPSet
1207+
var (
1208+
nodePortSet *IPSet
1209+
entries []*utilipset.Entry
1210+
)
1211+
12081212
switch protocol {
12091213
case "tcp":
12101214
nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
1211-
entry = &utilipset.Entry{
1215+
entries = []*utilipset.Entry{{
12121216
// No need to provide ip info
12131217
Port: svcInfo.NodePort(),
12141218
Protocol: protocol,
12151219
SetType: utilipset.BitmapPort,
1216-
}
1220+
}}
12171221
case "udp":
12181222
nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
1219-
entry = &utilipset.Entry{
1223+
entries = []*utilipset.Entry{{
12201224
// No need to provide ip info
12211225
Port: svcInfo.NodePort(),
12221226
Protocol: protocol,
12231227
SetType: utilipset.BitmapPort,
1224-
}
1228+
}}
12251229
case "sctp":
12261230
nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
1227-
entry = &utilipset.Entry{
1228-
IP: proxier.nodeIP.String(),
1229-
Port: svcInfo.NodePort(),
1230-
Protocol: protocol,
1231-
SetType: utilipset.HashIPPort,
1231+
// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
1232+
entries = []*utilipset.Entry{}
1233+
for _, nodeIP := range nodeIPs {
1234+
entries = append(entries, &utilipset.Entry{
1235+
IP: nodeIP.String(),
1236+
Port: svcInfo.NodePort(),
1237+
Protocol: protocol,
1238+
SetType: utilipset.HashIPPort,
1239+
})
12321240
}
12331241
default:
12341242
// It should never hit
12351243
klog.Errorf("Unsupported protocol type: %s", protocol)
12361244
}
12371245
if nodePortSet != nil {
1238-
if valid := nodePortSet.validateEntry(entry); !valid {
1239-
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
1246+
entryInvalidErr := false
1247+
for _, entry := range entries {
1248+
if valid := nodePortSet.validateEntry(entry); !valid {
1249+
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
1250+
entryInvalidErr = true
1251+
break
1252+
}
1253+
nodePortSet.activeEntries.Insert(entry.String())
1254+
}
1255+
if entryInvalidErr {
12401256
continue
12411257
}
1242-
nodePortSet.activeEntries.Insert(entry.String())
12431258
}
12441259

12451260
// Add externaltrafficpolicy=local type nodeport entry
@@ -1257,11 +1272,18 @@ func (proxier *Proxier) syncProxyRules() {
12571272
klog.Errorf("Unsupported protocol type: %s", protocol)
12581273
}
12591274
if nodePortLocalSet != nil {
1260-
if valid := nodePortLocalSet.validateEntry(entry); !valid {
1261-
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
1275+
entryInvalidErr := false
1276+
for _, entry := range entries {
1277+
if valid := nodePortLocalSet.validateEntry(entry); !valid {
1278+
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
1279+
entryInvalidErr = true
1280+
break
1281+
}
1282+
nodePortLocalSet.activeEntries.Insert(entry.String())
1283+
}
1284+
if entryInvalidErr {
12621285
continue
12631286
}
1264-
nodePortLocalSet.activeEntries.Insert(entry.String())
12651287
}
12661288
}
12671289

pkg/proxy/ipvs/proxier_test.go

Lines changed: 239 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"net"
2323
"reflect"
24+
"sort"
2425
"strings"
2526
"testing"
2627

@@ -695,6 +696,236 @@ func TestNodePort(t *testing.T) {
695696
},
696697
},
697698
},
699+
{
700+
name: "node port service with protocol sctp on a node with multiple nodeIPs",
701+
services: []*v1.Service{
702+
makeTestService("ns1", "svc1", func(svc *v1.Service) {
703+
svc.Spec.Type = "NodePort"
704+
svc.Spec.ClusterIP = "10.20.30.41"
705+
svc.Spec.Ports = []v1.ServicePort{{
706+
Name: "p80",
707+
Port: int32(80),
708+
Protocol: v1.ProtocolSCTP,
709+
NodePort: int32(3001),
710+
}}
711+
}),
712+
},
713+
endpoints: []*v1.Endpoints{
714+
makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
715+
ept.Subsets = []v1.EndpointSubset{{
716+
Addresses: []v1.EndpointAddress{{
717+
IP: "10.180.0.1",
718+
}},
719+
Ports: []v1.EndpointPort{{
720+
Name: "p80",
721+
Port: int32(80),
722+
}},
723+
}}
724+
}),
725+
},
726+
nodeIPs: []net.IP{
727+
net.ParseIP("100.101.102.103"),
728+
net.ParseIP("100.101.102.104"),
729+
net.ParseIP("100.101.102.105"),
730+
net.ParseIP("2001:db8::1:1"),
731+
net.ParseIP("2001:db8::1:2"),
732+
net.ParseIP("2001:db8::1:3"),
733+
},
734+
nodePortAddresses: []string{},
735+
expectedIPVS: &ipvstest.FakeIPVS{
736+
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
737+
{
738+
IP: "10.20.30.41",
739+
Port: 80,
740+
Protocol: "SCTP",
741+
}: {
742+
Address: net.ParseIP("10.20.30.41"),
743+
Protocol: "SCTP",
744+
Port: uint16(80),
745+
Scheduler: "rr",
746+
},
747+
{
748+
IP: "100.101.102.103",
749+
Port: 3001,
750+
Protocol: "SCTP",
751+
}: {
752+
Address: net.ParseIP("100.101.102.103"),
753+
Protocol: "SCTP",
754+
Port: uint16(3001),
755+
Scheduler: "rr",
756+
},
757+
{
758+
IP: "100.101.102.104",
759+
Port: 3001,
760+
Protocol: "SCTP",
761+
}: {
762+
Address: net.ParseIP("100.101.102.104"),
763+
Protocol: "SCTP",
764+
Port: uint16(3001),
765+
Scheduler: "rr",
766+
},
767+
{
768+
IP: "100.101.102.105",
769+
Port: 3001,
770+
Protocol: "SCTP",
771+
}: {
772+
Address: net.ParseIP("100.101.102.105"),
773+
Protocol: "SCTP",
774+
Port: uint16(3001),
775+
Scheduler: "rr",
776+
},
777+
{
778+
IP: "2001:db8::1:1",
779+
Port: 3001,
780+
Protocol: "SCTP",
781+
}: {
782+
Address: net.ParseIP("2001:db8::1:1"),
783+
Protocol: "SCTP",
784+
Port: uint16(3001),
785+
Scheduler: "rr",
786+
},
787+
{
788+
IP: "2001:db8::1:2",
789+
Port: 3001,
790+
Protocol: "SCTP",
791+
}: {
792+
Address: net.ParseIP("2001:db8::1:2"),
793+
Protocol: "SCTP",
794+
Port: uint16(3001),
795+
Scheduler: "rr",
796+
},
797+
{
798+
IP: "2001:db8::1:3",
799+
Port: 3001,
800+
Protocol: "SCTP",
801+
}: {
802+
Address: net.ParseIP("2001:db8::1:3"),
803+
Protocol: "SCTP",
804+
Port: uint16(3001),
805+
Scheduler: "rr",
806+
},
807+
},
808+
Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
809+
{
810+
IP: "10.20.30.41",
811+
Port: 80,
812+
Protocol: "SCTP",
813+
}: {
814+
{
815+
Address: net.ParseIP("10.180.0.1"),
816+
Port: uint16(80),
817+
Weight: 1,
818+
},
819+
},
820+
{
821+
IP: "100.101.102.103",
822+
Port: 3001,
823+
Protocol: "SCTP",
824+
}: {
825+
{
826+
Address: net.ParseIP("10.180.0.1"),
827+
Port: uint16(80),
828+
Weight: 1,
829+
},
830+
},
831+
{
832+
IP: "100.101.102.104",
833+
Port: 3001,
834+
Protocol: "SCTP",
835+
}: {
836+
{
837+
Address: net.ParseIP("10.180.0.1"),
838+
Port: uint16(80),
839+
Weight: 1,
840+
},
841+
},
842+
{
843+
IP: "100.101.102.105",
844+
Port: 3001,
845+
Protocol: "SCTP",
846+
}: {
847+
{
848+
Address: net.ParseIP("10.180.0.1"),
849+
Port: uint16(80),
850+
Weight: 1,
851+
},
852+
},
853+
{
854+
IP: "2001:db8::1:1",
855+
Port: 3001,
856+
Protocol: "SCTP",
857+
}: {
858+
{
859+
Address: net.ParseIP("10.180.0.1"),
860+
Port: uint16(80),
861+
Weight: 1,
862+
},
863+
},
864+
{
865+
IP: "2001:db8::1:2",
866+
Port: 3001,
867+
Protocol: "SCTP",
868+
}: {
869+
{
870+
Address: net.ParseIP("10.180.0.1"),
871+
Port: uint16(80),
872+
Weight: 1,
873+
},
874+
},
875+
{
876+
IP: "2001:db8::1:3",
877+
Port: 3001,
878+
Protocol: "SCTP",
879+
}: {
880+
{
881+
Address: net.ParseIP("10.180.0.1"),
882+
Port: uint16(80),
883+
Weight: 1,
884+
},
885+
},
886+
},
887+
},
888+
expectedIPSets: netlinktest.ExpectedIPSet{
889+
kubeNodePortSetSCTP: {
890+
{
891+
IP: "100.101.102.103",
892+
Port: 3001,
893+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
894+
SetType: utilipset.HashIPPort,
895+
},
896+
{
897+
IP: "100.101.102.104",
898+
Port: 3001,
899+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
900+
SetType: utilipset.HashIPPort,
901+
},
902+
{
903+
IP: "100.101.102.105",
904+
Port: 3001,
905+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
906+
SetType: utilipset.HashIPPort,
907+
},
908+
{
909+
IP: "2001:db8::1:1",
910+
Port: 3001,
911+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
912+
SetType: utilipset.HashIPPort,
913+
},
914+
{
915+
IP: "2001:db8::1:2",
916+
Port: 3001,
917+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
918+
SetType: utilipset.HashIPPort,
919+
},
920+
{
921+
IP: "2001:db8::1:3",
922+
Port: 3001,
923+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
924+
SetType: utilipset.HashIPPort,
925+
},
926+
},
927+
},
928+
},
698929
}
699930

700931
for _, test := range tests {
@@ -2897,10 +3128,14 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
28973128
t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents))
28983129
continue
28993130
}
2900-
if len(entries) == 1 {
2901-
if ents[0] != entries[0].String() {
2902-
t.Errorf("Check ipset entries failed for ipset: %q", set)
2903-
}
3131+
expectedEntries := []string{}
3132+
for _, entry := range entries {
3133+
expectedEntries = append(expectedEntries, entry.String())
3134+
}
3135+
sort.Strings(ents)
3136+
sort.Strings(expectedEntries)
3137+
if !reflect.DeepEqual(ents, expectedEntries) {
3138+
t.Errorf("Check ipset entries failed for ipset: %q", set)
29043139
}
29053140
}
29063141
}

0 commit comments

Comments
 (0)