Skip to content

Commit 0d85d16

Browse files
authored
Merge pull request kubernetes#88786 from freehan/externalIP
Fix ExternalTrafficPolicy support for Service ExternalIPs
2 parents b6f1138 + 068963f commit 0d85d16

File tree

8 files changed

+238
-19
lines changed

8 files changed

+238
-19
lines changed

pkg/features/kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,13 @@ const (
574574
// medium: HugePages-1Gi
575575
HugePageStorageMediumSize featuregate.Feature = "HugePageStorageMediumSize"
576576

577+
// owner: @freehan
578+
// GA: v1.18
579+
//
580+
// Enable ExternalTrafficPolicy for Service ExternalIPs.
581+
// This is for bug fix #69811
582+
ExternalPolicyForExternalIP featuregate.Feature = "ExternalPolicyForExternalIP"
583+
577584
// owner: @bswartz
578585
// alpha: v1.18
579586
//
@@ -668,6 +675,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
668675
ImmutableEphemeralVolumes: {Default: false, PreRelease: featuregate.Alpha},
669676
DefaultIngressClass: {Default: true, PreRelease: featuregate.Beta},
670677
HugePageStorageMediumSize: {Default: false, PreRelease: featuregate.Alpha},
678+
ExternalPolicyForExternalIP: {Default: false, PreRelease: featuregate.GA}, // remove in 1.19
671679
AnyVolumeDataSource: {Default: false, PreRelease: featuregate.Alpha},
672680

673681
// inherited features from generic apiserver, relisted here to get a conflict if it is changed

pkg/proxy/iptables/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ go_test(
3939
srcs = ["proxier_test.go"],
4040
embed = [":go_default_library"],
4141
deps = [
42+
"//pkg/features:go_default_library",
4243
"//pkg/proxy:go_default_library",
4344
"//pkg/proxy/healthcheck:go_default_library",
4445
"//pkg/proxy/util:go_default_library",
@@ -53,6 +54,8 @@ go_test(
5354
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
5455
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
5556
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
57+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
58+
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
5659
"//vendor/github.com/stretchr/testify/assert:go_default_library",
5760
"//vendor/k8s.io/klog:go_default_library",
5861
"//vendor/k8s.io/utils/exec:go_default_library",

pkg/proxy/iptables/proxier.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,8 +1070,13 @@ func (proxier *Proxier) syncProxyRules() {
10701070
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
10711071
"--dport", strconv.Itoa(svcInfo.Port()),
10721072
)
1073-
// We have to SNAT packets to external IPs.
1074-
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
1073+
1074+
destChain := svcXlbChain
1075+
// We have to SNAT packets to external IPs if externalTrafficPolicy is cluster.
1076+
if !(utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints()) {
1077+
destChain = svcChain
1078+
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
1079+
}
10751080

10761081
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
10771082
// nor from a local process to be forwarded to the service.
@@ -1080,11 +1085,11 @@ func (proxier *Proxier) syncProxyRules() {
10801085
externalTrafficOnlyArgs := append(args,
10811086
"-m", "physdev", "!", "--physdev-is-in",
10821087
"-m", "addrtype", "!", "--src-type", "LOCAL")
1083-
writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
1088+
writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(destChain))...)
10841089
dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
10851090
// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
10861091
// This covers cases like GCE load-balancers which get added to the local routing table.
1087-
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
1092+
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(destChain))...)
10881093
} else {
10891094
// No endpoints.
10901095
writeLine(proxier.filterRules,

pkg/proxy/iptables/proxier_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ import (
3434
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3535
"k8s.io/apimachinery/pkg/types"
3636
"k8s.io/apimachinery/pkg/util/intstr"
37+
utilfeature "k8s.io/apiserver/pkg/util/feature"
38+
featuregatetesting "k8s.io/component-base/featuregate/testing"
39+
"k8s.io/kubernetes/pkg/features"
3740
"k8s.io/kubernetes/pkg/proxy"
3841
"k8s.io/kubernetes/pkg/proxy/healthcheck"
3942
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
@@ -832,6 +835,80 @@ func TestExternalIPsReject(t *testing.T) {
832835
}
833836
}
834837

838+
func TestOnlyLocalExternalIPs(t *testing.T) {
839+
// TODO(freehan): remove this in k8s 1.19
840+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExternalPolicyForExternalIP, true)()
841+
842+
ipt := iptablestest.NewFake()
843+
fp := NewFakeProxier(ipt, false)
844+
svcIP := "10.20.30.41"
845+
svcPort := 80
846+
svcExternalIPs := "50.60.70.81"
847+
svcPortName := proxy.ServicePortName{
848+
NamespacedName: makeNSN("ns1", "svc1"),
849+
Port: "p80",
850+
}
851+
852+
makeServiceMap(fp,
853+
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
854+
svc.Spec.Type = "NodePort"
855+
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
856+
svc.Spec.ClusterIP = svcIP
857+
svc.Spec.ExternalIPs = []string{svcExternalIPs}
858+
svc.Spec.Ports = []v1.ServicePort{{
859+
Name: svcPortName.Port,
860+
Port: int32(svcPort),
861+
Protocol: v1.ProtocolTCP,
862+
TargetPort: intstr.FromInt(svcPort),
863+
}}
864+
}),
865+
)
866+
makeEndpointsMap(fp)
867+
epIP1 := "10.180.0.1"
868+
epIP2 := "10.180.2.1"
869+
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
870+
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
871+
makeEndpointsMap(fp,
872+
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
873+
ept.Subsets = []v1.EndpointSubset{{
874+
Addresses: []v1.EndpointAddress{{
875+
IP: epIP1,
876+
NodeName: nil,
877+
}, {
878+
IP: epIP2,
879+
NodeName: utilpointer.StringPtr(testHostname),
880+
}},
881+
Ports: []v1.EndpointPort{{
882+
Name: svcPortName.Port,
883+
Port: int32(svcPort),
884+
Protocol: v1.ProtocolTCP,
885+
}},
886+
}}
887+
}),
888+
)
889+
890+
fp.syncProxyRules()
891+
892+
proto := strings.ToLower(string(v1.ProtocolTCP))
893+
lbChain := string(serviceLBChainName(svcPortName.String(), proto))
894+
895+
nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrLocal))
896+
localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrNonLocal))
897+
898+
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
899+
if !hasJump(kubeSvcRules, lbChain, svcExternalIPs, svcPort) {
900+
errorf(fmt.Sprintf("Failed to find jump to xlb chain %v", lbChain), kubeSvcRules, t)
901+
}
902+
903+
lbRules := ipt.GetRules(lbChain)
904+
if hasJump(lbRules, nonLocalEpChain, "", 0) {
905+
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
906+
}
907+
if !hasJump(lbRules, localEpChain, "", 0) {
908+
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
909+
}
910+
}
911+
835912
func TestNodePortReject(t *testing.T) {
836913
ipt := iptablestest.NewFake()
837914
fp := NewFakeProxier(ipt, false)

pkg/proxy/ipvs/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_test(
1515
],
1616
embed = [":go_default_library"],
1717
deps = [
18+
"//pkg/features:go_default_library",
1819
"//pkg/proxy:go_default_library",
1920
"//pkg/proxy/healthcheck:go_default_library",
2021
"//pkg/proxy/ipvs/testing:go_default_library",
@@ -34,6 +35,8 @@ go_test(
3435
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
3536
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
3637
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
38+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
39+
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
3740
"//vendor/github.com/stretchr/testify/assert:go_default_library",
3841
"//vendor/k8s.io/utils/exec:go_default_library",
3942
"//vendor/k8s.io/utils/exec/testing:go_default_library",

pkg/proxy/ipvs/ipset.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ const (
4040
kubeExternalIPSetComment = "Kubernetes service external ip + port for masquerade and filter purpose"
4141
kubeExternalIPSet = "KUBE-EXTERNAL-IP"
4242

43+
kubeExternalIPLocalSetComment = "Kubernetes service external ip + port with externalTrafficPolicy=local"
44+
kubeExternalIPLocalSet = "KUBE-EXTERNAL-IP-LOCAL"
45+
4346
kubeLoadBalancerSetComment = "Kubernetes service lb portal"
4447
kubeLoadBalancerSet = "KUBE-LOAD-BALANCER"
4548

pkg/proxy/ipvs/proxier.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ var ipsetInfo = []struct {
140140
{kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
141141
{kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
142142
{kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
143+
{kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment},
143144
{kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
144145
{kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment},
145146
{kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
@@ -1236,12 +1237,21 @@ func (proxier *Proxier) syncProxyRules() {
12361237
Protocol: protocol,
12371238
SetType: utilipset.HashIPPort,
12381239
}
1239-
// We have to SNAT packets to external IPs.
1240-
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
1241-
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
1242-
continue
1240+
1241+
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints() {
1242+
if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
1243+
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPLocalSet].Name))
1244+
continue
1245+
}
1246+
proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
1247+
} else {
1248+
// We have to SNAT packets to external IPs.
1249+
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
1250+
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
1251+
continue
1252+
}
1253+
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
12431254
}
1244-
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
12451255

12461256
// ipvs call
12471257
serv := &utilipvs.VirtualServer{
@@ -1257,7 +1267,12 @@ func (proxier *Proxier) syncProxyRules() {
12571267
if err := proxier.syncService(svcNameString, serv, true); err == nil {
12581268
activeIPVSServices[serv.String()] = true
12591269
activeBindAddrs[serv.Address.String()] = true
1260-
if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
1270+
1271+
onlyNodeLocalEndpoints := false
1272+
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) {
1273+
onlyNodeLocalEndpoints = svcInfo.OnlyNodeLocalEndpoints()
1274+
}
1275+
if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, serv); err != nil {
12611276
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
12621277
}
12631278
} else {
@@ -1668,15 +1683,8 @@ func (proxier *Proxier) writeIptablesRules() {
16681683
}
16691684
}
16701685

1671-
if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
1672-
// Build masquerade rules for packets to external IPs.
1673-
args = append(args[:0],
1674-
"-A", string(kubeServicesChain),
1675-
"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
1676-
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
1677-
"dst,dst",
1678-
)
1679-
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
1686+
// externalIPRules adds iptables rules applies to Service ExternalIPs
1687+
externalIPRules := func(args []string) {
16801688
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
16811689
// nor from a local process to be forwarded to the service.
16821690
// This rule roughly translates to "all traffic from off-machine".
@@ -1691,6 +1699,28 @@ func (proxier *Proxier) writeIptablesRules() {
16911699
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
16921700
}
16931701

1702+
if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
1703+
// Build masquerade rules for packets to external IPs.
1704+
args = append(args[:0],
1705+
"-A", string(kubeServicesChain),
1706+
"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
1707+
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
1708+
"dst,dst",
1709+
)
1710+
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
1711+
externalIPRules(args)
1712+
}
1713+
1714+
if !proxier.ipsetList[kubeExternalIPLocalSet].isEmpty() {
1715+
args = append(args[:0],
1716+
"-A", string(kubeServicesChain),
1717+
"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPLocalSet].getComment(),
1718+
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPLocalSet].Name,
1719+
"dst,dst",
1720+
)
1721+
externalIPRules(args)
1722+
}
1723+
16941724
// -A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT
16951725
args = append(args[:0],
16961726
"-A", string(kubeServicesChain),

pkg/proxy/ipvs/proxier_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ import (
3333
"k8s.io/apimachinery/pkg/types"
3434
"k8s.io/apimachinery/pkg/util/intstr"
3535
"k8s.io/apimachinery/pkg/util/sets"
36+
utilfeature "k8s.io/apiserver/pkg/util/feature"
37+
featuregatetesting "k8s.io/component-base/featuregate/testing"
38+
"k8s.io/kubernetes/pkg/features"
3639
"k8s.io/kubernetes/pkg/proxy"
3740
"k8s.io/kubernetes/pkg/proxy/healthcheck"
3841
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
@@ -1256,6 +1259,92 @@ func TestExternalIPs(t *testing.T) {
12561259
}
12571260
}
12581261

1262+
func TestOnlyLocalExternalIPs(t *testing.T) {
1263+
// TODO(freehan): remove this in k8s 1.19
1264+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ExternalPolicyForExternalIP, true)()
1265+
1266+
ipt := iptablestest.NewFake()
1267+
ipvs := ipvstest.NewFake()
1268+
ipset := ipsettest.NewFake(testIPSetVersion)
1269+
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
1270+
svcIP := "10.20.30.41"
1271+
svcPort := 80
1272+
svcExternalIPs := sets.NewString("50.60.70.81", "2012::51", "127.0.0.1")
1273+
svcPortName := proxy.ServicePortName{
1274+
NamespacedName: makeNSN("ns1", "svc1"),
1275+
Port: "p80",
1276+
}
1277+
1278+
makeServiceMap(fp,
1279+
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
1280+
svc.Spec.Type = "NodePort"
1281+
svc.Spec.ClusterIP = svcIP
1282+
svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList()
1283+
svc.Spec.Ports = []v1.ServicePort{{
1284+
Name: svcPortName.Port,
1285+
Port: int32(svcPort),
1286+
Protocol: v1.ProtocolTCP,
1287+
TargetPort: intstr.FromInt(svcPort),
1288+
}}
1289+
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
1290+
}),
1291+
)
1292+
epIP := "10.180.0.1"
1293+
epIP1 := "10.180.1.1"
1294+
thisHostname := testHostname
1295+
otherHostname := "other-hostname"
1296+
makeEndpointsMap(fp,
1297+
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
1298+
ept.Subsets = []v1.EndpointSubset{{
1299+
Addresses: []v1.EndpointAddress{{
1300+
IP: epIP,
1301+
NodeName: utilpointer.StringPtr(thisHostname),
1302+
},
1303+
{
1304+
IP: epIP1,
1305+
NodeName: utilpointer.StringPtr(otherHostname),
1306+
},
1307+
},
1308+
Ports: []v1.EndpointPort{{
1309+
Name: svcPortName.Port,
1310+
Port: int32(svcPort),
1311+
Protocol: v1.ProtocolTCP,
1312+
}},
1313+
}}
1314+
}),
1315+
)
1316+
1317+
fp.syncProxyRules()
1318+
1319+
// check ipvs service and destinations
1320+
services, err := ipvs.GetVirtualServers()
1321+
if err != nil {
1322+
t.Errorf("Failed to get ipvs services, err: %v", err)
1323+
}
1324+
if len(services) != 4 {
1325+
t.Errorf("Expect 4 ipvs services, got %d", len(services))
1326+
}
1327+
found := false
1328+
for _, svc := range services {
1329+
if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
1330+
found = true
1331+
destinations, _ := ipvs.GetRealServers(svc)
1332+
if len(destinations) != 1 {
1333+
t.Errorf("Expect only 1 local endpoint. but got %v", len(destinations))
1334+
}
1335+
for _, dest := range destinations {
1336+
if dest.Address.String() != epIP || dest.Port != uint16(svcPort) {
1337+
t.Errorf("service Endpoint mismatch ipvs service destination")
1338+
}
1339+
}
1340+
break
1341+
}
1342+
}
1343+
if !found {
1344+
t.Errorf("Expect external ip type service, got none")
1345+
}
1346+
}
1347+
12591348
func TestLoadBalancer(t *testing.T) {
12601349
ipt, fp := buildFakeProxier()
12611350
svcIP := "10.20.30.41"
@@ -1432,6 +1521,7 @@ func TestOnlyLocalNodePorts(t *testing.T) {
14321521
}
14331522
checkIptables(t, ipt, epIpt)
14341523
}
1524+
14351525
func TestLoadBalanceSourceRanges(t *testing.T) {
14361526
ipt, fp := buildFakeProxier()
14371527

0 commit comments

Comments
 (0)