Skip to content

Commit d042dbb

Browse files
author
Murali Reddy
committed
Add a new Node API watcher that watches for node add/remove events.
On node add/remove events, refresh the BGP peers to match the current set of active nodes. If a node is removed, delete its BGP neighbor relationship. Fixes #14
1 parent 3a3afe9 commit d042dbb

File tree

4 files changed

+217
-26
lines changed

4 files changed

+217
-26
lines changed

app/controllers/network_routes_controller.go

Lines changed: 108 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ type NetworkRoutingController struct {
3535
asnNumber uint32
3636
peerAsnNumber uint32
3737
}
38+
// activeNodes is the set of node IPs (in string form) that this controller
// has most recently tried to peer with. syncPeers and OnNodeUpdate add an
// entry when a node is peered and delete it when the peering is removed.
var activeNodes = make(map[string]bool)
3841

3942
func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
4043

@@ -61,32 +64,8 @@ func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGr
6164
defer t.Stop()
6265
defer wg.Done()
6366

64-
nodes, err := nrc.clientset.Core().Nodes().List(metav1.ListOptions{})
65-
if err != nil {
66-
glog.Errorf("Failed to list nodes: %s", err.Error())
67-
return
68-
}
69-
7067
glog.Infof("Starting network route controller")
7168

72-
// add the current set of nodes (excluding self) as BGP peers. Nodes form full mesh
73-
for _, node := range nodes.Items {
74-
nodeIP, _ := getNodeIP(&node)
75-
if nodeIP.String() == nrc.nodeIP.String() {
76-
continue
77-
}
78-
79-
n := &config.Neighbor{
80-
Config: config.NeighborConfig{
81-
NeighborAddress: nodeIP.String(),
82-
PeerAs: nrc.asnNumber,
83-
},
84-
}
85-
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
86-
panic(err)
87-
}
88-
}
89-
9069
// if the global routing peer is configured then peer with it
9170
if len(nrc.peerRouter) != 0 {
9271
n := &config.Neighbor{
@@ -109,6 +88,9 @@ func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGr
10988
default:
11089
}
11190

91+
// add the current set of nodes (excluding self) as BGP peers. Nodes form full mesh
92+
nrc.syncPeers()
93+
11294
// advertise cluster IP for the service to be reachable via host
11395
if nrc.advertiseClusterIp {
11496
glog.Infof("Advertising cluster ips")
@@ -209,7 +191,108 @@ func (nrc *NetworkRoutingController) injectRoute(path *table.Path) error {
209191
}
210192

211193
func (nrc *NetworkRoutingController) Cleanup() {
194+
}
195+
196+
// Refresh the peer relationship rest of the nodes in the cluster. Node add/remove
197+
// events should ensure peer relationship with only currently active nodes. In case
198+
// we miss any events from API server this method which is called periodically
199+
// ensure peer relationship with removed nodes is deleted.
200+
func (nrc *NetworkRoutingController) syncPeers() {
201+
202+
// get the current list of the nodes from API server
203+
nodes, err := nrc.clientset.Core().Nodes().List(metav1.ListOptions{})
204+
if err != nil {
205+
glog.Errorf("Failed to list nodes: %s", err.Error())
206+
return
207+
}
208+
209+
// establish peer with current set of nodes
210+
currentNodes := make([]string, 0)
211+
for _, node := range nodes.Items {
212+
nodeIP, _ := getNodeIP(&node)
213+
if nodeIP.String() == nrc.nodeIP.String() {
214+
continue
215+
}
216+
currentNodes = append(currentNodes, nodeIP.String())
217+
activeNodes[nodeIP.String()] = true
218+
n := &config.Neighbor{
219+
Config: config.NeighborConfig{
220+
NeighborAddress: nodeIP.String(),
221+
PeerAs: nrc.asnNumber,
222+
},
223+
}
224+
// TODO: check if a node is alredy added as nieighbour in a better way that add and catch error
225+
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
226+
if !strings.Contains(err.Error(), "Can't overwrite the existing peer") {
227+
glog.Errorf("Failed to add node %s as peer due to %s", nodeIP.String(), err)
228+
}
229+
}
230+
}
231+
232+
// find the list of the node removed, from the last known list of active nodes
233+
removedNodes := make([]string, 0)
234+
for ip, _ := range activeNodes {
235+
stillActive := false
236+
for _, node := range currentNodes {
237+
if ip == node {
238+
stillActive = true
239+
break
240+
}
241+
}
242+
if !stillActive {
243+
removedNodes = append(removedNodes, ip)
244+
}
245+
}
212246

247+
// delete the neighbor for the node that is removed
248+
for _, ip := range removedNodes {
249+
n := &config.Neighbor{
250+
Config: config.NeighborConfig{
251+
NeighborAddress: ip,
252+
PeerAs: nrc.asnNumber,
253+
},
254+
}
255+
if err := nrc.bgpServer.DeleteNeighbor(n); err != nil {
256+
glog.Errorf("Failed to remove node %s as peer due to %s", ip, err)
257+
}
258+
delete(activeNodes, ip)
259+
}
260+
}
261+
262+
// Handle updates from Node watcher. Node watcher calls this method whenever there is
263+
// new node is added or old node is deleted. So peer up with new node and drop peering
264+
// from old node
265+
func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdate) {
266+
nrc.mu.Lock()
267+
defer nrc.mu.Unlock()
268+
269+
node := nodeUpdate.Node
270+
nodeIP, _ := getNodeIP(node)
271+
if nodeUpdate.Op == watchers.ADD {
272+
glog.Infof("Received node %s added update from watch API so peer with new node", nodeIP)
273+
n := &config.Neighbor{
274+
Config: config.NeighborConfig{
275+
NeighborAddress: nodeIP.String(),
276+
PeerAs: nrc.asnNumber,
277+
},
278+
}
279+
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
280+
glog.Errorf("Failed to add node %s as peer due to %s", nodeIP, err)
281+
}
282+
activeNodes[nodeIP.String()] = true
283+
} else if nodeUpdate.Op == watchers.REMOVE {
284+
glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP)
285+
n := &config.Neighbor{
286+
Config: config.NeighborConfig{
287+
NeighborAddress: nodeIP.String(),
288+
PeerAs: nrc.asnNumber,
289+
},
290+
}
291+
if err := nrc.bgpServer.DeleteNeighbor(n); err != nil {
292+
glog.Errorf("Failed to remove node %s as peer due to %s", nodeIP, err)
293+
}
294+
delete(activeNodes, nodeIP.String())
295+
}
213296
}
214297

215298
func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) {
@@ -294,6 +377,7 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
294377
panic(err)
295378
}
296379

380+
watchers.NodeWatcher.RegisterHandler(&nrc)
297381
go nrc.watchBgpUpdates()
298382

299383
return &nrc, nil

app/server.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ func (kr *KubeRouter) startApiWatchers() error {
8383
return errors.New("Failed to launch service api watcher: " + err.Error())
8484
}
8585

86+
_, err = watchers.StartNodeWatcher(kr.Client, kr.Config.ConfigSyncPeriod)
87+
if err != nil {
88+
return errors.New("Failed to launch nodes api watcher: " + err.Error())
89+
}
90+
8691
return nil
8792
}
8893

@@ -92,6 +97,7 @@ func (kr *KubeRouter) stopApiWatchers() {
9297
watchers.StopNetworkPolicyWatcher()
9398
watchers.StopNamespaceWatcher()
9499
watchers.StopServiceWatcher()
100+
watchers.StopNodeWatcher()
95101
}
96102

97103
func (kr *KubeRouter) Run() error {

app/watchers/namespace_watcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (nsw *namespaceWatcher) namespaceDeleteEventHandler(obj interface{}) {
4949
nsw.broadcaster.Notify(&NamespaceUpdate{Op: REMOVE, Namespace: namespace})
5050
}
5151

52-
func (nsw *namespaceWatcher) namespaceAUpdateEventHandler(oldObj, newObj interface{}) {
52+
func (nsw *namespaceWatcher) namespaceUpdateEventHandler(oldObj, newObj interface{}) {
5353
namespace, ok := newObj.(*api.Namespace)
5454
if !ok {
5555
return
@@ -81,7 +81,7 @@ func StartNamespaceWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Du
8181
eventHandler := cache.ResourceEventHandlerFuncs{
8282
AddFunc: nsw.namespaceAddEventHandler,
8383
DeleteFunc: nsw.namespaceDeleteEventHandler,
84-
UpdateFunc: nsw.namespaceAUpdateEventHandler,
84+
UpdateFunc: nsw.namespaceUpdateEventHandler,
8585
}
8686

8787
nsw.clientset = clientset

app/watchers/node_watcher.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package watchers
2+
3+
import (
4+
"time"
5+
6+
"github.com/cloudnativelabs/kube-router/utils"
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
"k8s.io/apimachinery/pkg/fields"
9+
"k8s.io/client-go/kubernetes"
10+
api "k8s.io/client-go/pkg/api/v1"
11+
cache "k8s.io/client-go/tools/cache"
12+
)
13+
14+
type NodeUpdate struct {
15+
Node *api.Node
16+
Op Operation
17+
}
18+
19+
var (
20+
NodeWatcher *nodeWatcher
21+
)
22+
23+
type nodeWatcher struct {
24+
clientset *kubernetes.Clientset
25+
nodeController cache.Controller
26+
nodeLister cache.Indexer
27+
broadcaster *utils.Broadcaster
28+
}
29+
30+
type NodeUpdatesHandler interface {
31+
OnNodeUpdate(nodeUpdate *NodeUpdate)
32+
}
33+
34+
func (nw *nodeWatcher) nodeAddEventHandler(obj interface{}) {
35+
node, ok := obj.(*api.Node)
36+
if !ok {
37+
return
38+
}
39+
nw.broadcaster.Notify(&NodeUpdate{Op: ADD, Node: node})
40+
}
41+
42+
func (nw *nodeWatcher) nodeDeleteEventHandler(obj interface{}) {
43+
node, ok := obj.(*api.Node)
44+
if !ok {
45+
return
46+
}
47+
nw.broadcaster.Notify(&NodeUpdate{Op: REMOVE, Node: node})
48+
}
49+
50+
func (nw *nodeWatcher) nodeUpdateEventHandler(oldObj, newObj interface{}) {
51+
// we are interested only node add/delete, so skip update
52+
return
53+
}
54+
55+
func (nw *nodeWatcher) RegisterHandler(handler NodeUpdatesHandler) {
56+
nw.broadcaster.Add(utils.ListenerFunc(func(instance interface{}) {
57+
handler.OnNodeUpdate(instance.(*NodeUpdate))
58+
}))
59+
}
60+
61+
func (nw *nodeWatcher) List() []*api.Node {
62+
obj_list := nw.nodeLister.List()
63+
node_instances := make([]*api.Node, len(obj_list))
64+
for i, ins := range obj_list {
65+
node_instances[i] = ins.(*api.Node)
66+
}
67+
return node_instances
68+
}
69+
70+
func (nw *nodeWatcher) HasSynced() bool {
71+
return nw.nodeController.HasSynced()
72+
}
73+
74+
var nodewatchStopCh chan struct{}
75+
76+
func StartNodeWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*nodeWatcher, error) {
77+
78+
nw := nodeWatcher{}
79+
NodeWatcher = &nw
80+
eventHandler := cache.ResourceEventHandlerFuncs{
81+
AddFunc: nw.nodeAddEventHandler,
82+
DeleteFunc: nw.nodeDeleteEventHandler,
83+
UpdateFunc: nw.nodeUpdateEventHandler,
84+
}
85+
86+
nw.clientset = clientset
87+
nw.broadcaster = utils.NewBroadcaster()
88+
lw := cache.NewListWatchFromClient(clientset.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
89+
nw.nodeLister, nw.nodeController = cache.NewIndexerInformer(
90+
lw,
91+
&api.Node{}, resyncPeriod, eventHandler,
92+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
93+
)
94+
nodewatchStopCh = make(chan struct{})
95+
go nw.nodeController.Run(nodewatchStopCh)
96+
return &nw, nil
97+
}
98+
99+
func StopNodeWatcher() {
100+
nodewatchStopCh <- struct{}{}
101+
}

0 commit comments

Comments
 (0)