Skip to content

Commit 3a7370a

Browse files
author
Aritra Basu
committed
refactor: decouple routingServer into BGPWatcher, BGPHandler and routingHandler
The refactoring splits the monolithic routingServer into three focused components: i) BGPWatcher - observes GoBGP RIB state ii) BGPHandler - handles BGP protocol business logic iii) RoutingHandler - handles route installation business logic There is a clear separation of monitoring (BGPWatcher) and business logic: i) GoBGP programming via the BGPHandler ii) Linux Kernel programming via the routingHandler iii) VPP programming via the connectivityHandler Signed-off-by: Aritra Basu <[email protected]>
1 parent eb5adeb commit 3a7370a

File tree

12 files changed

+1261
-1160
lines changed

12 files changed

+1261
-1160
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
3838
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix"
3939
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus"
40-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
4140
"github.com/projectcalico/vpp-dataplane/v3/config"
4241

4342
watchdog "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watch_dog"
@@ -138,10 +137,10 @@ func main() {
138137
prefixWatcher := watchers.NewPrefixWatcher(client, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"}))
139138
bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"}))
140139
netWatcher := watchers.NewNetWatcher(vpp, log.WithFields(logrus.Fields{"component": "net-watcher"}))
141-
routingServer := routing.NewRoutingServer(vpp, bgpServer, log.WithFields(logrus.Fields{"component": "routing"}))
142140
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
143141
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
144142
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))
143+
felixServer.SetBGPServer(bgpServer)
145144
felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"}))
146145
cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"}))
147146
serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
@@ -158,15 +157,14 @@ func main() {
158157
log.Fatalf("cannot get default BGP config %s", err)
159158
}
160159

161-
routingServer.SetBGPConf(bgpConf)
162160
felixServer.SetBGPConf(bgpConf)
163161

162+
bgpWatcher := watchers.NewBGPWatcher(felixServer.GetCache(), log.WithFields(logrus.Fields{"component": "bgp-watcher"}))
163+
bgpWatcher.SetBGPServer(bgpServer)
164+
bgpWatcher.SetBGPHandler(felixServer.GetBGPHandler())
165+
164166
secretWatcher := watchers.NewSecretWatcher(k8sclient, log.WithFields(logrus.Fields{"component": "secret-watcher"}))
165-
// Set secret watcher in peer handler
166167
felixServer.GetPeerHandler().SetSecretWatcher(secretWatcher)
167-
// Set peer handler in routing server
168-
routingServer.SetPeerHandler(felixServer.GetPeerHandler())
169-
// Create peer watcher
170168
peerWatcher := watchers.NewPeerWatcher(clientv3, felixServer.GetPeerHandler(), secretWatcher, log.WithFields(logrus.Fields{"component": "peer-watcher"}))
171169

172170
watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t)
@@ -184,7 +182,8 @@ func main() {
184182
panic("ourBGPSpec is not *common.LocalNodeSpec")
185183
}
186184
prefixWatcher.SetOurBGPSpec(bgpSpec)
187-
routingServer.SetOurBGPSpec(bgpSpec)
185+
bgpWatcher.SetOurBGPSpec(bgpSpec)
186+
felixServer.GetRoutingHandler().SetOurBGPSpec(bgpSpec)
188187
localSIDWatcher.SetOurBGPSpec(bgpSpec)
189188
netWatcher.SetOurBGPSpec(bgpSpec)
190189
}
@@ -200,7 +199,8 @@ func main() {
200199
Go(prefixWatcher.WatchPrefix)
201200
Go(peerWatcher.WatchBGPPeers)
202201
Go(bgpFilterWatcher.WatchBGPFilters)
203-
Go(routingServer.ServeRouting)
202+
Go(bgpWatcher.WatchBGPPath)
203+
Go(felixServer.GetRoutingHandler().ServeRoutingHandler)
204204
Go(serviceServer.ServeService)
205205
Go(cniServer.ServeCNI)
206206
Go(prometheusServer.ServePrometheus)

calico-vpp-agent/felix/felix_server.go

Lines changed: 107 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"fmt"
2020
"net"
2121

22+
bgpapi "github.com/osrg/gobgp/v3/api"
23+
bgpserver "github.com/osrg/gobgp/v3/pkg/server"
2224
"github.com/pkg/errors"
2325
calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3"
2426
"github.com/sirupsen/logrus"
@@ -33,8 +35,8 @@ import (
3335
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/cni/model"
3436
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/connectivity"
3537
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/policies"
38+
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/routing"
3639
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/services"
37-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
3840
"github.com/projectcalico/vpp-dataplane/v3/config"
3941
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
4042
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
@@ -61,6 +63,8 @@ type Server struct {
6163
connectivityHandler *connectivity.ConnectivityHandler
6264
serviceHandler *services.ServiceHandler
6365
peerHandler *routing.PeerHandler
66+
routingHandler *routing.RoutingHandler
67+
bgpHandler *routing.BGPHandler
6468
}
6569

6670
// NewFelixServer creates a felix server
@@ -80,6 +84,8 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
8084
connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log),
8185
serviceHandler: services.NewServiceHandler(vpp, cache, log),
8286
peerHandler: routing.NewPeerHandler(cache, log),
87+
routingHandler: routing.NewRoutingHandler(vpp, cache, log),
88+
bgpHandler: routing.NewBGPHandler(log),
8389
}
8490

8591
reg := common.RegisterHandler(server.felixServerEventChan, "felix server events")
@@ -94,6 +100,17 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
94100
common.ConnectivityDeleted,
95101
common.SRv6PolicyAdded,
96102
common.SRv6PolicyDeleted,
103+
common.BGPPathAdded,
104+
common.BGPPathDeleted,
105+
common.BGPPeerAdded,
106+
common.BGPPeerUpdated,
107+
common.BGPPeerDeleted,
108+
common.BGPFilterAddedOrUpdated,
109+
common.BGPFilterDeleted,
110+
common.BGPDefinedSetAdded,
111+
common.BGPDefinedSetDeleted,
112+
common.LocalPodAddressAdded,
113+
common.LocalPodAddressDeleted,
97114
)
98115

99116
return server
@@ -111,10 +128,22 @@ func (s *Server) GetPeerHandler() *routing.PeerHandler {
111128
return s.peerHandler
112129
}
113130

131+
func (s *Server) GetRoutingHandler() *routing.RoutingHandler {
132+
return s.routingHandler
133+
}
134+
135+
func (s *Server) GetBGPHandler() *routing.BGPHandler {
136+
return s.bgpHandler
137+
}
138+
114139
func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) {
115140
s.cache.BGPConf = bgpConf
116141
}
117142

143+
func (s *Server) SetBGPServer(bgpServer *bgpserver.BgpServer) {
144+
s.bgpHandler.SetBGPServer(bgpServer)
145+
}
146+
118147
func (s *Server) getMainInterface() *config.UplinkStatus {
119148
for _, i := range common.VppManagerInfo.UplinkStatuses {
120149
if i.IsMain {
@@ -378,39 +407,97 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
378407
case common.ConnectivityAdded:
379408
new, ok := evt.New.(*common.NodeConnectivity)
380409
if !ok {
381-
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
382-
}
383-
err := s.connectivityHandler.UpdateIPConnectivity(new, false /* isWithdraw */)
384-
if err != nil {
385-
s.log.Errorf("Error while adding connectivity %s", err)
410+
return fmt.Errorf("evt.New is not a (*common.NodeConnectivity) %v", evt.New)
386411
}
412+
err = s.connectivityHandler.UpdateIPConnectivity(new, false /* isWithdraw */)
387413
case common.ConnectivityDeleted:
388414
old, ok := evt.Old.(*common.NodeConnectivity)
389415
if !ok {
390-
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
391-
}
392-
err := s.connectivityHandler.UpdateIPConnectivity(old, true /* isWithdraw */)
393-
if err != nil {
394-
s.log.Errorf("Error while deleting connectivity %s", err)
416+
return fmt.Errorf("evt.Old is not a (*common.NodeConnectivity) %v", evt.Old)
395417
}
418+
err = s.connectivityHandler.UpdateIPConnectivity(old, true /* isWithdraw */)
396419
case common.SRv6PolicyAdded:
397420
new, ok := evt.New.(*common.NodeConnectivity)
398421
if !ok {
399-
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
400-
}
401-
err := s.connectivityHandler.UpdateSRv6Policy(new, false /* isWithdraw */)
402-
if err != nil {
403-
s.log.Errorf("Error while adding SRv6 Policy %s", err)
422+
return fmt.Errorf("evt.New is not a (*common.NodeConnectivity) %v", evt.New)
404423
}
424+
err = s.connectivityHandler.UpdateSRv6Policy(new, false /* isWithdraw */)
405425
case common.SRv6PolicyDeleted:
406426
old, ok := evt.Old.(*common.NodeConnectivity)
407427
if !ok {
408-
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
428+
return fmt.Errorf("evt.Old is not a (*common.NodeConnectivity) %v", evt.Old)
409429
}
410-
err := s.connectivityHandler.UpdateSRv6Policy(old, true /* isWithdraw */)
411-
if err != nil {
412-
s.log.Errorf("Error while deleting SRv6 Policy %s", err)
430+
err = s.connectivityHandler.UpdateSRv6Policy(old, true /* isWithdraw */)
431+
case common.BGPPathAdded:
432+
path, ok := evt.New.(*bgpapi.Path)
433+
if !ok {
434+
return fmt.Errorf("evt.New is not a (*bgpapi.Path) %v", evt.New)
435+
}
436+
err = s.bgpHandler.HandleBGPPathAdded(path)
437+
case common.BGPPathDeleted:
438+
path, ok := evt.Old.(*bgpapi.Path)
439+
if !ok {
440+
return fmt.Errorf("evt.Old is not a (*bgpapi.Path) %v", evt.Old)
441+
}
442+
err = s.bgpHandler.HandleBGPPathDeleted(path)
443+
case common.BGPPeerAdded:
444+
peer, ok := evt.New.(*routing.LocalBGPPeer)
445+
if !ok {
446+
return fmt.Errorf("evt.New is not a (*routing.LocalBGPPeer) %v", evt.New)
447+
}
448+
err = s.bgpHandler.HandleBGPPeerAdded(peer)
449+
case common.BGPPeerUpdated:
450+
newPeer, ok := evt.New.(*routing.LocalBGPPeer)
451+
if !ok {
452+
return fmt.Errorf("evt.New is not a (*routing.LocalBGPPeer) %v", evt.New)
453+
}
454+
oldPeer, ok := evt.Old.(*routing.LocalBGPPeer)
455+
if !ok {
456+
return fmt.Errorf("evt.Old is not a (*routing.LocalBGPPeer) %v", evt.Old)
457+
}
458+
err = s.bgpHandler.HandleBGPPeerUpdated(newPeer, oldPeer)
459+
case common.BGPPeerDeleted:
460+
peerIP, ok := evt.Old.(string)
461+
if !ok {
462+
return fmt.Errorf("evt.Old is not a string %v", evt.Old)
463+
}
464+
err = s.bgpHandler.HandleBGPPeerDeleted(peerIP)
465+
case common.BGPFilterAddedOrUpdated:
466+
filter, ok := evt.New.(calicov3.BGPFilter)
467+
if !ok {
468+
return fmt.Errorf("evt.New is not a (calicov3.BGPFilter) %v", evt.New)
469+
}
470+
err = s.bgpHandler.HandleBGPFilterAddedOrUpdated(filter)
471+
case common.BGPFilterDeleted:
472+
filter, ok := evt.Old.(calicov3.BGPFilter)
473+
if !ok {
474+
return fmt.Errorf("evt.Old is not a (calicov3.BGPFilter) %v", evt.Old)
475+
}
476+
err = s.bgpHandler.HandleBGPFilterDeleted(filter)
477+
case common.BGPDefinedSetAdded:
478+
definedSet, ok := evt.New.(*bgpapi.DefinedSet)
479+
if !ok {
480+
return fmt.Errorf("evt.New is not a (*bgpapi.DefinedSet) %v", evt.New)
481+
}
482+
err = s.bgpHandler.HandleBGPDefinedSetAdded(definedSet)
483+
case common.BGPDefinedSetDeleted:
484+
definedSet, ok := evt.Old.(*bgpapi.DefinedSet)
485+
if !ok {
486+
return fmt.Errorf("evt.Old is not a (*bgpapi.DefinedSet) %v", evt.Old)
487+
}
488+
err = s.bgpHandler.HandleBGPDefinedSetDeleted(definedSet)
489+
case common.LocalPodAddressAdded:
490+
networkPod, ok := evt.New.(cni.NetworkPod)
491+
if !ok {
492+
return fmt.Errorf("evt.New is not a (cni.NetworkPod) %v", evt.New)
493+
}
494+
err = s.routingHandler.AnnounceLocalAddress(networkPod.ContainerIP, networkPod.NetworkVni)
495+
case common.LocalPodAddressDeleted:
496+
networkPod, ok := evt.Old.(cni.NetworkPod)
497+
if !ok {
498+
return fmt.Errorf("evt.Old is not a (cni.NetworkPod) %v", evt.Old)
413499
}
500+
err = s.routingHandler.WithdrawLocalAddress(networkPod.ContainerIP, networkPod.NetworkVni)
414501
default:
415502
s.log.Warnf("Unhandled CalicoVppEvent.Type: %s", evt.Type)
416503
}

0 commit comments

Comments
 (0)