Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 18 additions & 32 deletions calico-vpp-agent/cmd/calico_vpp_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,9 @@ func main() {
serviceServer := services.NewServiceServer(vpp, k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
felixServer, err := felix.NewFelixServer(vpp, log.WithFields(logrus.Fields{"component": "policy"}))
if err != nil {
log.Fatalf("Failed to create policy server %s", err)
}
err = felix.InstallFelixPlugin()
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))
felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"}))
err = watchers.InstallFelixPlugin()
if err != nil {
log.Fatalf("could not install felix plugin: %s", err)
}
Expand All @@ -175,8 +173,10 @@ func main() {
peerWatcher.SetBGPConf(bgpConf)
routingServer.SetBGPConf(bgpConf)
serviceServer.SetBGPConf(bgpConf)
felixServer.SetBGPConf(bgpConf)

Go(felixServer.ServeFelix)
Go(felixWatcher.WatchFelix)

/*
* Mark as unhealthy while waiting for Felix config
Expand All @@ -188,19 +188,17 @@ func main() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

var felixConfig interface{}
var ourBGPSpec interface{}
var felixConfig *felixconfig.Config
var ourBGPSpec *common.LocalNodeSpec
felixConfigReceived := false
bgpSpecReceived := false

for !felixConfigReceived || !bgpSpecReceived {
select {
case value := <-felixServer.FelixConfigChan:
felixConfig = value
case felixConfig = <-felixServer.FelixConfigChan:
felixConfigReceived = true
log.Info("FelixConfig received from calico pod")
case value := <-felixServer.GotOurNodeBGPchan:
ourBGPSpec = value
case ourBGPSpec = <-felixServer.GotOurNodeBGPchan():
bgpSpecReceived = true
log.Info("BGP spec received from node add")
case <-t.Dying():
Expand All @@ -220,19 +218,13 @@ func main() {
healthServer.SetComponentStatus(health.ComponentFelix, true, "Felix config received")
log.Info("Felix configuration received")

if ourBGPSpec != nil {
bgpSpec, ok := ourBGPSpec.(*common.LocalNodeSpec)
if !ok {
panic("ourBGPSpec is not *common.LocalNodeSpec")
}
prefixWatcher.SetOurBGPSpec(bgpSpec)
connectivityServer.SetOurBGPSpec(bgpSpec)
routingServer.SetOurBGPSpec(bgpSpec)
serviceServer.SetOurBGPSpec(bgpSpec)
localSIDWatcher.SetOurBGPSpec(bgpSpec)
netWatcher.SetOurBGPSpec(bgpSpec)
cniServer.SetOurBGPSpec(bgpSpec)
}
prefixWatcher.SetOurBGPSpec(ourBGPSpec)
connectivityServer.SetOurBGPSpec(ourBGPSpec)
routingServer.SetOurBGPSpec(ourBGPSpec)
serviceServer.SetOurBGPSpec(ourBGPSpec)
localSIDWatcher.SetOurBGPSpec(ourBGPSpec)
netWatcher.SetOurBGPSpec(ourBGPSpec)
cniServer.SetOurBGPSpec(ourBGPSpec)

if *config.GetCalicoVppFeatureGates().MultinetEnabled {
Go(netWatcher.WatchNetworks)
Expand All @@ -246,14 +238,8 @@ func main() {
}
}

if felixConfig != nil {
felixCfg, ok := felixConfig.(*felixconfig.Config)
if !ok {
panic("ourBGPSpec is not *felixconfig.Config")
}
cniServer.SetFelixConfig(felixCfg)
connectivityServer.SetFelixConfig(felixCfg)
}
cniServer.SetFelixConfig(felixConfig)
connectivityServer.SetFelixConfig(felixConfig)

Go(routeWatcher.WatchRoutes)
Go(linkWatcher.WatchLinks)
Expand Down
9 changes: 4 additions & 5 deletions calico-vpp-agent/cni/cni_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/tests/mocks"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/testutils"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
"github.com/projectcalico/vpp-dataplane/v3/config"
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
Expand Down Expand Up @@ -323,7 +322,7 @@ var _ = Describe("Pod-related functionality of CNI", func() {

Context("With MultiNet configuration (and multinet VRF and loopback already configured)", func() {
var (
networkDefinition *watchers.NetworkDefinition
networkDefinition *common.NetworkDefinition
pubSubHandlerMock *mocks.PubSubHandlerMock
)

Expand Down Expand Up @@ -355,9 +354,9 @@ var _ = Describe("Pod-related functionality of CNI", func() {
}
// NetworkDefinition CRD information caught by NetWatcher and send with additional information
// (VRF and loopback created by watcher) to the cni server as common.NetAdded CalicoVPPEvent
networkDefinition = &watchers.NetworkDefinition{
VRF: watchers.VRF{Tables: tables},
PodVRF: watchers.VRF{Tables: podTables},
networkDefinition = &common.NetworkDefinition{
VRF: common.VRF{Tables: tables},
PodVRF: common.VRF{Tables: podTables},
Vni: uint32(0), // important only for VXLAN tunnel going out of node
Name: networkName,
Range: "10.1.1.0/24", // IP range for secondary network defined by multinet
Expand Down
35 changes: 21 additions & 14 deletions calico-vpp-agent/cni/cni_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/podinterface"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
"github.com/projectcalico/vpp-dataplane/v3/config"
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
Expand All @@ -53,7 +52,7 @@ type Server struct {

podInterfaceMap map[string]model.LocalPodSpec
lock sync.Mutex /* protects Add/DelVppInterace/RescanState */
cniEventChan chan common.CalicoVppEvent
cniEventChan chan any

memifDriver *podinterface.MemifPodInterfaceDriver
tuntapDriver *podinterface.TunTapPodInterfaceDriver
Expand All @@ -65,7 +64,7 @@ type Server struct {
RedirectToHostClassifyTableIndex uint32

networkDefinitions sync.Map
cniMultinetEventChan chan common.CalicoVppEvent
cniMultinetEventChan chan any
nodeBGPSpec *common.LocalNodeSpec
}

Expand Down Expand Up @@ -96,9 +95,9 @@ func (s *Server) Add(ctx context.Context, request *cniproto.AddRequest) (*cnipro
if !ok {
return nil, fmt.Errorf("trying to create a pod in an unexisting network %s", podSpec.NetworkName)
} else {
networkDefinition, ok := value.(*watchers.NetworkDefinition)
networkDefinition, ok := value.(*common.NetworkDefinition)
if !ok || networkDefinition == nil {
panic("Value is not of type *watchers.NetworkDefinition")
panic("Value is not of type *common.NetworkDefinition")
}
_, route, err := net.ParseCIDR(networkDefinition.Range)
if err == nil {
Expand Down Expand Up @@ -292,7 +291,7 @@ func NewCNIServer(vpp *vpplink.VppLink, felixServerIpam common.FelixServerIpam,
log: log,

felixServerIpam: felixServerIpam,
cniEventChan: make(chan common.CalicoVppEvent, common.ChanSize),
cniEventChan: make(chan any, common.ChanSize),

grpcServer: grpc.NewServer(),
podInterfaceMap: make(map[string]model.LocalPodSpec),
Expand All @@ -301,7 +300,7 @@ func NewCNIServer(vpp *vpplink.VppLink, felixServerIpam common.FelixServerIpam,
vclDriver: podinterface.NewVclPodInterfaceDriver(vpp, log, felixServerIpam),
loopbackDriver: podinterface.NewLoopbackPodInterfaceDriver(vpp, log, felixServerIpam),

cniMultinetEventChan: make(chan common.CalicoVppEvent, common.ChanSize),
cniMultinetEventChan: make(chan any, common.ChanSize),
}
reg := common.RegisterHandler(server.cniEventChan, "CNI server events")
reg.ExpectEvents(
Expand All @@ -322,7 +321,11 @@ forloop:
select {
case <-t.Dying():
break forloop
case evt := <-s.cniEventChan:
case msg := <-s.cniEventChan:
evt, ok := msg.(common.CalicoVppEvent)
if !ok {
continue
}
switch evt.Type {
case common.FelixConfChanged:
if new, _ := evt.New.(*felixConfig.Config); new != nil {
Expand Down Expand Up @@ -437,21 +440,25 @@ func (s *Server) ServeCNI(t *tomb.Tomb) error {
case <-t.Dying():
s.log.Warn("Cni server asked to exit")
return
case event := <-s.cniMultinetEventChan:
case msg := <-s.cniMultinetEventChan:
event, ok := msg.(common.CalicoVppEvent)
if !ok {
continue
}
switch event.Type {
case common.NetsSynced:
netsSynced <- true
case common.NetAddedOrUpdated:
netDef, ok := event.New.(*watchers.NetworkDefinition)
netDef, ok := event.New.(*common.NetworkDefinition)
if !ok {
s.log.Errorf("event.New is not a *watchers.NetworkDefinition %v", event.New)
s.log.Errorf("event.New is not a *common.NetworkDefinition %v", event.New)
continue
}
s.networkDefinitions.Store(netDef.Name, netDef)
case common.NetDeleted:
netDef, ok := event.Old.(*watchers.NetworkDefinition)
netDef, ok := event.Old.(*common.NetworkDefinition)
if !ok {
s.log.Errorf("event.Old is not a *watchers.NetworkDefinition %v", event.Old)
s.log.Errorf("event.Old is not a *common.NetworkDefinition %v", event.Old)
continue
}
s.networkDefinitions.Delete(netDef.Name)
Expand Down Expand Up @@ -491,6 +498,6 @@ func (s *Server) ServeCNI(t *tomb.Tomb) error {

// ForceAddingNetworkDefinition will add another NetworkDefinition to this CNI server.
// The usage is mainly for testing purposes.
func (s *Server) ForceAddingNetworkDefinition(networkDefinition *watchers.NetworkDefinition) {
func (s *Server) ForceAddingNetworkDefinition(networkDefinition *common.NetworkDefinition) {
s.networkDefinitions.Store(networkDefinition.Name, networkDefinition)
}
9 changes: 4 additions & 5 deletions calico-vpp-agent/cni/network_vpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
"github.com/projectcalico/vpp-dataplane/v3/config"
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
Expand Down Expand Up @@ -248,9 +247,9 @@ func (s *Server) AddVppInterface(podSpec *model.LocalPodSpec, doHostSideConf boo
if !ok {
s.log.Errorf("network not found %s", podSpec.NetworkName)
} else {
networkDefinition, ok := value.(*watchers.NetworkDefinition)
networkDefinition, ok := value.(*common.NetworkDefinition)
if !ok || networkDefinition == nil {
panic("networkDefinition not of type *watchers.NetworkDefinition")
panic("networkDefinition not of type *common.NetworkDefinition")
}
vni = networkDefinition.Vni
}
Expand Down Expand Up @@ -320,9 +319,9 @@ func (s *Server) DelVppInterface(podSpec *model.LocalPodSpec) {
if !ok {
deleteLocalPodAddress = false
} else {
networkDefinition, ok := value.(*watchers.NetworkDefinition)
networkDefinition, ok := value.(*common.NetworkDefinition)
if !ok || networkDefinition == nil {
panic("networkDefinition not of type *watchers.NetworkDefinition")
panic("networkDefinition not of type *common.NetworkDefinition")
}
vni = networkDefinition.Vni
}
Expand Down
17 changes: 8 additions & 9 deletions calico-vpp-agent/cni/network_vpp_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
)
Expand All @@ -37,9 +36,9 @@ func (s *Server) RoutePodInterface(podSpec *model.LocalPodSpec, stack *vpplink.C
if !ok {
s.log.Errorf("network not found %s", podSpec.NetworkName)
} else {
networkDefinition, ok := value.(*watchers.NetworkDefinition)
networkDefinition, ok := value.(*common.NetworkDefinition)
if !ok || networkDefinition == nil {
panic("networkDefinition not of type *watchers.NetworkDefinition")
panic("networkDefinition not of type *common.NetworkDefinition")
}
table = networkDefinition.VRF.Tables[idx]
}
Expand Down Expand Up @@ -88,9 +87,9 @@ func (s *Server) UnroutePodInterface(podSpec *model.LocalPodSpec, swIfIndex uint
if !ok {
s.log.Errorf("network not found %s", podSpec.NetworkName)
} else {
networkDefinition, ok := value.(*watchers.NetworkDefinition)
networkDefinition, ok := value.(*common.NetworkDefinition)
if !ok || networkDefinition == nil {
panic("networkDefinition not of type *watchers.NetworkDefinition")
panic("networkDefinition not of type *common.NetworkDefinition")
}
table = networkDefinition.VRF.Tables[idx]
}
Expand Down Expand Up @@ -242,9 +241,9 @@ func (s *Server) CreatePodVRF(podSpec *model.LocalPodSpec, stack *vpplink.Cleanu
if !ok {
return errors.Errorf("network not found %s", podSpec.NetworkName)
}
networkDefinition, ok := value.(*watchers.NetworkDefinition)
networkDefinition, ok := value.(*common.NetworkDefinition)
if !ok || networkDefinition == nil {
panic("networkDefinition not of type *watchers.NetworkDefinition")
panic("networkDefinition not of type *common.NetworkDefinition")
}
vrfIndex = networkDefinition.PodVRF.Tables[idx]
}
Expand Down Expand Up @@ -402,9 +401,9 @@ func (s *Server) DeletePodVRF(podSpec *model.LocalPodSpec) {
if !ok {
s.log.Errorf("network not found %s", podSpec.NetworkName)
} else {
networkDefinition, ok := value.(*watchers.NetworkDefinition)
networkDefinition, ok := value.(*common.NetworkDefinition)
if !ok || networkDefinition == nil {
panic("networkDefinition not of type *watchers.NetworkDefinition")
panic("networkDefinition not of type *common.NetworkDefinition")
}
vrfIndex = networkDefinition.PodVRF.Tables[idx]
}
Expand Down
16 changes: 6 additions & 10 deletions calico-vpp-agent/common/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,31 +85,27 @@ type PubSubHandlerRegistration struct {
/* Name for the registration, for logging & debugging */
name string
/* Channel where to send events */
channel chan CalicoVppEvent
channel chan any
/* Receive only these events. If empty we'll receive all */
expectedEvents map[CalicoVppEventType]bool
/* Receive all events */
expectAllEvents bool
}

func (reg *PubSubHandlerRegistration) ExpectEvents(eventTypes ...CalicoVppEventType) {
for _, eventType := range eventTypes {
reg.expectedEvents[eventType] = true
}
reg.expectAllEvents = false
}

type PubSub struct {
log *log.Entry
pubSubHandlerRegistrations []*PubSubHandlerRegistration
}

func RegisterHandler(channel chan CalicoVppEvent, name string) *PubSubHandlerRegistration {
func RegisterHandler(channel chan any, name string) *PubSubHandlerRegistration {
reg := &PubSubHandlerRegistration{
channel: channel,
name: name,
expectedEvents: make(map[CalicoVppEventType]bool),
expectAllEvents: true, /* By default receive everything, unless we ask for a filter */
channel: channel,
name: name,
expectedEvents: make(map[CalicoVppEventType]bool),
}
ThePubSub.pubSubHandlerRegistrations = append(ThePubSub.pubSubHandlerRegistrations, reg)
return reg
Expand All @@ -128,7 +124,7 @@ func redactPassword(event CalicoVppEvent) string {
func SendEvent(event CalicoVppEvent) {
ThePubSub.log.Debugf("Broadcasting event %s", redactPassword(event))
for _, reg := range ThePubSub.pubSubHandlerRegistrations {
if reg.expectAllEvents || reg.expectedEvents[event.Type] {
if reg.expectedEvents[event.Type] {
reg.channel <- event
}
}
Expand Down
Loading
Loading