From 5087bc504b4ff4834c4e902eb87fb0948642e99e Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Wed, 14 May 2025 13:35:58 +0200 Subject: [PATCH] Disambiguate cluster udn and namespaced udn --- pkg/pipeline/transform/kubernetes/cni/udn.go | 52 ++++++++++++++++++- .../transform/kubernetes/cni/udn_test.go | 11 ++-- .../kubernetes/informers/informers.go | 19 ++++--- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/pkg/pipeline/transform/kubernetes/cni/udn.go b/pkg/pipeline/transform/kubernetes/cni/udn.go index ebb50bd94..4987a22d7 100644 --- a/pkg/pipeline/transform/kubernetes/cni/udn.go +++ b/pkg/pipeline/transform/kubernetes/cni/udn.go @@ -1,13 +1,19 @@ package cni import ( + "context" "encoding/json" "fmt" "strings" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" ) const ( @@ -58,7 +64,7 @@ func (m *UDNHandler) BuildKeys(flow config.GenericMap, rule *api.K8sRule) []Seco return keys } -func (m *UDNHandler) GetPodUniqueKeys(pod *v1.Pod) ([]string, error) { +func (m *UDNHandler) GetPodUniqueKeys(ctx context.Context, dynClient *dynamic.DynamicClient, pod *v1.Pod) ([]string, error) { // Example: // k8s.ovn.org/pod-networks: '{"default":{"ip_addresses":["10.128.2.20/23"],"mac_address":"0a:58:0a:80:02:14","routes":[{"dest":"10.128.0.0/14","nextHop":"10.128.2.1"},{"dest":"100.64.0.0/16","nextHop":"10.128.2.1"}],"ip_address":"10.128.2.20/23","role":"infrastructure-locked"},"mesh-arena/primary-udn":{"ip_addresses":["10.200.200.12/24"],"mac_address":"0a:58:0a:c8:c8:0c","gateway_ips":["10.200.200.1"],"routes":[{"dest":"172.30.0.0/16","nextHop":"10.200.200.1"},{"dest":"100.65.0.0/16","nextHop":"10.200.200.1"}],"ip_address":"10.200.200.12/24","gateway_ip":"10.200.200.1","tunnel_id":16,"role":"primary"}}' if statusAnnotationJSON, ok := pod.Annotations[ovnAnnotation]; ok { @@ -74,6 +80,9 @@ func (m *UDNHandler) GetPodUniqueKeys(pod *v1.Pod) ([]string, error) { // IP has a CIDR prefix (bug??) parts := strings.SplitN(ip, "/", 2) if len(parts) > 0 { + if dynClient != nil { + label = disambiguateClusterUDN(ctx, dynClient, label) + } key := UDNKey(label, parts[0]) keys = append(keys, key.Key) } @@ -86,3 +95,44 @@ func (m *UDNHandler) GetPodUniqueKeys(pod *v1.Pod) ([]string, error) { // Annotation not present => just ignore, no error return nil, nil } + +func disambiguateClusterUDN(ctx context.Context, dynClient *dynamic.DynamicClient, name string) string { + // "name" can look like this: "my-namespace/my-udn"; namespace included even for Cluster UDN + parts := strings.SplitN(name, "/", 2) + if len(parts) < 2 { + // no disambiguation + return name + } + ns := parts[0] + udnName := parts[1] + // Does it exist as a namespaced-udn? + _, err := dynClient. + Resource(schema.GroupVersionResource{ + Group: "k8s.ovn.org", + Resource: "userdefinednetworks", + Version: "v1", + }). + Namespace(ns). + Get(ctx, udnName, metav1.GetOptions{}) + if err == nil { + // found => return as is + return name + } else if !errors.IsNotFound(err) { + log.Errorf("could not fetch UDN %s: %v", name, err) + } + // Does it exist as a cluster-udn? + _, err = dynClient. + Resource(schema.GroupVersionResource{ + Group: "k8s.ovn.org", + Resource: "clusteruserdefinednetworks", + Version: "v1", + }). + Get(ctx, udnName, metav1.GetOptions{}) + if err == nil { + // found => return just the udn name part + return udnName + } else if !errors.IsNotFound(err) { + log.Errorf("could not fetch CUDN %s: %v", udnName, err) + } + return name +} diff --git a/pkg/pipeline/transform/kubernetes/cni/udn_test.go b/pkg/pipeline/transform/kubernetes/cni/udn_test.go index 42e371494..bde20b7c8 100644 --- a/pkg/pipeline/transform/kubernetes/cni/udn_test.go +++ b/pkg/pipeline/transform/kubernetes/cni/udn_test.go @@ -1,6 +1,7 @@ package cni import ( + "context" "fmt" "testing" @@ -29,7 +30,7 @@ func udnConfigAnnotation(ip string) string { func TestExtractUDNStatusKeys(t *testing.T) { // Annotation not found => no error, no key - keys, err := udnHandler.GetPodUniqueKeys(&udnPod) + keys, err := udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod) require.NoError(t, err) require.Empty(t, keys) @@ -37,7 +38,7 @@ func TestExtractUDNStatusKeys(t *testing.T) { udnPod.Annotations = map[string]string{ ovnAnnotation: udnConfigAnnotation("10.200.200.12"), } - keys, err = udnHandler.GetPodUniqueKeys(&udnPod) + keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod) require.NoError(t, err) require.Equal(t, []string{"mesh-arena/primary-udn~10.200.200.12"}, keys) @@ -45,7 +46,7 @@ func TestExtractUDNStatusKeys(t *testing.T) { udnPod.Annotations = map[string]string{ ovnAnnotation: udnConfigAnnotation("10.200.200.12/24"), } - keys, err = udnHandler.GetPodUniqueKeys(&udnPod) + keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod) require.NoError(t, err) require.Equal(t, []string{"mesh-arena/primary-udn~10.200.200.12"}, keys) @@ -53,7 +54,7 @@ func TestExtractUDNStatusKeys(t *testing.T) { udnPod.Annotations = map[string]string{ ovnAnnotation: udnConfigAnnotation("2001:0db8::1111"), } - keys, err = udnHandler.GetPodUniqueKeys(&udnPod) + keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod) require.NoError(t, err) require.Equal(t, []string{"mesh-arena/primary-udn~2001:0db8::1111"}, keys) @@ -61,7 +62,7 @@ func TestExtractUDNStatusKeys(t *testing.T) { udnPod.Annotations = map[string]string{ ovnAnnotation: udnConfigAnnotation("2001:0db8::1111/24"), } - keys, err = udnHandler.GetPodUniqueKeys(&udnPod) + keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod) require.NoError(t, err) require.Equal(t, []string{"mesh-arena/primary-udn~2001:0db8::1111"}, keys) } diff --git a/pkg/pipeline/transform/kubernetes/informers/informers.go b/pkg/pipeline/transform/kubernetes/informers/informers.go index a56455a24..6263d013e 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers.go @@ -18,6 +18,7 @@ package informers import ( + "context" "fmt" "net" "time" @@ -26,12 +27,13 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "github.com/sirupsen/logrus" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" inf "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/metadata" @@ -253,7 +255,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, return nil } -func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config) error { +func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config, dynClient *dynamic.DynamicClient) error { pods := informerFactory.Core().V1().Pods().Informer() // Transform any *v1.Pod instance into a *Info instance to save space // in the informer's cache @@ -280,7 +282,7 @@ func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, c } } if cfg.hasUDN { - if udnKeys, err := udn.GetPodUniqueKeys(pod); err != nil { + if udnKeys, err := udn.GetPodUniqueKeys(context.Background(), dynClient, pod); err != nil { // Log the error as Info, do not block other ips indexing log.WithError(err).Infof("UDNs cannot be identified") } else { @@ -395,8 +397,13 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric return err } + dynClient, err := dynamic.NewForConfig(kconf) + if err != nil { + return err + } + k.indexerHitMetric = opMetrics.CreateIndexerHitCounter() - err = k.initInformers(kubeClient, metaKubeClient, infConfig) + err = k.initInformers(kubeClient, metaKubeClient, dynClient, infConfig) if err != nil { return err } @@ -404,14 +411,14 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric return nil } -func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, cfg Config) error { +func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg Config) error { informerFactory := inf.NewSharedInformerFactory(client, syncTime) metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) err := k.initNodeInformer(informerFactory, cfg) if err != nil { return err } - err = k.initPodInformer(informerFactory, cfg) + err = k.initPodInformer(informerFactory, cfg, dynClient) if err != nil { return err }