Skip to content

Commit 230ff15

Browse files
authored
restrict externalTrafficPolicy=Local interpretation only to NodePort and LoadBalancer services (#836)
* restrict externalTrafficPolicy=Local interpretation only to NodePort and LoadBalancer services Fixes #818 * addressing review comments
1 parent 5671c3a commit 230ff15

File tree

2 files changed

+526
-371
lines changed

2 files changed

+526
-371
lines changed

pkg/controllers/proxy/network_services_controller.go

Lines changed: 0 additions & 371 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/vishvananda/netns"
3131
"golang.org/x/net/context"
3232
api "k8s.io/api/core/v1"
33-
"k8s.io/apimachinery/pkg/util/sets"
3433
"k8s.io/client-go/kubernetes"
3534
"k8s.io/client-go/tools/cache"
3635
)
@@ -835,376 +834,6 @@ func hasActiveEndpoints(svc *serviceInfo, endpoints []endpointsInfo) bool {
835834
return false
836835
}
837836

838-
// sync the ipvs service and server details configured to reflect the desired state of services and endpoint
839-
// as learned from services and endpoints information from the api server
840-
func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) error {
841-
var ipvsSvcs []*ipvs.Service
842-
start := time.Now()
843-
844-
defer func() {
845-
endTime := time.Since(start)
846-
if nsc.MetricsEnabled {
847-
metrics.ControllerIpvsServicesSyncTime.Observe(endTime.Seconds())
848-
}
849-
glog.V(1).Infof("sync ipvs services took %v", endTime)
850-
}()
851-
852-
dummyVipInterface, err := nsc.ln.getKubeDummyInterface()
853-
if err != nil {
854-
return errors.New("Failed creating dummy interface: " + err.Error())
855-
}
856-
857-
glog.V(1).Infof("Setting up policy routing required for Direct Server Return functionality.")
858-
err = nsc.ln.setupPolicyRoutingForDSR()
859-
if err != nil {
860-
return errors.New("Failed setup PBR for DSR due to: " + err.Error())
861-
}
862-
glog.V(1).Infof("Custom routing table " + customDSRRouteTableName + " required for Direct Server Return is setup as expected.")
863-
864-
glog.V(1).Infof("Setting up custom route table required to add routes for external IP's.")
865-
err = nsc.ln.setupRoutesForExternalIPForDSR(serviceInfoMap)
866-
if err != nil {
867-
glog.Errorf("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error())
868-
return errors.New("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error())
869-
}
870-
glog.V(1).Infof("Custom routing table " + externalIPRouteTableName + " required for Direct Server Return is setup as expected.")
871-
872-
// map of active services and service endpoints
873-
activeServiceEndpointMap := make(map[string][]string)
874-
875-
ipvsSvcs, err = nsc.ln.ipvsGetServices()
876-
if err != nil {
877-
return errors.New("Failed get list of IPVS services due to: " + err.Error())
878-
}
879-
880-
for k, svc := range serviceInfoMap {
881-
var protocol uint16
882-
883-
switch svc.protocol {
884-
case "tcp":
885-
protocol = syscall.IPPROTO_TCP
886-
case "udp":
887-
protocol = syscall.IPPROTO_UDP
888-
default:
889-
protocol = syscall.IPPROTO_NONE
890-
}
891-
892-
endpoints := endpointsInfoMap[k]
893-
894-
if svc.local && !hasActiveEndpoints(svc, endpoints) {
895-
glog.V(1).Infof("Skipping service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name)
896-
continue
897-
}
898-
899-
// assign cluster IP of the service to the dummy interface so that its routable from the pod's on the node
900-
err := nsc.ln.ipAddrAdd(dummyVipInterface, svc.clusterIP.String(), true)
901-
if err != nil {
902-
continue
903-
}
904-
905-
// create IPVS service for the service to be exposed through the cluster ip
906-
ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
907-
if err != nil {
908-
glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error())
909-
continue
910-
}
911-
var clusterServiceId = generateIpPortId(svc.clusterIP.String(), svc.protocol, strconv.Itoa(svc.port))
912-
activeServiceEndpointMap[clusterServiceId] = make([]string, 0)
913-
914-
// create IPVS service for the service to be exposed through the nodeport
915-
var ipvsNodeportSvcs []*ipvs.Service
916-
917-
var nodeServiceIds []string
918-
919-
if svc.nodePort != 0 {
920-
if nsc.nodeportBindOnAllIp {
921-
// bind on all interfaces instead
922-
addrs, err := getAllLocalIPs()
923-
924-
if err != nil {
925-
glog.Errorf("Could not get list of system addresses for ipvs services: %s", err.Error())
926-
continue
927-
}
928-
929-
if len(addrs) == 0 {
930-
glog.Errorf("No IP addresses returned for nodeport service creation!")
931-
continue
932-
}
933-
934-
ipvsNodeportSvcs = make([]*ipvs.Service, len(addrs))
935-
nodeServiceIds = make([]string, len(addrs))
936-
937-
for i, addr := range addrs {
938-
ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags)
939-
if err != nil {
940-
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
941-
continue
942-
}
943-
944-
nodeServiceIds[i] = generateIpPortId(addr.IP.String(), svc.protocol, strconv.Itoa(svc.nodePort))
945-
activeServiceEndpointMap[nodeServiceIds[i]] = make([]string, 0)
946-
}
947-
} else {
948-
ipvsNodeportSvcs = make([]*ipvs.Service, 1)
949-
ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags)
950-
if err != nil {
951-
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
952-
continue
953-
}
954-
955-
nodeServiceIds = make([]string, 1)
956-
nodeServiceIds[0] = generateIpPortId(nsc.nodeIP.String(), svc.protocol, strconv.Itoa(svc.nodePort))
957-
activeServiceEndpointMap[nodeServiceIds[0]] = make([]string, 0)
958-
}
959-
}
960-
961-
externalIpServices := make([]externalIPService, 0)
962-
// create IPVS service for the service to be exposed through the external IP's
963-
// For external IP (which are meant for ingress traffic) Kube-router setsup IPVS services
964-
// based on FWMARK to enable Direct server return functionality. DSR requires a director
965-
// without a VIP http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html
966-
// to avoid martian packets
967-
extIPSet := sets.NewString(svc.externalIPs...)
968-
if !svc.skipLbIps {
969-
extIPSet = extIPSet.Union(sets.NewString(svc.loadBalancerIPs...))
970-
}
971-
972-
for _, externalIP := range extIPSet.List() {
973-
var externalIpServiceId string
974-
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
975-
ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
976-
if err != nil {
977-
glog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", externalIP, err.Error())
978-
continue
979-
}
980-
externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP})
981-
fwMark := generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port))
982-
externalIpServiceId = fmt.Sprint(fwMark)
983-
984-
// ensure there is iptables mangle table rule to FWMARK the packet
985-
err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIpServiceId)
986-
if err != nil {
987-
glog.Errorf("Failed to setup mangle table rule to FMWARD the traffic to external IP")
988-
continue
989-
}
990-
991-
// ensure VIP less director. we dont assign VIP to any interface
992-
err = nsc.ln.ipAddrDel(dummyVipInterface, externalIP)
993-
994-
// do policy routing to deliver the packet locally so that IPVS can pick the packet
995-
err = routeVIPTrafficToDirector("0x" + fmt.Sprintf("%x", fwMark))
996-
if err != nil {
997-
glog.Errorf("Failed to setup ip rule to lookup traffic to external IP: %s through custom "+
998-
"route table due to %s", externalIP, err.Error())
999-
continue
1000-
}
1001-
} else {
1002-
// ensure director with vip assigned
1003-
err := nsc.ln.ipAddrAdd(dummyVipInterface, externalIP, true)
1004-
if err != nil && err.Error() != IFACE_HAS_ADDR {
1005-
glog.Errorf("Failed to assign external ip %s to dummy interface %s due to %s", externalIP, KUBE_DUMMY_IF, err.Error())
1006-
}
1007-
1008-
// create IPVS service for the service to be exposed through the external ip
1009-
ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags)
1010-
if err != nil {
1011-
glog.Errorf("Failed to create ipvs service for external ip: %s due to %s", externalIP, err.Error())
1012-
continue
1013-
}
1014-
externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP})
1015-
externalIpServiceId = generateIpPortId(externalIP, svc.protocol, strconv.Itoa(svc.port))
1016-
1017-
// ensure there is NO iptables mangle table rule to FWMARK the packet
1018-
fwMark := fmt.Sprint(generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port)))
1019-
err = nsc.ln.cleanupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), fwMark)
1020-
if err != nil {
1021-
glog.Errorf("Failed to verify and cleanup any mangle table rule to FMWARD the traffic to external IP due to " + err.Error())
1022-
continue
1023-
}
1024-
}
1025-
1026-
activeServiceEndpointMap[externalIpServiceId] = make([]string, 0)
1027-
for _, endpoint := range endpoints {
1028-
if !svc.local || (svc.local && endpoint.isLocal) {
1029-
activeServiceEndpointMap[externalIpServiceId] = append(activeServiceEndpointMap[externalIpServiceId], endpoint.ip)
1030-
}
1031-
}
1032-
}
1033-
1034-
// add IPVS remote server to the IPVS service
1035-
for _, endpoint := range endpoints {
1036-
dst := ipvs.Destination{
1037-
Address: net.ParseIP(endpoint.ip),
1038-
AddressFamily: syscall.AF_INET,
1039-
Port: uint16(endpoint.port),
1040-
Weight: 1,
1041-
}
1042-
1043-
if !svc.local || (svc.local && endpoint.isLocal) {
1044-
err := nsc.ln.ipvsAddServer(ipvsClusterVipSvc, &dst)
1045-
if err != nil {
1046-
glog.Errorf(err.Error())
1047-
} else {
1048-
activeServiceEndpointMap[clusterServiceId] = append(activeServiceEndpointMap[clusterServiceId], endpoint.ip)
1049-
}
1050-
}
1051-
1052-
if svc.nodePort != 0 {
1053-
for i := 0; i < len(ipvsNodeportSvcs); i++ {
1054-
if !svc.local || (svc.local && endpoint.isLocal) {
1055-
err := nsc.ln.ipvsAddServer(ipvsNodeportSvcs[i], &dst)
1056-
if err != nil {
1057-
glog.Errorf(err.Error())
1058-
} else {
1059-
activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[nodeServiceIds[i]], endpoint.ip)
1060-
}
1061-
}
1062-
}
1063-
}
1064-
1065-
for _, externalIpService := range externalIpServices {
1066-
if svc.local && !endpoint.isLocal {
1067-
continue
1068-
}
1069-
1070-
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
1071-
dst.ConnectionFlags = ipvs.ConnectionFlagTunnel
1072-
}
1073-
1074-
// add server to IPVS service
1075-
err := nsc.ln.ipvsAddServer(externalIpService.ipvsSvc, &dst)
1076-
if err != nil {
1077-
glog.Errorf(err.Error())
1078-
}
1079-
1080-
// For now just support IPVS tunnel mode, we can add other ways of DSR in future
1081-
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
1082-
1083-
podObj, err := nsc.getPodObjectForEndpoint(endpoint.ip)
1084-
if err != nil {
1085-
glog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". so skipping peparing endpoint for DSR")
1086-
continue
1087-
}
1088-
1089-
// we are only concerned with endpoint pod running on current node
1090-
if strings.Compare(podObj.Status.HostIP, nsc.nodeIP.String()) != 0 {
1091-
continue
1092-
}
1093-
1094-
containerID := strings.TrimPrefix(podObj.Status.ContainerStatuses[0].ContainerID, "docker://")
1095-
if containerID == "" {
1096-
glog.Errorf("Failed to find container id for the endpoint with ip: " + endpoint.ip + " so skipping peparing endpoint for DSR")
1097-
continue
1098-
}
1099-
1100-
err = nsc.ln.prepareEndpointForDsr(containerID, endpoint.ip, externalIpService.externalIp)
1101-
if err != nil {
1102-
glog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", endpoint.ip, err.Error())
1103-
}
1104-
}
1105-
}
1106-
}
1107-
}
1108-
1109-
// cleanup stale IPs on dummy interface
1110-
glog.V(1).Info("Cleaning up if any, old service IPs on dummy interface")
1111-
addrActive := make(map[string]bool)
1112-
for k := range activeServiceEndpointMap {
1113-
// verify active and its a generateIpPortId() type service
1114-
if strings.Contains(k, "-") {
1115-
parts := strings.SplitN(k, "-", 3)
1116-
addrActive[parts[0]] = true
1117-
}
1118-
}
1119-
1120-
var addrs []netlink.Addr
1121-
addrs, err = netlink.AddrList(dummyVipInterface, netlink.FAMILY_V4)
1122-
if err != nil {
1123-
return errors.New("Failed to list dummy interface IPs: " + err.Error())
1124-
}
1125-
for _, addr := range addrs {
1126-
isActive := addrActive[addr.IP.String()]
1127-
if !isActive {
1128-
glog.V(1).Infof("Found an IP %s which is no longer needed so cleaning up", addr.IP.String())
1129-
err := nsc.ln.ipAddrDel(dummyVipInterface, addr.IP.String())
1130-
if err != nil {
1131-
glog.Errorf("Failed to delete stale IP %s due to: %s",
1132-
addr.IP.String(), err.Error())
1133-
continue
1134-
}
1135-
}
1136-
}
1137-
1138-
// cleanup stale ipvs service and servers
1139-
glog.V(1).Info("Cleaning up if any, old ipvs service and servers which are no longer needed")
1140-
ipvsSvcs, err = nsc.ln.ipvsGetServices()
1141-
1142-
if err != nil {
1143-
return errors.New("Failed to list IPVS services: " + err.Error())
1144-
}
1145-
var protocol string
1146-
for _, ipvsSvc := range ipvsSvcs {
1147-
if ipvsSvc.Protocol == syscall.IPPROTO_TCP {
1148-
protocol = "tcp"
1149-
} else {
1150-
protocol = "udp"
1151-
}
1152-
var key string
1153-
if ipvsSvc.Address != nil {
1154-
key = generateIpPortId(ipvsSvc.Address.String(), protocol, strconv.Itoa(int(ipvsSvc.Port)))
1155-
} else if ipvsSvc.FWMark != 0 {
1156-
key = fmt.Sprint(ipvsSvc.FWMark)
1157-
} else {
1158-
continue
1159-
}
1160-
1161-
endpoints, ok := activeServiceEndpointMap[key]
1162-
// Only delete the service if it's not there anymore to prevent flapping
1163-
// old: if !ok || len(endpoints) == 0 {
1164-
if !ok {
1165-
glog.V(1).Infof("Found a IPVS service %s which is no longer needed so cleaning up",
1166-
ipvsServiceString(ipvsSvc))
1167-
err := nsc.ln.ipvsDelService(ipvsSvc)
1168-
if err != nil {
1169-
glog.Errorf("Failed to delete stale IPVS service %s due to: %s",
1170-
ipvsServiceString(ipvsSvc), err.Error())
1171-
continue
1172-
}
1173-
} else {
1174-
dsts, err := nsc.ln.ipvsGetDestinations(ipvsSvc)
1175-
if err != nil {
1176-
glog.Errorf("Failed to get list of servers from ipvs service")
1177-
}
1178-
for _, dst := range dsts {
1179-
validEp := false
1180-
for _, ep := range endpoints {
1181-
if ep == dst.Address.String() {
1182-
validEp = true
1183-
break
1184-
}
1185-
}
1186-
if !validEp {
1187-
glog.V(1).Infof("Found a destination %s in service %s which is no longer needed so cleaning up",
1188-
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
1189-
err = nsc.ipvsDeleteDestination(ipvsSvc, dst)
1190-
if err != nil {
1191-
glog.Errorf("Failed to delete destination %s from ipvs service %s",
1192-
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
1193-
}
1194-
}
1195-
}
1196-
}
1197-
}
1198-
1199-
err = nsc.syncIpvsFirewall()
1200-
if err != nil {
1201-
glog.Errorf("Error syncing ipvs svc iptables rules: %s", err.Error())
1202-
}
1203-
1204-
glog.V(1).Info("IPVS servers and services are synced to desired state")
1205-
return nil
1206-
}
1207-
1208837
func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string) (*api.Pod, error) {
1209838
for _, obj := range nsc.podLister.List() {
1210839
pod := obj.(*api.Pod)

0 commit comments

Comments
 (0)