@@ -29,6 +29,7 @@ import (
2929 "github.com/vishvananda/netlink"
3030 "github.com/vishvananda/netns"
3131 "golang.org/x/net/context"
32+ api "k8s.io/api/core/v1"
3233 "k8s.io/client-go/kubernetes"
3334)
3435
@@ -307,7 +308,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
307308 externalIpServices := make ([]externalIPService , 0 )
308309 // create IPVS service for the service to be exposed through the external IP's
309310 // For external IP (which are meant for ingress traffic) Kube-router setsup IPVS services
310- // based on FWMARK to enable Direct server return functionality. DSR requires a director
311+ // based on FWMARK to enable Direct server return functionality. DSR requires a director
311312 // without a VIP http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html
312313 // to avoid martian packets
313314 for _ , externalIP := range svc .externalIPs {
@@ -331,8 +332,8 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
331332 // to deliver the packet locally so that IPVS can pick the packet
332333 err = routeVIPTrafficToDirector ("0x" + fmt .Sprintf ("%x" , fwMark ))
333334 if err != nil {
334- glog .Errorf ("Failed to setup ip rule to lookup traffic to external IP: %s through custom " +
335- "route table due to " , externalIP , err .Error ())
335+ glog .Errorf ("Failed to setup ip rule to lookup traffic to external IP: %s through custom " +
336+ "route table due to " , externalIP , err .Error ())
336337 continue
337338 }
338339
@@ -371,18 +372,42 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
371372 }
372373
373374 for _ , externalIpService := range externalIpServices {
374- // For now just support IPVS tunnel mode, we can add other ways of DSR in future
375+
375376 if svc .directServerReturn && svc .directServerReturnMethod == "tunnel" {
376377 dst .ConnectionFlags = ipvs .ConnectionFlagTunnel
377- err := prepareEndpointForDsr (nsc .nodeIP .String (), endpoint .ip , externalIpService .externalIp )
378- if err != nil {
379- glog .Errorf ("Failed to prepare endpoint %s to do direct server return due to %s" , endpoint .ip , err .Error ())
380- }
381378 }
379+
380+ // add server to IPVS service
382381 err := ipvsAddServer (externalIpService .ipvsSvc , & dst )
383382 if err != nil {
384383 glog .Errorf (err .Error ())
385384 }
385+
386+ // For now just support IPVS tunnel mode, we can add other ways of DSR in future
387+ if svc .directServerReturn && svc .directServerReturnMethod == "tunnel" {
388+
389+ podObj , err := getPodObjectForEndpoint (endpoint .ip )
390+ if err != nil {
391+ glog .Errorf ("Failed to find endpoint with ip: " + endpoint .ip + ". so skipping peparing endpoint for DSR" )
392+ continue
393+ }
394+
395+ // we are only concerned with endpoint pod running on current node
396+ if strings .Compare (podObj .Status .HostIP , nsc .nodeIP .String ()) != 0 {
397+ continue
398+ }
399+
400+ containerID := strings .TrimPrefix (podObj .Status .ContainerStatuses [0 ].ContainerID , "docker://" )
401+ if containerID == "" {
402+ glog .Errorf ("Failed to find container id for the endpoint with ip: " + endpoint .ip + " so skipping peparing endpoint for DSR" )
403+ continue
404+ }
405+
406+ err = prepareEndpointForDsr (containerID , endpoint .ip , externalIpService .externalIp )
407+ if err != nil {
408+ glog .Errorf ("Failed to prepare endpoint %s to do direct server return due to %s" , endpoint .ip , err .Error ())
409+ }
410+ }
386411 }
387412 }
388413 }
@@ -448,38 +473,35 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
448473 return nil
449474}
450475
476+ func getPodObjectForEndpoint (endpointIP string ) (* api.Pod , error ) {
477+ for _ , pod := range watchers .PodWatcher .List () {
478+ if strings .Compare (pod .Status .PodIP , endpointIP ) == 0 {
479+ return pod , nil
480+ }
481+ }
482+ return nil , errors .New ("Failed to find pod with ip " + endpointIP )
483+ }
484+
451485// This function does the following
452486// - get the pod corresponding to the endpoint ip
453487// - get the container id from pod spec
454488// - from the container id, use docker client to get the pid
455489// - enter process network namespace and create ipip tunnel
456490// - add VIP to the tunnel interface
457491// - disable rp_filter
458- func prepareEndpointForDsr (nodeip string , endpointIP string , vip string ) error {
492+ func prepareEndpointForDsr (containerId string , endpointIP string , vip string ) error {
459493
460494 currentNamespaceHandle , err := netns .Get ()
461495 if err != nil {
462496 return errors .New ("Failed to get namespace due to " + err .Error ())
463497 }
464- defer netns .Set (currentNamespaceHandle )
465-
466- containerID := ""
467- for _ , pod := range watchers .PodWatcher .List () {
468- if strings .Compare (pod .Status .HostIP , nodeip ) == 0 && strings .Compare (pod .Status .PodIP , endpointIP ) == 0 {
469- containerID = strings .TrimPrefix (pod .Status .ContainerStatuses [0 ].ContainerID , "docker://" )
470- break
471- }
472- }
473- if containerID == "" {
474- return errors .New ("Failed to get container id, so not preparing endpoint for DSR" )
475- }
476498
477499 client , err := client .NewEnvClient ()
478500 if err != nil {
479501 return errors .New ("Failed to get docker client due to " + err .Error ())
480502 }
481503
482- containerSpec , err := client .ContainerInspect (context .Background (), containerID )
504+ containerSpec , err := client .ContainerInspect (context .Background (), containerId )
483505 if err != nil {
484506 return errors .New ("Failed to get docker container spec due to " + err .Error ())
485507 }
@@ -495,10 +517,14 @@ func prepareEndpointForDsr(nodeip string, endpointIP string, vip string) error {
495517 return errors .New ("Failed to enter to endpoint namespace due to " + err .Error ())
496518 }
497519
520+ // TODO: fix boilerplate `netns.Set(currentNamespaceHandle)` code. Need a robust
521+ // way to switch back to old namespace, pretty much many things will go wrong
522+
498523 // create a ipip tunnel interface inside the endpoint container
499524 tunIf , err := netlink .LinkByName (KUBE_TUNNEL_IF )
500525 if err != nil {
501526 if err .Error () != IFACE_NOT_FOUND {
527+ netns .Set (currentNamespaceHandle )
502528 return errors .New ("Failed to verify if ipip tunnel interface exists in endpoint " + endpointIP + " namespace due to " + err .Error ())
503529 }
504530
@@ -509,11 +535,22 @@ func prepareEndpointForDsr(nodeip string, endpointIP string, vip string) error {
509535 }
510536 err = netlink .LinkAdd (& ipTunLink )
511537 if err != nil {
538+ netns .Set (currentNamespaceHandle )
512539 return errors .New ("Failed to add ipip tunnel interface in endpoint namespace due to " + err .Error ())
513540 }
514- time .Sleep (1000 * time .Millisecond )
515- tunIf , err = netlink .LinkByName (KUBE_TUNNEL_IF )
541+
542+ // TODO: this is ugly, but ran into issue multiple times where interface did not come up quickly.
543+ // need to find the root cause
544+ for retry := 0 ; retry < 60 ; retry ++ {
545+ time .Sleep (1000 * time .Millisecond )
546+ tunIf , err = netlink .LinkByName (KUBE_TUNNEL_IF )
547+ if err != nil && err .Error () == IFACE_NOT_FOUND {
548+ continue
549+ }
550+ }
551+
516552 if err != nil {
553+ netns .Set (currentNamespaceHandle )
517554 return errors .New ("Failed to get " + KUBE_TUNNEL_IF + " tunnel interface handle due to " + err .Error ())
518555 }
519556
@@ -523,6 +560,7 @@ func prepareEndpointForDsr(nodeip string, endpointIP string, vip string) error {
523560 // bring the tunnel interface up
524561 err = netlink .LinkSetUp (tunIf )
525562 if err != nil {
563+ netns .Set (currentNamespaceHandle )
526564 return errors .New ("Failed to bring up ipip tunnel interface in endpoint namespace due to " + err .Error ())
527565 }
528566
@@ -531,17 +569,20 @@ func prepareEndpointForDsr(nodeip string, endpointIP string, vip string) error {
531569 Mask : net .IPv4Mask (255 , 255 , 255 , 255 )}, Scope : syscall .RT_SCOPE_LINK }
532570 err = netlink .AddrAdd (tunIf , netlinkVip )
533571 if err != nil && err .Error () != IFACE_HAS_ADDR {
572+ netns .Set (currentNamespaceHandle )
534573 return errors .New ("Failed to assign vip " + vip + " to kube-tunnel-if interface " )
535574 }
536575 glog .Infof ("Successfully assinged VIP: " + vip + " in endpoint " + endpointIP + "." )
537576
538- // disable rp_filter on KUBE_TUNNEL_IF interface
539- err = ioutil .WriteFile ("/proc/sys/net/ipv4/conf/kube-tunnel-if /rp_filter" , []byte (strconv .Itoa (0 )), 0640 )
577+ // disable rp_filter on all interface
578+ err = ioutil .WriteFile ("/proc/sys/net/ipv4/conf/all /rp_filter" , []byte (strconv .Itoa (0 )), 0640 )
540579 if err != nil {
580+ netns .Set (currentNamespaceHandle )
541581 return errors .New ("Failed to disable rp_filter in the endpoint container" )
542582 }
543583 glog .Infof ("Successfully disabled rp_filter in endpoint " + endpointIP + "." )
544584
585+ netns .Set (currentNamespaceHandle )
545586 return nil
546587}
547588
@@ -1009,7 +1050,7 @@ func ipvsAddService(vip net.IP, protocol, port uint16, persistent bool, schedule
10091050
10101051// generateFwmark: generate a uint32 hash value using the IP address, port, protocol information
10111052// TODO: collision can rarely happen but still need to be ruled out
1012- // TODO: I ran into issues with FWMARK for any value above 2^15. Either policy
1053+ // TODO: I ran into issues with FWMARK for any value above 2^15. Either policy
10131054// routing and IPVS FWMARK service was not functioning with value above 2^15
10141055func generateFwmark (ip , protocol , port string ) uint32 {
10151056 h := fnv .New32a ()
@@ -1093,6 +1134,11 @@ func ipvsAddServer(service *ipvs.Service, dest *ipvs.Destination) error {
10931134 }
10941135
10951136 if strings .Contains (err .Error (), IPVS_SERVER_EXISTS ) {
1137+ err = h .UpdateDestination (service , dest )
1138+ if err != nil {
1139+ return fmt .Errorf ("Failed to update ipvs destination %s to the ipvs service %s due to : %s" , dest .Address ,
1140+ ipvsDestinationString (dest ), ipvsServiceString (service ), err .Error ())
1141+ }
10961142 // TODO: Make this debug output when we get log levels
10971143 // glog.Infof("ipvs destination %s already exists in the ipvs service %s so not adding destination",
10981144 // ipvsDestinationString(dest), ipvsServiceString(service))
@@ -1109,7 +1155,7 @@ const (
11091155)
11101156
11111157// setupMangleTableRule: setsup iptable rule to FWMARK the traffic to exteranl IP vip
1112- func setupMangleTableRule (ip string , protocol string , port string , fwmark string ) error {
1158+ func setupMangleTableRule (ip string , protocol string , port string , fwmark string ) error {
11131159 iptablesCmdHandler , err := iptables .New ()
11141160 if err != nil {
11151161 return errors .New ("Failed to initialize iptables executor" + err .Error ())
@@ -1133,8 +1179,8 @@ func routeVIPTrafficToDirector(fwmark string) error {
11331179 if ! strings .Contains (string (out ), fwmark ) {
11341180 err = exec .Command ("ip" , "rule" , "add" , "fwmark" , fwmark , "table" , customDSRRouteTableID ).Run ()
11351181 if err != nil {
1136- return errors .New ("Failed to add policy rule to lookup traffic to VIP through the custom " +
1137- " routing table due to " + err .Error ())
1182+ return errors .New ("Failed to add policy rule to lookup traffic to VIP through the custom " +
1183+ " routing table due to " + err .Error ())
11381184 }
11391185 }
11401186 return nil
@@ -1160,11 +1206,11 @@ func setupPolicyRoutingForDSR() error {
11601206 out , err := exec .Command ("ip" , "route" , "list" , "table" , customDSRRouteTableID ).Output ()
11611207 if err != nil {
11621208 return errors .New ("Failed to verify required default route exists. " +
1163- "Failed to setup policy routing required for DSR due to " + err .Error ())
1209+ "Failed to setup policy routing required for DSR due to " + err .Error ())
11641210 }
11651211 if ! strings .Contains (string (out ), " lo " ) {
11661212 if err = exec .Command ("ip" , "route" , "add" , "local" , "default" , "dev" , "lo" , "table" ,
1167- customDSRRouteTableID ).Run (); err != nil {
1213+ customDSRRouteTableID ).Run (); err != nil {
11681214 return errors .New ("Failed to add route in custom route table due to: " + err .Error ())
11691215 }
11701216 }
0 commit comments