Skip to content

Commit b6d1d35

Browse files
author
Aritra Basu
committed
refactor: decouple routingServer into BGPWatcher and routingHandler
This commit replaces the routingServer with separate BGPWatcher and routingHandler components coordinated by a central BGP manager. This eliminates the pubsub event-driven model in favor of direct component communication via dedicated handler interfaces. Key changes: - Split routingServer responsibilities between BGPWatcher (monitoring) and routingHandler (VPP programming) - Add BGPManager as central orchestrator coordinating both components - BGPWatcher now focuses only on monitoring GoBGP without Felix cache access - routingHandler retains Felix cache access for VPP programming decisions - Replace multiple pubsub events with direct interface calls with the goal of eventually moving away from the pubsub infra Signed-off-by: Aritra Basu <[email protected]>
1 parent 2c22c24 commit b6d1d35

21 files changed

+1720
-1281
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ import (
3636

3737
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
3838
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix"
39+
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/routing"
3940
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus"
40-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
4141
"github.com/projectcalico/vpp-dataplane/v3/config"
4242

4343
watchdog "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watch_dog"
@@ -138,7 +138,6 @@ func main() {
138138
prefixWatcher := watchers.NewPrefixWatcher(client, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"}))
139139
bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"}))
140140
netWatcher := watchers.NewNetWatcher(vpp, log.WithFields(logrus.Fields{"component": "net-watcher"}))
141-
routingServer := routing.NewRoutingServer(vpp, bgpServer, log.WithFields(logrus.Fields{"component": "routing"}))
142141
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
143142
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
144143
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))
@@ -158,13 +157,23 @@ func main() {
158157
log.Fatalf("cannot get default BGP config %s", err)
159158
}
160159

161-
peerManager, err := felix.NewPeerManager(clientv3, k8sclient, felixServer, log.WithFields(logrus.Fields{"component": "peer-manager"}))
162-
if err != nil {
163-
log.Fatalf("could not create peer manager: %s", err)
164-
}
160+
// Create peer manager
161+
// This also creates (i) secret watcher (ii) peer watcher (iii) peer handler with the peer manager as the orchestrator
162+
peerManager := routing.NewPeerManager(clientv3, k8sclient, felixServer, log.WithFields(logrus.Fields{"component": "peer-manager"}))
163+
164+
// Create BGP manager
165+
// This also creates (i) BGP watcher (ii) BGP handler (iii) routing handler with the BGP manager as the orchestrator
166+
bgpManager := routing.NewBGPManager(bgpServer, vpp, felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "bgp-manager"}))
167+
168+
// Wire up BGP handlers after all components are created
169+
prefixWatcher.SetBGPDefinedSetHandler(bgpManager)
170+
prefixWatcher.SetBGPPathHandler(bgpManager)
171+
bgpFilterWatcher.SetBGPFilterHandler(bgpManager)
172+
localSIDWatcher.SetBGPPathHandler(bgpManager)
173+
peerManager.SetBGPPeerHandler(bgpManager)
165174

166-
routingServer.SetBGPConf(bgpConf)
167175
peerManager.SetBGPConf(bgpConf)
176+
bgpManager.SetBGPConf(bgpConf)
168177

169178
watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t)
170179
Go(felixServer.ServeFelix)
@@ -181,7 +190,7 @@ func main() {
181190
panic("ourBGPSpec is not *common.LocalNodeSpec")
182191
}
183192
prefixWatcher.SetOurBGPSpec(bgpSpec)
184-
routingServer.SetOurBGPSpec(bgpSpec)
193+
bgpManager.SetOurBGPSpec(bgpSpec)
185194
localSIDWatcher.SetOurBGPSpec(bgpSpec)
186195
netWatcher.SetOurBGPSpec(bgpSpec)
187196
}
@@ -197,7 +206,8 @@ func main() {
197206
Go(prefixWatcher.WatchPrefix)
198207
Go(peerManager.Start)
199208
Go(bgpFilterWatcher.WatchBGPFilters)
200-
Go(routingServer.ServeRouting)
209+
Go(bgpManager.StartBGPWatcher)
210+
Go(bgpManager.StartRoutingHandler)
201211
Go(serviceServer.ServeService)
202212
Go(cniServer.ServeCNI)
203213
Go(prometheusServer.ServePrometheus)

calico-vpp-agent/common/bgp.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright (C) 2025 Cisco Systems Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package common
17+
18+
import (
19+
bgpapi "github.com/osrg/gobgp/v3/api"
20+
calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
21+
)
22+
23+
// LocalBGPPeer represents a BGP peer with its configuration and policies
24+
type LocalBGPPeer struct {
25+
Peer *bgpapi.Peer
26+
BGPFilterNames []string
27+
BGPPolicies map[string]*ImpExpPol
28+
NeighborSet *bgpapi.DefinedSet
29+
}
30+
31+
// BGPPrefixesPolicyAndAssignment contains BGP policy and prefix information
32+
type BGPPrefixesPolicyAndAssignment struct {
33+
PolicyAssignment *bgpapi.PolicyAssignment
34+
Policy *bgpapi.Policy
35+
Prefixes []*bgpapi.DefinedSet
36+
}
37+
38+
// ImpExpPol contains import and export policies
39+
type ImpExpPol struct {
40+
Imp *BGPPrefixesPolicyAndAssignment
41+
Exp *BGPPrefixesPolicyAndAssignment
42+
}
43+
44+
// BGPConnectivityHandler interface for handling connectivity events
45+
type BGPConnectivityHandler interface {
46+
OnConnectivityAdded(connectivity *NodeConnectivity) error
47+
OnConnectivityDeleted(connectivity *NodeConnectivity) error
48+
OnSRv6PolicyAdded(connectivity *NodeConnectivity) error
49+
OnSRv6PolicyDeleted(connectivity *NodeConnectivity) error
50+
}
51+
52+
// BGPPathHandler interface for handling BGP path operations
53+
type BGPPathHandler interface {
54+
HandleBGPPathAdded(path *bgpapi.Path) error
55+
HandleBGPPathDeleted(path *bgpapi.Path) error
56+
}
57+
58+
// BGPPeerHandler interface for handling BGP peer operations
59+
type BGPPeerHandler interface {
60+
HandleBGPPeerAdded(localPeer *LocalBGPPeer) error
61+
HandleBGPPeerUpdated(localPeer *LocalBGPPeer, oldPeer *LocalBGPPeer) error
62+
HandleBGPPeerDeleted(peerIP string) error
63+
}
64+
65+
// BGPFilterHandler interface for handling BGP filter operations
66+
type BGPFilterHandler interface {
67+
HandleBGPFilterAddedOrUpdated(filter calicov3.BGPFilter) error
68+
HandleBGPFilterDeleted(filter calicov3.BGPFilter) error
69+
}
70+
71+
// BGPDefinedSetHandler interface for handling BGP defined set operations
72+
type BGPDefinedSetHandler interface {
73+
HandleBGPDefinedSetAdded(definedSet *bgpapi.DefinedSet) error
74+
HandleBGPDefinedSetDeleted(definedSet *bgpapi.DefinedSet) error
75+
}
76+
77+
// BGPHandler interface for handling different BGP operations
78+
type BGPHandler interface {
79+
BGPPathHandler
80+
BGPPeerHandler
81+
BGPFilterHandler
82+
BGPDefinedSetHandler
83+
84+
SetBGPConnectivityHandler(handler BGPConnectivityHandler)
85+
InjectRoute(path *bgpapi.Path) error
86+
InjectSRv6Policy(path *bgpapi.Path) error
87+
InitialPolicySetting(isv6 bool) error
88+
}

calico-vpp-agent/common/common.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"math"
2222
"net"
2323
"os"
24+
"reflect"
25+
"sort"
2426
"strconv"
2527
"strings"
2628
"time"
@@ -534,3 +536,17 @@ func CompareIPList(newIPList, oldIPList []net.IP) (added []net.IP, deleted []net
534536
changed = len(added)+len(deleted) > 0
535537
return
536538
}
539+
540+
// CompareStringSlices compares two string slices for equality (order-independent)
541+
func CompareStringSlices(slice1, slice2 []string) bool {
542+
if len(slice1) != len(slice2) {
543+
return false
544+
}
545+
546+
// Sort the slices in ascending order
547+
sort.Strings(slice1)
548+
sort.Strings(slice2)
549+
550+
// Compare the sorted slices
551+
return reflect.DeepEqual(slice1, slice2)
552+
}

calico-vpp-agent/common/pubsub.go

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
package common
1717

1818
import (
19-
"fmt"
20-
2119
log "github.com/sirupsen/logrus"
2220
)
2321

@@ -29,12 +27,6 @@ const (
2927
IpamConfChanged CalicoVppEventType = "IpamConfChanged"
3028
BGPConfChanged CalicoVppEventType = "BGPConfChanged"
3129

32-
ConnectivityAdded CalicoVppEventType = "ConnectivityAdded"
33-
ConnectivityDeleted CalicoVppEventType = "ConnectivityDeleted"
34-
35-
SRv6PolicyAdded CalicoVppEventType = "SRv6PolicyAdded"
36-
SRv6PolicyDeleted CalicoVppEventType = "SRv6PolicyDeleted"
37-
3830
PodAdded CalicoVppEventType = "PodAdded"
3931
PodDeleted CalicoVppEventType = "PodDeleted"
4032

@@ -44,19 +36,6 @@ const (
4436
TunnelAdded CalicoVppEventType = "TunnelAdded"
4537
TunnelDeleted CalicoVppEventType = "TunnelDeleted"
4638

47-
BGPPeerAdded CalicoVppEventType = "BGPPeerAdded"
48-
BGPPeerDeleted CalicoVppEventType = "BGPPeerDeleted"
49-
BGPPeerUpdated CalicoVppEventType = "BGPPeerUpdated"
50-
51-
BGPFilterAddedOrUpdated CalicoVppEventType = "BGPFilterAddedOrUpdated"
52-
BGPFilterDeleted CalicoVppEventType = "BGPFilterDeleted"
53-
54-
BGPDefinedSetAdded CalicoVppEventType = "BGPDefinedSetAdded"
55-
BGPDefinedSetDeleted CalicoVppEventType = "BGPDefinedSetDeleted"
56-
57-
BGPPathAdded CalicoVppEventType = "BGPPathAdded"
58-
BGPPathDeleted CalicoVppEventType = "BGPPathDeleted"
59-
6039
NetAddedOrUpdated CalicoVppEventType = "NetAddedOrUpdated"
6140
NetDeleted CalicoVppEventType = "NetDeleted"
6241
NetsSynced CalicoVppEventType = "NetsSynced"
@@ -106,18 +85,8 @@ func RegisterHandler(channel chan any, name string) *PubSubHandlerRegistration {
10685
return reg
10786
}
10887

109-
func redactPassword(event CalicoVppEvent) string {
110-
switch event.Type {
111-
case BGPPeerAdded:
112-
return string(event.Type)
113-
default:
114-
return fmt.Sprintf("%+v", event)
115-
}
116-
117-
}
118-
11988
func SendEvent(event CalicoVppEvent) {
120-
ThePubSub.log.Debugf("Broadcasting event %s", redactPassword(event))
89+
ThePubSub.log.Debugf("Broadcasting event %+v", event)
12190
for _, reg := range ThePubSub.pubSubHandlerRegistrations {
12291
if reg.expectedEvents[event.Type] {
12392
reg.channel <- event

calico-vpp-agent/felix/felix_server.go

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,6 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
8787
common.TunnelDeleted,
8888
common.NetAddedOrUpdated,
8989
common.NetDeleted,
90-
common.ConnectivityAdded,
91-
common.ConnectivityDeleted,
92-
common.SRv6PolicyAdded,
93-
common.SRv6PolicyDeleted,
9490
)
9591

9692
return server
@@ -369,42 +365,6 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
369365
return fmt.Errorf("evt.Old not a uint32 %v", evt.Old)
370366
}
371367
s.policiesHandler.OnTunnelDelete(swIfIndex)
372-
case common.ConnectivityAdded:
373-
new, ok := evt.New.(*common.NodeConnectivity)
374-
if !ok {
375-
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
376-
}
377-
err := s.connectivityHandler.UpdateIPConnectivity(new, false /* isWithdraw */)
378-
if err != nil {
379-
s.log.Errorf("Error while adding connectivity %s", err)
380-
}
381-
case common.ConnectivityDeleted:
382-
old, ok := evt.Old.(*common.NodeConnectivity)
383-
if !ok {
384-
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
385-
}
386-
err := s.connectivityHandler.UpdateIPConnectivity(old, true /* isWithdraw */)
387-
if err != nil {
388-
s.log.Errorf("Error while deleting connectivity %s", err)
389-
}
390-
case common.SRv6PolicyAdded:
391-
new, ok := evt.New.(*common.NodeConnectivity)
392-
if !ok {
393-
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
394-
}
395-
err := s.connectivityHandler.UpdateSRv6Policy(new, false /* isWithdraw */)
396-
if err != nil {
397-
s.log.Errorf("Error while adding SRv6 Policy %s", err)
398-
}
399-
case common.SRv6PolicyDeleted:
400-
old, ok := evt.Old.(*common.NodeConnectivity)
401-
if !ok {
402-
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
403-
}
404-
err := s.connectivityHandler.UpdateSRv6Policy(old, true /* isWithdraw */)
405-
if err != nil {
406-
s.log.Errorf("Error while deleting SRv6 Policy %s", err)
407-
}
408368
default:
409369
s.log.Warnf("Unhandled CalicoVppEvent.Type: %s", evt.Type)
410370
}

0 commit comments

Comments
 (0)