Skip to content

Commit 4574268

Browse files
author
Aritra Basu
committed
refactor: split routeWatcher into watcher and handler components
Signed-off-by: Aritra Basu <[email protected]>
1 parent 05e1278 commit 4574268

File tree

7 files changed

+220
-155
lines changed

7 files changed

+220
-155
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ func main() {
147147
cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"}))
148148
serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
149149

150+
felixServer.GetRouteHandler().SetRouteWatcher(routeWatcher)
151+
netWatcher.SetRouteHandler(felixServer.GetRouteHandler())
152+
150153
err = watchers.InstallFelixPlugin()
151154
if err != nil {
152155
log.Fatalf("could not install felix plugin: %s", err)

calico-vpp-agent/common/pubsub.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ const (
2727
ChanSize = 500
2828

2929
PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged"
30-
IpamConfChanged CalicoVppEventType = "IpamConfChanged"
3130
BGPConfChanged CalicoVppEventType = "BGPConfChanged"
3231

3332
ConnectivityAdded CalicoVppEventType = "ConnectivityAdded"
@@ -61,7 +60,6 @@ const (
6160

6261
NetAddedOrUpdated CalicoVppEventType = "NetAddedOrUpdated"
6362
NetDeleted CalicoVppEventType = "NetDeleted"
64-
NetsSynced CalicoVppEventType = "NetsSynced"
6563

6664
IpamPoolUpdate CalicoVppEventType = "IpamPoolUpdate"
6765
IpamPoolRemove CalicoVppEventType = "IpamPoolRemove"

calico-vpp-agent/felix/felix_server.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type Server struct {
5959
cniHandler *cni.CNIHandler
6060
connectivityHandler *connectivity.ConnectivityHandler
6161
serviceHandler *services.ServiceHandler
62+
routeHandler *RouteHandler
6263
}
6364

6465
// NewFelixServer creates a felix server
@@ -77,6 +78,7 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
7778
cniHandler: cni.NewCNIHandler(vpp, cache, log),
7879
connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log),
7980
serviceHandler: services.NewServiceHandler(vpp, cache, log),
81+
routeHandler: NewRouteHandler(log),
8082
}
8183

8284
reg := common.RegisterHandler(server.felixServerEventChan, "felix server events")
@@ -96,6 +98,10 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
9698
return server
9799
}
98100

101+
func (s *Server) GetRouteHandler() *RouteHandler {
102+
return s.routeHandler
103+
}
104+
99105
func (s *Server) GetFelixServerEventChan() chan any {
100106
return s.felixServerEventChan
101107
}

calico-vpp-agent/felix/ipam.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import (
2121
"github.com/pkg/errors"
2222

2323
"github.com/projectcalico/calico/felix/proto"
24-
25-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
2624
)
2725

2826
func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) {
@@ -49,11 +47,12 @@ func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) {
4947
}
5048
s.connectivityHandler.OnIpamConfChanged(oldIpamPool, newIpamPool)
5149
s.cniHandler.OnIpamConfChanged(oldIpamPool, newIpamPool)
52-
common.SendEvent(common.CalicoVppEvent{
53-
Type: common.IpamConfChanged,
54-
Old: ipamPoolCopy(oldIpamPool),
55-
New: ipamPoolCopy(newIpamPool),
56-
})
50+
if s.routeHandler != nil {
51+
err := s.routeHandler.OnIpamConfChanged(oldIpamPool, newIpamPool)
52+
if err != nil {
53+
s.log.Errorf("Failed to handle IPAM update in RouteHandler: %v", err)
54+
}
55+
}
5756
}
5857
} else {
5958
s.log.Infof("Adding pool: %s, nat:%t", msg.GetId(), newIpamPool.GetMasquerade())
@@ -65,10 +64,12 @@ func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) {
6564
}
6665
s.connectivityHandler.OnIpamConfChanged(nil /*old*/, newIpamPool)
6766
s.cniHandler.OnIpamConfChanged(nil /*old*/, newIpamPool)
68-
common.SendEvent(common.CalicoVppEvent{
69-
Type: common.IpamConfChanged,
70-
New: ipamPoolCopy(newIpamPool),
71-
})
67+
if s.routeHandler != nil {
68+
err := s.routeHandler.OnIpamConfChanged(nil, newIpamPool)
69+
if err != nil {
70+
s.log.Errorf("Failed to handle IPAM addition in RouteHandler: %v", err)
71+
}
72+
}
7273
}
7374
return nil
7475
}
@@ -88,13 +89,14 @@ func (s *Server) handleIpamPoolRemove(msg *proto.IPAMPoolRemove) (err error) {
8889
if err != nil {
8990
return errors.Wrap(err, "error handling ipam deletion")
9091
}
91-
common.SendEvent(common.CalicoVppEvent{
92-
Type: common.IpamConfChanged,
93-
Old: ipamPoolCopy(oldIpamPool),
94-
New: nil,
95-
})
9692
s.connectivityHandler.OnIpamConfChanged(oldIpamPool, nil /* new */)
9793
s.cniHandler.OnIpamConfChanged(oldIpamPool, nil /* new */)
94+
if s.routeHandler != nil {
95+
err := s.routeHandler.OnIpamConfChanged(oldIpamPool, nil)
96+
if err != nil {
97+
s.log.Errorf("Failed to handle IPAM deletion in RouteHandler: %v", err)
98+
}
99+
}
98100
} else {
99101
s.log.Warnf("Deleting unknown ippool")
100102
return nil
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright (C) 2025 Cisco Systems Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package felix
17+
18+
import (
19+
"net"
20+
"syscall"
21+
22+
"github.com/pkg/errors"
23+
"github.com/projectcalico/calico/felix/proto"
24+
"github.com/sirupsen/logrus"
25+
"github.com/vishvananda/netlink"
26+
27+
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
28+
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
29+
)
30+
31+
// RouteHandler handles network and IPAM events by updating VPP routing configuration
32+
type RouteHandler struct {
33+
log *logrus.Entry
34+
routeWatcher *watchers.RouteWatcher
35+
}
36+
37+
// NewRouteHandler creates a new RouteHandler instance
38+
func NewRouteHandler(log *logrus.Entry) *RouteHandler {
39+
return &RouteHandler{
40+
log: log,
41+
routeWatcher: nil,
42+
}
43+
}
44+
45+
// SetRouteWatcher sets the route watcher for performing route operations
46+
func (h *RouteHandler) SetRouteWatcher(routeWatcher *watchers.RouteWatcher) {
47+
h.routeWatcher = routeWatcher
48+
}
49+
50+
// OnNetDeleted handles network deletion events
51+
func (h *RouteHandler) OnNetDeleted(netDef *common.NetworkDefinition) error {
52+
key := netDef.Range
53+
routes, err := h.getNetworkRoute(key, netDef.PhysicalNetworkName)
54+
if err != nil {
55+
h.log.Errorf("Error getting route from network deletion: %v", err)
56+
return err
57+
}
58+
for _, route := range routes {
59+
err = h.routeWatcher.DelRoute(route)
60+
if err != nil {
61+
h.log.Errorf("Cannot delete pool route %s through vpp tap: %v", key, err)
62+
return err
63+
}
64+
}
65+
return nil
66+
}
67+
68+
// OnNetAddedOrUpdated handles network addition/update events
69+
func (h *RouteHandler) OnNetAddedOrUpdated(netDef *common.NetworkDefinition) error {
70+
key := netDef.Range
71+
routes, err := h.getNetworkRoute(key, netDef.PhysicalNetworkName)
72+
if err != nil {
73+
h.log.Errorf("Error getting route from network addition/update: %v", err)
74+
return err
75+
}
76+
for _, route := range routes {
77+
err = h.routeWatcher.AddRoute(route)
78+
if err != nil {
79+
h.log.Errorf("Cannot add pool route %s through vpp tap: %v", key, err)
80+
return err
81+
}
82+
}
83+
return nil
84+
}
85+
86+
// OnIpamConfChanged handles IPAM configuration changes
87+
func (h *RouteHandler) OnIpamConfChanged(oldPool, newPool *proto.IPAMPool) error {
88+
h.log.Debugf("Received IPAM config update in route handler old:%+v new:%+v", oldPool, newPool)
89+
if newPool == nil && oldPool != nil {
90+
routes, err := h.getNetworkRoute(oldPool.Cidr, "")
91+
if err != nil {
92+
h.log.Errorf("Error getting route from ipam update: %v", err)
93+
return err
94+
}
95+
for _, route := range routes {
96+
err = h.routeWatcher.DelRoute(route)
97+
if err != nil {
98+
h.log.Errorf("Cannot delete pool route %s through vpp tap: %v", oldPool.Cidr, err)
99+
return err
100+
}
101+
}
102+
} else if newPool != nil {
103+
routes, err := h.getNetworkRoute(newPool.Cidr, "")
104+
if err != nil {
105+
h.log.Errorf("Error getting route from ipam update: %v", err)
106+
return err
107+
}
108+
for _, route := range routes {
109+
err = h.routeWatcher.AddRoute(route)
110+
if err != nil {
111+
h.log.Errorf("Cannot add pool route %s through vpp tap: %v", newPool.Cidr, err)
112+
return err
113+
}
114+
}
115+
}
116+
return nil
117+
}
118+
119+
func (h *RouteHandler) getNetworkRoute(network string, physicalNet string) (route []*netlink.Route, err error) {
120+
_, cidr, err := net.ParseCIDR(network)
121+
if err != nil {
122+
return nil, errors.Wrapf(err, "error parsing %s", network)
123+
}
124+
var routes []*netlink.Route
125+
var order int
126+
for _, uplinkStatus := range common.VppManagerInfo.UplinkStatuses {
127+
if uplinkStatus.PhysicalNetworkName == physicalNet {
128+
gw := uplinkStatus.FakeNextHopIP4
129+
if cidr.IP.To4() == nil {
130+
gw = uplinkStatus.FakeNextHopIP6
131+
}
132+
var priority int
133+
if uplinkStatus.IsMain {
134+
priority = 0
135+
} else {
136+
order += 1
137+
priority = order
138+
}
139+
routes = append(routes, &netlink.Route{
140+
Dst: cidr,
141+
Gw: gw,
142+
Protocol: syscall.RTPROT_STATIC,
143+
MTU: watchers.GetUplinkMtu(),
144+
Priority: priority,
145+
})
146+
}
147+
}
148+
return routes, nil
149+
}

0 commit comments

Comments
 (0)