Skip to content

Commit f01a9a5

Browse files
authored
Revert "restrict externalTrafficPolicy=Local interpretation only to NodePort and LoadBalancer services (#819)" (#835)
This reverts commit 27ec314.
1 parent 27ec314 commit f01a9a5

File tree

2 files changed

+371
-517
lines changed

2 files changed

+371
-517
lines changed

pkg/controllers/proxy/network_services_controller.go

Lines changed: 371 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/vishvananda/netns"
3131
"golang.org/x/net/context"
3232
api "k8s.io/api/core/v1"
33+
"k8s.io/apimachinery/pkg/util/sets"
3334
"k8s.io/client-go/kubernetes"
3435
"k8s.io/client-go/tools/cache"
3536
)
@@ -834,6 +835,376 @@ func hasActiveEndpoints(svc *serviceInfo, endpoints []endpointsInfo) bool {
834835
return false
835836
}
836837

838+
// sync the ipvs service and server details configured to reflect the desired state of services and endpoint
839+
// as learned from services and endpoints information from the api server
840+
func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) error {
841+
var ipvsSvcs []*ipvs.Service
842+
start := time.Now()
843+
844+
defer func() {
845+
endTime := time.Since(start)
846+
if nsc.MetricsEnabled {
847+
metrics.ControllerIpvsServicesSyncTime.Observe(endTime.Seconds())
848+
}
849+
glog.V(1).Infof("sync ipvs services took %v", endTime)
850+
}()
851+
852+
dummyVipInterface, err := nsc.ln.getKubeDummyInterface()
853+
if err != nil {
854+
return errors.New("Failed creating dummy interface: " + err.Error())
855+
}
856+
857+
glog.V(1).Infof("Setting up policy routing required for Direct Server Return functionality.")
858+
err = nsc.ln.setupPolicyRoutingForDSR()
859+
if err != nil {
860+
return errors.New("Failed setup PBR for DSR due to: " + err.Error())
861+
}
862+
glog.V(1).Infof("Custom routing table " + customDSRRouteTableName + " required for Direct Server Return is setup as expected.")
863+
864+
glog.V(1).Infof("Setting up custom route table required to add routes for external IP's.")
865+
err = nsc.ln.setupRoutesForExternalIPForDSR(serviceInfoMap)
866+
if err != nil {
867+
glog.Errorf("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error())
868+
return errors.New("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error())
869+
}
870+
glog.V(1).Infof("Custom routing table " + externalIPRouteTableName + " required for Direct Server Return is setup as expected.")
871+
872+
// map of active services and service endpoints
873+
activeServiceEndpointMap := make(map[string][]string)
874+
875+
ipvsSvcs, err = nsc.ln.ipvsGetServices()
876+
if err != nil {
877+
return errors.New("Failed get list of IPVS services due to: " + err.Error())
878+
}
879+
880+
for k, svc := range serviceInfoMap {
881+
var protocol uint16
882+
883+
switch svc.protocol {
884+
case "tcp":
885+
protocol = syscall.IPPROTO_TCP
886+
case "udp":
887+
protocol = syscall.IPPROTO_UDP
888+
default:
889+
protocol = syscall.IPPROTO_NONE
890+
}
891+
892+
endpoints := endpointsInfoMap[k]
893+
894+
if svc.local && !hasActiveEndpoints(svc, endpoints) {
895+
glog.V(1).Infof("Skipping service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name)
896+
continue
897+
}
898+
899+
// assign cluster IP of the service to the dummy interface so that its routable from the pod's on the node
900+
err := nsc.ln.ipAddrAdd(dummyVipInterface, svc.clusterIP.String(), true)
901+
if err != nil {
902+
continue
903+
}
904+
905+
// create IPVS service for the service to be exposed through the cluster ip
906+
ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
907+
if err != nil {
908+
glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error())
909+
continue
910+
}
911+
var clusterServiceId = generateIpPortId(svc.clusterIP.String(), svc.protocol, strconv.Itoa(svc.port))
912+
activeServiceEndpointMap[clusterServiceId] = make([]string, 0)
913+
914+
// create IPVS service for the service to be exposed through the nodeport
915+
var ipvsNodeportSvcs []*ipvs.Service
916+
917+
var nodeServiceIds []string
918+
919+
if svc.nodePort != 0 {
920+
if nsc.nodeportBindOnAllIp {
921+
// bind on all interfaces instead
922+
addrs, err := getAllLocalIPs()
923+
924+
if err != nil {
925+
glog.Errorf("Could not get list of system addresses for ipvs services: %s", err.Error())
926+
continue
927+
}
928+
929+
if len(addrs) == 0 {
930+
glog.Errorf("No IP addresses returned for nodeport service creation!")
931+
continue
932+
}
933+
934+
ipvsNodeportSvcs = make([]*ipvs.Service, len(addrs))
935+
nodeServiceIds = make([]string, len(addrs))
936+
937+
for i, addr := range addrs {
938+
ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags)
939+
if err != nil {
940+
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
941+
continue
942+
}
943+
944+
nodeServiceIds[i] = generateIpPortId(addr.IP.String(), svc.protocol, strconv.Itoa(svc.nodePort))
945+
activeServiceEndpointMap[nodeServiceIds[i]] = make([]string, 0)
946+
}
947+
} else {
948+
ipvsNodeportSvcs = make([]*ipvs.Service, 1)
949+
ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags)
950+
if err != nil {
951+
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
952+
continue
953+
}
954+
955+
nodeServiceIds = make([]string, 1)
956+
nodeServiceIds[0] = generateIpPortId(nsc.nodeIP.String(), svc.protocol, strconv.Itoa(svc.nodePort))
957+
activeServiceEndpointMap[nodeServiceIds[0]] = make([]string, 0)
958+
}
959+
}
960+
961+
externalIpServices := make([]externalIPService, 0)
962+
// create IPVS service for the service to be exposed through the external IP's
963+
// For external IP (which are meant for ingress traffic) Kube-router setsup IPVS services
964+
// based on FWMARK to enable Direct server return functionality. DSR requires a director
965+
// without a VIP http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html
966+
// to avoid martian packets
967+
extIPSet := sets.NewString(svc.externalIPs...)
968+
if !svc.skipLbIps {
969+
extIPSet = extIPSet.Union(sets.NewString(svc.loadBalancerIPs...))
970+
}
971+
972+
for _, externalIP := range extIPSet.List() {
973+
var externalIpServiceId string
974+
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
975+
ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
976+
if err != nil {
977+
glog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", externalIP, err.Error())
978+
continue
979+
}
980+
externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP})
981+
fwMark := generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port))
982+
externalIpServiceId = fmt.Sprint(fwMark)
983+
984+
// ensure there is iptables mangle table rule to FWMARK the packet
985+
err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIpServiceId)
986+
if err != nil {
987+
glog.Errorf("Failed to setup mangle table rule to FMWARD the traffic to external IP")
988+
continue
989+
}
990+
991+
// ensure VIP less director. we dont assign VIP to any interface
992+
err = nsc.ln.ipAddrDel(dummyVipInterface, externalIP)
993+
994+
// do policy routing to deliver the packet locally so that IPVS can pick the packet
995+
err = routeVIPTrafficToDirector("0x" + fmt.Sprintf("%x", fwMark))
996+
if err != nil {
997+
glog.Errorf("Failed to setup ip rule to lookup traffic to external IP: %s through custom "+
998+
"route table due to %s", externalIP, err.Error())
999+
continue
1000+
}
1001+
} else {
1002+
// ensure director with vip assigned
1003+
err := nsc.ln.ipAddrAdd(dummyVipInterface, externalIP, true)
1004+
if err != nil && err.Error() != IFACE_HAS_ADDR {
1005+
glog.Errorf("Failed to assign external ip %s to dummy interface %s due to %s", externalIP, KUBE_DUMMY_IF, err.Error())
1006+
}
1007+
1008+
// create IPVS service for the service to be exposed through the external ip
1009+
ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
1010+
if err != nil {
1011+
glog.Errorf("Failed to create ipvs service for external ip: %s due to %s", externalIP, err.Error())
1012+
continue
1013+
}
1014+
externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP})
1015+
externalIpServiceId = generateIpPortId(externalIP, svc.protocol, strconv.Itoa(svc.port))
1016+
1017+
// ensure there is NO iptables mangle table rule to FWMARK the packet
1018+
fwMark := fmt.Sprint(generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port)))
1019+
err = nsc.ln.cleanupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), fwMark)
1020+
if err != nil {
1021+
glog.Errorf("Failed to verify and cleanup any mangle table rule to FMWARD the traffic to external IP due to " + err.Error())
1022+
continue
1023+
}
1024+
}
1025+
1026+
activeServiceEndpointMap[externalIpServiceId] = make([]string, 0)
1027+
for _, endpoint := range endpoints {
1028+
if !svc.local || (svc.local && endpoint.isLocal) {
1029+
activeServiceEndpointMap[externalIpServiceId] = append(activeServiceEndpointMap[externalIpServiceId], endpoint.ip)
1030+
}
1031+
}
1032+
}
1033+
1034+
// add IPVS remote server to the IPVS service
1035+
for _, endpoint := range endpoints {
1036+
dst := ipvs.Destination{
1037+
Address: net.ParseIP(endpoint.ip),
1038+
AddressFamily: syscall.AF_INET,
1039+
Port: uint16(endpoint.port),
1040+
Weight: 1,
1041+
}
1042+
1043+
if !svc.local || (svc.local && endpoint.isLocal) {
1044+
err := nsc.ln.ipvsAddServer(ipvsClusterVipSvc, &dst)
1045+
if err != nil {
1046+
glog.Errorf(err.Error())
1047+
} else {
1048+
activeServiceEndpointMap[clusterServiceId] = append(activeServiceEndpointMap[clusterServiceId], endpoint.ip)
1049+
}
1050+
}
1051+
1052+
if svc.nodePort != 0 {
1053+
for i := 0; i < len(ipvsNodeportSvcs); i++ {
1054+
if !svc.local || (svc.local && endpoint.isLocal) {
1055+
err := nsc.ln.ipvsAddServer(ipvsNodeportSvcs[i], &dst)
1056+
if err != nil {
1057+
glog.Errorf(err.Error())
1058+
} else {
1059+
activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[nodeServiceIds[i]], endpoint.ip)
1060+
}
1061+
}
1062+
}
1063+
}
1064+
1065+
for _, externalIpService := range externalIpServices {
1066+
if svc.local && !endpoint.isLocal {
1067+
continue
1068+
}
1069+
1070+
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
1071+
dst.ConnectionFlags = ipvs.ConnectionFlagTunnel
1072+
}
1073+
1074+
// add server to IPVS service
1075+
err := nsc.ln.ipvsAddServer(externalIpService.ipvsSvc, &dst)
1076+
if err != nil {
1077+
glog.Errorf(err.Error())
1078+
}
1079+
1080+
// For now just support IPVS tunnel mode, we can add other ways of DSR in future
1081+
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
1082+
1083+
podObj, err := nsc.getPodObjectForEndpoint(endpoint.ip)
1084+
if err != nil {
1085+
glog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". so skipping peparing endpoint for DSR")
1086+
continue
1087+
}
1088+
1089+
// we are only concerned with endpoint pod running on current node
1090+
if strings.Compare(podObj.Status.HostIP, nsc.nodeIP.String()) != 0 {
1091+
continue
1092+
}
1093+
1094+
containerID := strings.TrimPrefix(podObj.Status.ContainerStatuses[0].ContainerID, "docker://")
1095+
if containerID == "" {
1096+
glog.Errorf("Failed to find container id for the endpoint with ip: " + endpoint.ip + " so skipping peparing endpoint for DSR")
1097+
continue
1098+
}
1099+
1100+
err = nsc.ln.prepareEndpointForDsr(containerID, endpoint.ip, externalIpService.externalIp)
1101+
if err != nil {
1102+
glog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", endpoint.ip, err.Error())
1103+
}
1104+
}
1105+
}
1106+
}
1107+
}
1108+
1109+
// cleanup stale IPs on dummy interface
1110+
glog.V(1).Info("Cleaning up if any, old service IPs on dummy interface")
1111+
addrActive := make(map[string]bool)
1112+
for k := range activeServiceEndpointMap {
1113+
// verify active and its a generateIpPortId() type service
1114+
if strings.Contains(k, "-") {
1115+
parts := strings.SplitN(k, "-", 3)
1116+
addrActive[parts[0]] = true
1117+
}
1118+
}
1119+
1120+
var addrs []netlink.Addr
1121+
addrs, err = netlink.AddrList(dummyVipInterface, netlink.FAMILY_V4)
1122+
if err != nil {
1123+
return errors.New("Failed to list dummy interface IPs: " + err.Error())
1124+
}
1125+
for _, addr := range addrs {
1126+
isActive := addrActive[addr.IP.String()]
1127+
if !isActive {
1128+
glog.V(1).Infof("Found an IP %s which is no longer needed so cleaning up", addr.IP.String())
1129+
err := nsc.ln.ipAddrDel(dummyVipInterface, addr.IP.String())
1130+
if err != nil {
1131+
glog.Errorf("Failed to delete stale IP %s due to: %s",
1132+
addr.IP.String(), err.Error())
1133+
continue
1134+
}
1135+
}
1136+
}
1137+
1138+
// cleanup stale ipvs service and servers
1139+
glog.V(1).Info("Cleaning up if any, old ipvs service and servers which are no longer needed")
1140+
ipvsSvcs, err = nsc.ln.ipvsGetServices()
1141+
1142+
if err != nil {
1143+
return errors.New("Failed to list IPVS services: " + err.Error())
1144+
}
1145+
var protocol string
1146+
for _, ipvsSvc := range ipvsSvcs {
1147+
if ipvsSvc.Protocol == syscall.IPPROTO_TCP {
1148+
protocol = "tcp"
1149+
} else {
1150+
protocol = "udp"
1151+
}
1152+
var key string
1153+
if ipvsSvc.Address != nil {
1154+
key = generateIpPortId(ipvsSvc.Address.String(), protocol, strconv.Itoa(int(ipvsSvc.Port)))
1155+
} else if ipvsSvc.FWMark != 0 {
1156+
key = fmt.Sprint(ipvsSvc.FWMark)
1157+
} else {
1158+
continue
1159+
}
1160+
1161+
endpoints, ok := activeServiceEndpointMap[key]
1162+
// Only delete the service if it's not there anymore to prevent flapping
1163+
// old: if !ok || len(endpoints) == 0 {
1164+
if !ok {
1165+
glog.V(1).Infof("Found a IPVS service %s which is no longer needed so cleaning up",
1166+
ipvsServiceString(ipvsSvc))
1167+
err := nsc.ln.ipvsDelService(ipvsSvc)
1168+
if err != nil {
1169+
glog.Errorf("Failed to delete stale IPVS service %s due to: %s",
1170+
ipvsServiceString(ipvsSvc), err.Error())
1171+
continue
1172+
}
1173+
} else {
1174+
dsts, err := nsc.ln.ipvsGetDestinations(ipvsSvc)
1175+
if err != nil {
1176+
glog.Errorf("Failed to get list of servers from ipvs service")
1177+
}
1178+
for _, dst := range dsts {
1179+
validEp := false
1180+
for _, ep := range endpoints {
1181+
if ep == dst.Address.String() {
1182+
validEp = true
1183+
break
1184+
}
1185+
}
1186+
if !validEp {
1187+
glog.V(1).Infof("Found a destination %s in service %s which is no longer needed so cleaning up",
1188+
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
1189+
err = nsc.ipvsDeleteDestination(ipvsSvc, dst)
1190+
if err != nil {
1191+
glog.Errorf("Failed to delete destination %s from ipvs service %s",
1192+
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
1193+
}
1194+
}
1195+
}
1196+
}
1197+
}
1198+
1199+
err = nsc.syncIpvsFirewall()
1200+
if err != nil {
1201+
glog.Errorf("Error syncing ipvs svc iptables rules: %s", err.Error())
1202+
}
1203+
1204+
glog.V(1).Info("IPVS servers and services are synced to desired state")
1205+
return nil
1206+
}
1207+
8371208
func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string) (*api.Pod, error) {
8381209
for _, obj := range nsc.podLister.List() {
8391210
pod := obj.(*api.Pod)

0 commit comments

Comments
 (0)