Skip to content

Commit 961d8ab

Browse files
mk01murali-reddy
authored andcommitted
fix #639 (#670)
* - refactor / clean up / extract code dupes into methods and reuse * - fix 639 - get external IPs to withdraw as diff against previous generation of service
1 parent 7b20ae9 commit 961d8ab

File tree

1 file changed

+43
-58
lines changed

1 file changed

+43
-58
lines changed

pkg/controllers/routing/ecmp_vip.go

Lines changed: 43 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/osrg/gobgp/table"
1212
v1core "k8s.io/api/core/v1"
1313
"k8s.io/client-go/tools/cache"
14+
"strings"
1415
)
1516

1617
// bgpAdvertiseVIP advertises the service vip (cluster ip or load balancer ip or external IP) the configured peers
@@ -62,26 +63,25 @@ func (nrc *NetworkRoutingController) withdrawVIPs(vips []string) {
6263
func (nrc *NetworkRoutingController) newServiceEventHandler() cache.ResourceEventHandler {
6364
return cache.ResourceEventHandlerFuncs{
6465
AddFunc: func(obj interface{}) {
65-
nrc.OnServiceUpdate(obj)
66+
nrc.OnServiceCreate(obj)
6667
},
6768
UpdateFunc: func(oldObj, newObj interface{}) {
68-
nrc.OnServiceUpdate(newObj)
69+
nrc.OnServiceUpdate(newObj, oldObj)
6970
},
7071
DeleteFunc: func(obj interface{}) {
7172
nrc.OnServiceDelete(obj)
7273
},
7374
}
7475
}
7576

76-
// OnServiceUpdate handles the service relates updates from the kubernetes API server
77-
func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) {
78-
svc, ok := obj.(*v1core.Service)
79-
if !ok {
77+
func getServiceObject(obj interface{}) (svc *v1core.Service) {
78+
if svc, _ = obj.(*v1core.Service); svc == nil {
8079
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
81-
return
8280
}
81+
return
82+
}
8383

84-
glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name)
84+
func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) {
8585
if !nrc.bgpServerStarted {
8686
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
8787
return
@@ -99,47 +99,49 @@ func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) {
9999
glog.Errorf("Error adding BGP export policies: %s", err.Error())
100100
}
101101

102-
if len(toAdvertise) > 0 {
103-
nrc.advertiseVIPs(toAdvertise)
104-
}
102+
nrc.advertiseVIPs(toAdvertise)
103+
nrc.withdrawVIPs(toWithdraw)
104+
}
105105

106-
if len(toWithdraw) > 0 {
107-
nrc.withdrawVIPs(toWithdraw)
106+
func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, logMsgFormat string) {
107+
if svc := getServiceObject(obj); svc != nil {
108+
glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
109+
nrc.handleServiceUpdate(svc)
108110
}
109111
}
110112

111-
// OnServiceDelete handles the service delete updates from the kubernetes API server
112-
func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
113-
if !nrc.bgpServerStarted {
114-
return
115-
}
113+
// OnServiceCreate handles new service create event from the kubernetes API server
114+
func (nrc *NetworkRoutingController) OnServiceCreate(obj interface{}) {
115+
nrc.tryHandleServiceUpdate(obj, "Received new service: %s/%s from watch API")
116+
}
116117

117-
svc, ok := obj.(*v1core.Service)
118-
if !ok {
119-
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
120-
return
121-
}
118+
// OnServiceUpdate handles the service relates updates from the kubernetes API server
119+
func (nrc *NetworkRoutingController) OnServiceUpdate(objNew interface{}, objOld interface{}) {
120+
nrc.tryHandleServiceUpdate(objNew, "Received update on service: %s/%s from watch API")
122121

123-
glog.V(1).Infof("Received event to delete service: %s/%s from watch API", svc.Namespace, svc.Name)
124-
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
125-
if err != nil {
126-
glog.Errorf("failed to get clean up routes for deleted service: %s/%s", svc.Namespace, svc.Name)
127-
return
128-
}
122+
nrc.withdrawVIPs(nrc.getWithdraw(getServiceObject(objOld), getServiceObject(objNew)))
123+
}
129124

130-
// update export policies so that deleted VIP's gets removed from clusteripprefixsit
131-
err = nrc.addExportPolicies()
132-
if err != nil {
133-
glog.Errorf("Error adding BGP export policies: %s", err.Error())
125+
func (nrc *NetworkRoutingController) getWithdraw(svcOld, svcNew *v1core.Service) (out []string) {
126+
if svcOld != nil && svcNew != nil {
127+
out = getMissingPrevGen(nrc.getExternalIps(svcOld), nrc.getExternalIps(svcNew))
134128
}
129+
return
130+
}
135131

136-
if len(toAdvertise) > 0 {
137-
nrc.withdrawVIPs(toAdvertise)
132+
func getMissingPrevGen(old, new []string) (withdrawIPs []string) {
133+
lookIn := " " + strings.Join(new, " ") + " "
134+
for _, s := range old {
135+
if !strings.Contains(lookIn, " "+s+" ") {
136+
withdrawIPs = append(withdrawIPs, s)
137+
}
138138
}
139+
return
140+
}
139141

140-
if len(toWithdraw) > 0 {
141-
nrc.withdrawVIPs(toWithdraw)
142-
}
142+
// OnServiceDelete handles the service delete updates from the kubernetes API server
143+
func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
144+
nrc.tryHandleServiceUpdate(obj, "Received event to delete service: %s/%s from watch API")
143145
}
144146

145147
func (nrc *NetworkRoutingController) newEndpointsEventHandler() cache.ResourceEventHandler {
@@ -201,22 +203,10 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
201203
return
202204
}
203205

204-
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
205-
if err != nil {
206-
glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err)
207-
return
208-
}
209-
210-
if len(toAdvertise) > 0 {
211-
nrc.advertiseVIPs(toAdvertise)
212-
}
213-
214-
if len(toWithdraw) > 0 {
215-
nrc.withdrawVIPs(toWithdraw)
216-
}
206+
nrc.tryHandleServiceUpdate(svc, "Updating service %s/%s triggered by endpoint update event")
217207
}
218208

219-
func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) (*v1core.Service, error) {
209+
func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) (interface{}, error) {
220210
key, err := cache.MetaNamespaceKeyFunc(ep)
221211
if err != nil {
222212
return nil, err
@@ -231,12 +221,7 @@ func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) (
231221
return nil, fmt.Errorf("service resource doesn't exist for endpoints: %q", ep.Name)
232222
}
233223

234-
svc, ok := item.(*v1core.Service)
235-
if !ok {
236-
return nil, errors.New("type assertion failed for object in service indexer")
237-
}
238-
239-
return svc, nil
224+
return item, nil
240225
}
241226

242227
func (nrc *NetworkRoutingController) getClusterIp(svc *v1core.Service) string {

0 commit comments

Comments
 (0)