Skip to content

Commit 138b8b8

Browse files
paulsubrata55rsys-spaul
authored andcommitted
Fix in kube-proxy for sctp ipset entries
Kube-proxy will add ipset entries for all node ips for an SCTP nodeport service. This will solve the problem 'SCTP nodeport service is not working for all IPs present in the node when ipvs is enabled. It is working only for node's InternalIP.'
1 parent 0d579bf commit 138b8b8

File tree

2 files changed

+277
-20
lines changed

2 files changed

+277
-20
lines changed

pkg/proxy/ipvs/proxier.go

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,42 +1202,57 @@ func (proxier *Proxier) syncProxyRules() {
12021202
// Nodeports need SNAT, unless they're local.
12031203
// ipset call
12041204

1205-
var nodePortSet *IPSet
1205+
var (
1206+
nodePortSet *IPSet
1207+
entries []*utilipset.Entry
1208+
)
1209+
12061210
switch protocol {
12071211
case "tcp":
12081212
nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
1209-
entry = &utilipset.Entry{
1213+
entries = []*utilipset.Entry{{
12101214
// No need to provide ip info
12111215
Port: svcInfo.NodePort(),
12121216
Protocol: protocol,
12131217
SetType: utilipset.BitmapPort,
1214-
}
1218+
}}
12151219
case "udp":
12161220
nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
1217-
entry = &utilipset.Entry{
1221+
entries = []*utilipset.Entry{{
12181222
// No need to provide ip info
12191223
Port: svcInfo.NodePort(),
12201224
Protocol: protocol,
12211225
SetType: utilipset.BitmapPort,
1222-
}
1226+
}}
12231227
case "sctp":
12241228
nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
1225-
entry = &utilipset.Entry{
1226-
IP: proxier.nodeIP.String(),
1227-
Port: svcInfo.NodePort(),
1228-
Protocol: protocol,
1229-
SetType: utilipset.HashIPPort,
1229+
// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
1230+
entries = []*utilipset.Entry{}
1231+
for _, nodeIP := range nodeIPs {
1232+
entries = append(entries, &utilipset.Entry{
1233+
IP: nodeIP.String(),
1234+
Port: svcInfo.NodePort(),
1235+
Protocol: protocol,
1236+
SetType: utilipset.HashIPPort,
1237+
})
12301238
}
12311239
default:
12321240
// It should never hit
12331241
klog.Errorf("Unsupported protocol type: %s", protocol)
12341242
}
12351243
if nodePortSet != nil {
1236-
if valid := nodePortSet.validateEntry(entry); !valid {
1237-
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
1244+
entryInvalidErr := false
1245+
for _, entry := range entries {
1246+
if valid := nodePortSet.validateEntry(entry); !valid {
1247+
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
1248+
entryInvalidErr = true
1249+
break
1250+
}
1251+
nodePortSet.activeEntries.Insert(entry.String())
1252+
}
1253+
if entryInvalidErr {
12381254
continue
12391255
}
1240-
nodePortSet.activeEntries.Insert(entry.String())
12411256
}
12421257

12431258
// Add externaltrafficpolicy=local type nodeport entry
@@ -1255,11 +1270,18 @@ func (proxier *Proxier) syncProxyRules() {
12551270
klog.Errorf("Unsupported protocol type: %s", protocol)
12561271
}
12571272
if nodePortLocalSet != nil {
1258-
if valid := nodePortLocalSet.validateEntry(entry); !valid {
1259-
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
1273+
entryInvalidErr := false
1274+
for _, entry := range entries {
1275+
if valid := nodePortLocalSet.validateEntry(entry); !valid {
1276+
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
1277+
entryInvalidErr = true
1278+
break
1279+
}
1280+
nodePortLocalSet.activeEntries.Insert(entry.String())
1281+
}
1282+
if entryInvalidErr {
12601283
continue
12611284
}
1262-
nodePortLocalSet.activeEntries.Insert(entry.String())
12631285
}
12641286
}
12651287

pkg/proxy/ipvs/proxier_test.go

Lines changed: 239 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"net"
2323
"reflect"
24+
"sort"
2425
"strings"
2526
"testing"
2627

@@ -695,6 +696,236 @@ func TestNodePort(t *testing.T) {
695696
},
696697
},
697698
},
699+
{
700+
name: "node port service with protocol sctp on a node with multiple nodeIPs",
701+
services: []*v1.Service{
702+
makeTestService("ns1", "svc1", func(svc *v1.Service) {
703+
svc.Spec.Type = "NodePort"
704+
svc.Spec.ClusterIP = "10.20.30.41"
705+
svc.Spec.Ports = []v1.ServicePort{{
706+
Name: "p80",
707+
Port: int32(80),
708+
Protocol: v1.ProtocolSCTP,
709+
NodePort: int32(3001),
710+
}}
711+
}),
712+
},
713+
endpoints: []*v1.Endpoints{
714+
makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
715+
ept.Subsets = []v1.EndpointSubset{{
716+
Addresses: []v1.EndpointAddress{{
717+
IP: "10.180.0.1",
718+
}},
719+
Ports: []v1.EndpointPort{{
720+
Name: "p80",
721+
Port: int32(80),
722+
}},
723+
}}
724+
}),
725+
},
726+
nodeIPs: []net.IP{
727+
net.ParseIP("100.101.102.103"),
728+
net.ParseIP("100.101.102.104"),
729+
net.ParseIP("100.101.102.105"),
730+
net.ParseIP("2001:db8::1:1"),
731+
net.ParseIP("2001:db8::1:2"),
732+
net.ParseIP("2001:db8::1:3"),
733+
},
734+
nodePortAddresses: []string{},
735+
expectedIPVS: &ipvstest.FakeIPVS{
736+
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
737+
{
738+
IP: "10.20.30.41",
739+
Port: 80,
740+
Protocol: "SCTP",
741+
}: {
742+
Address: net.ParseIP("10.20.30.41"),
743+
Protocol: "SCTP",
744+
Port: uint16(80),
745+
Scheduler: "rr",
746+
},
747+
{
748+
IP: "100.101.102.103",
749+
Port: 3001,
750+
Protocol: "SCTP",
751+
}: {
752+
Address: net.ParseIP("100.101.102.103"),
753+
Protocol: "SCTP",
754+
Port: uint16(3001),
755+
Scheduler: "rr",
756+
},
757+
{
758+
IP: "100.101.102.104",
759+
Port: 3001,
760+
Protocol: "SCTP",
761+
}: {
762+
Address: net.ParseIP("100.101.102.104"),
763+
Protocol: "SCTP",
764+
Port: uint16(3001),
765+
Scheduler: "rr",
766+
},
767+
{
768+
IP: "100.101.102.105",
769+
Port: 3001,
770+
Protocol: "SCTP",
771+
}: {
772+
Address: net.ParseIP("100.101.102.105"),
773+
Protocol: "SCTP",
774+
Port: uint16(3001),
775+
Scheduler: "rr",
776+
},
777+
{
778+
IP: "2001:db8::1:1",
779+
Port: 3001,
780+
Protocol: "SCTP",
781+
}: {
782+
Address: net.ParseIP("2001:db8::1:1"),
783+
Protocol: "SCTP",
784+
Port: uint16(3001),
785+
Scheduler: "rr",
786+
},
787+
{
788+
IP: "2001:db8::1:2",
789+
Port: 3001,
790+
Protocol: "SCTP",
791+
}: {
792+
Address: net.ParseIP("2001:db8::1:2"),
793+
Protocol: "SCTP",
794+
Port: uint16(3001),
795+
Scheduler: "rr",
796+
},
797+
{
798+
IP: "2001:db8::1:3",
799+
Port: 3001,
800+
Protocol: "SCTP",
801+
}: {
802+
Address: net.ParseIP("2001:db8::1:3"),
803+
Protocol: "SCTP",
804+
Port: uint16(3001),
805+
Scheduler: "rr",
806+
},
807+
},
808+
Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
809+
{
810+
IP: "10.20.30.41",
811+
Port: 80,
812+
Protocol: "SCTP",
813+
}: {
814+
{
815+
Address: net.ParseIP("10.180.0.1"),
816+
Port: uint16(80),
817+
Weight: 1,
818+
},
819+
},
820+
{
821+
IP: "100.101.102.103",
822+
Port: 3001,
823+
Protocol: "SCTP",
824+
}: {
825+
{
826+
Address: net.ParseIP("10.180.0.1"),
827+
Port: uint16(80),
828+
Weight: 1,
829+
},
830+
},
831+
{
832+
IP: "100.101.102.104",
833+
Port: 3001,
834+
Protocol: "SCTP",
835+
}: {
836+
{
837+
Address: net.ParseIP("10.180.0.1"),
838+
Port: uint16(80),
839+
Weight: 1,
840+
},
841+
},
842+
{
843+
IP: "100.101.102.105",
844+
Port: 3001,
845+
Protocol: "SCTP",
846+
}: {
847+
{
848+
Address: net.ParseIP("10.180.0.1"),
849+
Port: uint16(80),
850+
Weight: 1,
851+
},
852+
},
853+
{
854+
IP: "2001:db8::1:1",
855+
Port: 3001,
856+
Protocol: "SCTP",
857+
}: {
858+
{
859+
Address: net.ParseIP("10.180.0.1"),
860+
Port: uint16(80),
861+
Weight: 1,
862+
},
863+
},
864+
{
865+
IP: "2001:db8::1:2",
866+
Port: 3001,
867+
Protocol: "SCTP",
868+
}: {
869+
{
870+
Address: net.ParseIP("10.180.0.1"),
871+
Port: uint16(80),
872+
Weight: 1,
873+
},
874+
},
875+
{
876+
IP: "2001:db8::1:3",
877+
Port: 3001,
878+
Protocol: "SCTP",
879+
}: {
880+
{
881+
Address: net.ParseIP("10.180.0.1"),
882+
Port: uint16(80),
883+
Weight: 1,
884+
},
885+
},
886+
},
887+
},
888+
expectedIPSets: netlinktest.ExpectedIPSet{
889+
kubeNodePortSetSCTP: {
890+
{
891+
IP: "100.101.102.103",
892+
Port: 3001,
893+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
894+
SetType: utilipset.HashIPPort,
895+
},
896+
{
897+
IP: "100.101.102.104",
898+
Port: 3001,
899+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
900+
SetType: utilipset.HashIPPort,
901+
},
902+
{
903+
IP: "100.101.102.105",
904+
Port: 3001,
905+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
906+
SetType: utilipset.HashIPPort,
907+
},
908+
{
909+
IP: "2001:db8::1:1",
910+
Port: 3001,
911+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
912+
SetType: utilipset.HashIPPort,
913+
},
914+
{
915+
IP: "2001:db8::1:2",
916+
Port: 3001,
917+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
918+
SetType: utilipset.HashIPPort,
919+
},
920+
{
921+
IP: "2001:db8::1:3",
922+
Port: 3001,
923+
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
924+
SetType: utilipset.HashIPPort,
925+
},
926+
},
927+
},
928+
},
698929
}
699930

700931
for _, test := range tests {
@@ -2897,10 +3128,14 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
28973128
t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents))
28983129
continue
28993130
}
2900-
if len(entries) == 1 {
2901-
if ents[0] != entries[0].String() {
2902-
t.Errorf("Check ipset entries failed for ipset: %q", set)
2903-
}
3131+
expectedEntries := []string{}
3132+
for _, entry := range entries {
3133+
expectedEntries = append(expectedEntries, entry.String())
3134+
}
3135+
sort.Strings(ents)
3136+
sort.Strings(expectedEntries)
3137+
if !reflect.DeepEqual(ents, expectedEntries) {
3138+
t.Errorf("Check ipset entries failed for ipset: %q", set)
29043139
}
29053140
}
29063141
}

0 commit comments

Comments
 (0)