Skip to content

Commit ab08c31

Browse files
authored
add service/endpoint event handler for routes controller (#384)
1 parent e94bf3d commit ab08c31

File tree

3 files changed

+237
-55
lines changed

3 files changed

+237
-55
lines changed

app/controllers/network_routes_controller.go

Lines changed: 225 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ type NetworkRoutingController struct {
101101
svcLister cache.Indexer
102102
epLister cache.Indexer
103103

104-
NodeEventHandler cache.ResourceEventHandler
104+
NodeEventHandler cache.ResourceEventHandler
105+
ServiceEventHandler cache.ResourceEventHandler
106+
EndpointsEventHandler cache.ResourceEventHandler
105107
}
106108

107109
// Run runs forever until we are notified on stop channel
@@ -253,12 +255,17 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *ControllerHeartbeat,
253255
}
254256
}
255257

256-
// [un]advertise IPs for the services to be reachable via host
257-
toAdvertise, toUnAdvertise, _ := nrc.getIpsToAdvertise(true)
258-
nrc.advertiseIPs(toAdvertise, toUnAdvertise)
258+
// advertise or withdraw IPs for the services to be reachable via host
259+
toAdvertise, toWithdraw, err := nrc.getActiveVIPs()
260+
if err != nil {
261+
glog.Errorf("failed to get routes to advertise/withdraw %s", err)
262+
}
263+
264+
nrc.advertiseVIPs(toAdvertise)
265+
nrc.withdrawVIPs(toWithdraw)
259266

260267
glog.V(1).Info("Performing periodic sync of the routes")
261-
err = nrc.advertiseRoute()
268+
err = nrc.advertisePodRoute()
262269
if err != nil {
263270
glog.Errorf("Error advertising route: %s", err.Error())
264271
}
@@ -375,7 +382,7 @@ func (nrc *NetworkRoutingController) watchBgpUpdates() {
375382
}
376383
}
377384

378-
func (nrc *NetworkRoutingController) advertiseRoute() error {
385+
func (nrc *NetworkRoutingController) advertisePodRoute() error {
379386
cidr, err := utils.GetPodCidrFromNodeSpec(nrc.clientset, nrc.hostnameOverride)
380387
if err != nil {
381388
return err
@@ -441,59 +448,88 @@ func (nrc *NetworkRoutingController) getLoadBalancerIps(svc *v1core.Service) []s
441448
return loadBalancerIpList
442449
}
443450

444-
func (nrc *NetworkRoutingController) getIpsToAdvertise(verifyEndpoints bool) ([]string, []string, error) {
445-
ipsToAdvertise := make([]string, 0)
446-
ipsToUnAdvertise := make([]string, 0)
451+
func (nrc *NetworkRoutingController) getAllVIPs() ([]string, []string, error) {
452+
return nrc.getVIPs(false)
453+
}
454+
455+
func (nrc *NetworkRoutingController) getActiveVIPs() ([]string, []string, error) {
456+
return nrc.getVIPs(true)
457+
}
458+
459+
func (nrc *NetworkRoutingController) getVIPs(onlyActiveEndpoints bool) ([]string, []string, error) {
460+
toAdvertiseList := make([]string, 0)
461+
toWithdrawList := make([]string, 0)
462+
447463
for _, obj := range nrc.svcLister.List() {
448464
svc := obj.(*v1core.Service)
449465

450-
ipList := make([]string, 0)
451-
var err error
452-
nodeHasEndpoints := true
453-
if verifyEndpoints {
454-
if svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal {
455-
nodeHasEndpoints, err = nrc.nodeHasEndpointsForService(svc)
456-
if err != nil {
457-
glog.Errorf("error determining if node has endpoints for svc: %q error: %v", svc.Name, err)
458-
continue
459-
}
460-
}
466+
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, onlyActiveEndpoints)
467+
if err != nil {
468+
return nil, nil, err
461469
}
462-
if nrc.advertiseClusterIp {
463-
clusterIp := nrc.getClusterIp(svc)
464-
if clusterIp != "" {
465-
ipList = append(ipList, clusterIp)
466-
}
470+
471+
if len(toAdvertise) > 0 {
472+
toAdvertiseList = append(toAdvertiseList, toAdvertise...)
467473
}
468-
if nrc.advertiseExternalIp {
469-
ipList = append(ipList, nrc.getExternalIps(svc)...)
474+
475+
if len(toWithdraw) > 0 {
476+
toWithdrawList = append(toWithdrawList, toWithdraw...)
470477
}
471-
if nrc.advertiseLoadBalancerIp {
472-
ipList = append(ipList, nrc.getLoadBalancerIps(svc)...)
478+
}
479+
480+
return toAdvertiseList, toWithdrawList, nil
481+
}
482+
483+
func (nrc *NetworkRoutingController) getVIPsForService(svc *v1core.Service, onlyActiveEndpoints bool) ([]string, []string, error) {
484+
ipList := make([]string, 0)
485+
var err error
486+
487+
nodeHasEndpoints := true
488+
if onlyActiveEndpoints {
489+
if svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal {
490+
nodeHasEndpoints, err = nrc.nodeHasEndpointsForService(svc)
491+
if err != nil {
492+
return nil, nil, err
493+
}
473494
}
474-
if nodeHasEndpoints {
475-
ipsToAdvertise = append(ipsToAdvertise, ipList...)
476-
} else {
477-
ipsToUnAdvertise = append(ipsToUnAdvertise, ipList...)
495+
}
496+
497+
if nrc.advertiseClusterIp {
498+
clusterIp := nrc.getClusterIp(svc)
499+
if clusterIp != "" {
500+
ipList = append(ipList, clusterIp)
478501
}
479502
}
480-
return ipsToAdvertise, ipsToUnAdvertise, nil
503+
if nrc.advertiseExternalIp {
504+
ipList = append(ipList, nrc.getExternalIps(svc)...)
505+
}
506+
if nrc.advertiseLoadBalancerIp {
507+
ipList = append(ipList, nrc.getLoadBalancerIps(svc)...)
508+
}
509+
510+
if !nodeHasEndpoints {
511+
return nil, ipList, nil
512+
}
513+
514+
return ipList, nil, nil
481515
}
482516

483-
func (nrc *NetworkRoutingController) advertiseIPs(toAdvertise []string, toUnAdvertise []string) error {
484-
for _, ip := range toAdvertise {
485-
err := nrc.AdvertiseClusterIp(ip)
517+
func (nrc *NetworkRoutingController) advertiseVIPs(vips []string) {
518+
for _, vip := range vips {
519+
err := nrc.bgpAdvertiseVIP(vip)
486520
if err != nil {
487-
glog.Errorf("error advertising IP: %q, error: %v", ip, err)
521+
glog.Errorf("error advertising IP: %q, error: %v", vip, err)
488522
}
489523
}
490-
for _, ip := range toUnAdvertise {
491-
err := nrc.UnadvertiseClusterIp(ip)
524+
}
525+
526+
func (nrc *NetworkRoutingController) withdrawVIPs(vips []string) {
527+
for _, vip := range vips {
528+
err := nrc.bgpWithdrawVIP(vip)
492529
if err != nil {
493-
glog.Errorf("error unadvertising IP: %q, error: %v", ip, err)
530+
glog.Errorf("error withdrawing IP: %q, error: %v", vip, err)
494531
}
495532
}
496-
return nil
497533
}
498534

499535
// nodeHasEndpointsForService will get the corresponding Endpoints resource for a given Service
@@ -530,6 +566,29 @@ func (nrc *NetworkRoutingController) nodeHasEndpointsForService(svc *v1core.Serv
530566
return false, nil
531567
}
532568

569+
func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) (*v1core.Service, error) {
570+
key, err := cache.MetaNamespaceKeyFunc(ep)
571+
if err != nil {
572+
return nil, err
573+
}
574+
575+
item, exists, err := nrc.svcLister.GetByKey(key)
576+
if err != nil {
577+
return nil, err
578+
}
579+
580+
if !exists {
581+
return nil, fmt.Errorf("service resource doesn't exist for endpoints: %q", ep.Name)
582+
}
583+
584+
svc, ok := item.(*v1core.Service)
585+
if !ok {
586+
return nil, errors.New("type assertion failed for object in service indexer")
587+
}
588+
589+
return svc, nil
590+
}
591+
533592
// Used for processing Annotations that may contain multiple items
534593
// Pass this the string and the delimiter
535594
func stringToSlice(s, d string) []string {
@@ -673,27 +732,27 @@ func connectToExternalBGPPeers(server *gobgp.BgpServer, peerConfigs []*config.Ne
673732
}
674733

675734
// AdvertiseClusterIp advertises the service cluster ip the configured peers
676-
func (nrc *NetworkRoutingController) AdvertiseClusterIp(clusterIp string) error {
735+
func (nrc *NetworkRoutingController) bgpAdvertiseVIP(vip string) error {
677736

678737
attrs := []bgp.PathAttributeInterface{
679738
bgp.NewPathAttributeOrigin(0),
680739
bgp.NewPathAttributeNextHop(nrc.nodeIP.String()),
681740
}
682741

683-
glog.V(2).Infof("Advertising route: '%s/%s via %s' to peers", clusterIp, strconv.Itoa(32), nrc.nodeIP.String())
742+
glog.V(2).Infof("Advertising route: '%s/%s via %s' to peers", vip, strconv.Itoa(32), nrc.nodeIP.String())
684743

685744
_, err := nrc.bgpServer.AddPath("", []*table.Path{table.NewPath(nil, bgp.NewIPAddrPrefix(uint8(32),
686-
clusterIp), false, attrs, time.Now(), false)})
745+
vip), false, attrs, time.Now(), false)})
687746

688747
return err
689748
}
690749

691750
// UnadvertiseClusterIP unadvertises the service cluster ip
692-
func (nrc *NetworkRoutingController) UnadvertiseClusterIp(clusterIp string) error {
693-
glog.V(2).Infof("Unadvertising route: '%s/%s via %s' to peers", clusterIp, strconv.Itoa(32), nrc.nodeIP.String())
751+
func (nrc *NetworkRoutingController) bgpWithdrawVIP(vip string) error {
752+
glog.V(2).Infof("Withdrawing route: '%s/%s via %s' to peers", vip, strconv.Itoa(32), nrc.nodeIP.String())
694753

695754
pathList := []*table.Path{table.NewPath(nil, bgp.NewIPAddrPrefix(uint8(32),
696-
clusterIp), true, nil, time.Now(), false)}
755+
vip), true, nil, time.Now(), false)}
697756

698757
err := nrc.bgpServer.DeletePath([]byte(nil), 0, "", pathList)
699758

@@ -736,7 +795,7 @@ func (nrc *NetworkRoutingController) addExportPolicies() error {
736795

737796
// creates prefix set to represent all the advertisable IP associated with the services
738797
advIpPrefixList := make([]config.Prefix, 0)
739-
advIps, _, _ := nrc.getIpsToAdvertise(false)
798+
advIps, _, _ := nrc.getAllVIPs()
740799
for _, ip := range advIps {
741800
advIpPrefixList = append(advIpPrefixList, config.Prefix{IpPrefix: ip + "/32"})
742801
}
@@ -1367,6 +1426,90 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) {
13671426
}
13681427
}
13691428

1429+
func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) {
1430+
if !nrc.bgpServerStarted {
1431+
return
1432+
}
1433+
1434+
svc, ok := obj.(*v1core.Service)
1435+
if !ok {
1436+
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
1437+
return
1438+
}
1439+
1440+
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
1441+
if err != nil {
1442+
glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err)
1443+
return
1444+
}
1445+
1446+
if len(toAdvertise) > 0 {
1447+
nrc.advertiseVIPs(toAdvertise)
1448+
}
1449+
1450+
if len(toWithdraw) > 0 {
1451+
nrc.withdrawVIPs(toWithdraw)
1452+
}
1453+
}
1454+
1455+
func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
1456+
if !nrc.bgpServerStarted {
1457+
return
1458+
}
1459+
1460+
svc, ok := obj.(*v1core.Service)
1461+
if !ok {
1462+
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
1463+
return
1464+
}
1465+
1466+
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
1467+
if err != nil {
1468+
glog.Errorf("failed to get clean up routes for deleted service %s", svc.Name)
1469+
return
1470+
}
1471+
1472+
if len(toAdvertise) > 0 {
1473+
nrc.withdrawVIPs(toWithdraw)
1474+
}
1475+
1476+
if len(toWithdraw) > 0 {
1477+
nrc.withdrawVIPs(toWithdraw)
1478+
}
1479+
}
1480+
1481+
func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
1482+
if !nrc.bgpServerStarted {
1483+
return
1484+
}
1485+
1486+
ep, ok := obj.(*v1core.Endpoints)
1487+
if !ok {
1488+
glog.Errorf("cache indexer returned obj that is not type *v1.Endpoints")
1489+
return
1490+
}
1491+
1492+
svc, err := nrc.serviceForEndpoints(ep)
1493+
if err != nil {
1494+
glog.Errorf("failed to convert endpoints resource to service: %s", err)
1495+
return
1496+
}
1497+
1498+
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
1499+
if err != nil {
1500+
glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err)
1501+
return
1502+
}
1503+
1504+
if len(toAdvertise) > 0 {
1505+
nrc.advertiseVIPs(toAdvertise)
1506+
}
1507+
1508+
if len(toWithdraw) > 0 {
1509+
nrc.withdrawVIPs(toWithdraw)
1510+
}
1511+
}
1512+
13701513
func (nrc *NetworkRoutingController) startBgpServer() error {
13711514
var nodeAsnNumber uint32
13721515
node, err := utils.GetNodeObject(nrc.clientset, nrc.hostnameOverride)
@@ -1583,6 +1726,36 @@ func (nrc *NetworkRoutingController) newNodeEventHandler() cache.ResourceEventHa
15831726
}
15841727
}
15851728

1729+
func (nrc *NetworkRoutingController) newServiceEventHandler() cache.ResourceEventHandler {
1730+
return cache.ResourceEventHandlerFuncs{
1731+
AddFunc: func(obj interface{}) {
1732+
nrc.OnServiceUpdate(obj)
1733+
},
1734+
UpdateFunc: func(oldObj, newObj interface{}) {
1735+
nrc.OnServiceUpdate(newObj)
1736+
},
1737+
DeleteFunc: func(obj interface{}) {
1738+
nrc.OnServiceDelete(obj)
1739+
},
1740+
}
1741+
}
1742+
1743+
func (nrc *NetworkRoutingController) newEndpointsEventHandler() cache.ResourceEventHandler {
1744+
return cache.ResourceEventHandlerFuncs{
1745+
AddFunc: func(obj interface{}) {
1746+
nrc.OnEndpointsUpdate(obj)
1747+
},
1748+
UpdateFunc: func(oldObj, newObj interface{}) {
1749+
nrc.OnEndpointsUpdate(newObj)
1750+
},
1751+
DeleteFunc: func(obj interface{}) {
1752+
// don't do anything if an endpoints resource is deleted since
1753+
// the service delete event handles route withdrawls
1754+
return
1755+
},
1756+
}
1757+
}
1758+
15861759
// func (nrc *NetworkRoutingController) getExternalNodeIPs(
15871760

15881761
// NewNetworkRoutingController returns new NetworkRoutingController object
@@ -1703,7 +1876,10 @@ func NewNetworkRoutingController(clientset kubernetes.Interface,
17031876
}
17041877

17051878
nrc.svcLister = svcInformer.GetIndexer()
1879+
nrc.ServiceEventHandler = nrc.newServiceEventHandler()
1880+
17061881
nrc.epLister = epInformer.GetIndexer()
1882+
nrc.EndpointsEventHandler = nrc.newEndpointsEventHandler()
17071883

17081884
nrc.nodeLister = nodeInformer.GetIndexer()
17091885
nrc.NodeEventHandler = nrc.newNodeEventHandler()

0 commit comments

Comments
 (0)