Skip to content

Commit 5087bc5

Browse files
committed
Disambiguate cluster udn and namespaced udn
1 parent cc187b8 commit 5087bc5

File tree

3 files changed

+70
-12
lines changed

3 files changed

+70
-12
lines changed

pkg/pipeline/transform/kubernetes/cni/udn.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
package cni
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"strings"
78

89
"github.com/netobserv/flowlogs-pipeline/pkg/api"
910
"github.com/netobserv/flowlogs-pipeline/pkg/config"
11+
log "github.com/sirupsen/logrus"
1012
v1 "k8s.io/api/core/v1"
13+
"k8s.io/apimachinery/pkg/api/errors"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/runtime/schema"
16+
"k8s.io/client-go/dynamic"
1117
)
1218

1319
const (
@@ -58,7 +64,7 @@ func (m *UDNHandler) BuildKeys(flow config.GenericMap, rule *api.K8sRule) []Seco
5864
return keys
5965
}
6066

61-
func (m *UDNHandler) GetPodUniqueKeys(pod *v1.Pod) ([]string, error) {
67+
func (m *UDNHandler) GetPodUniqueKeys(ctx context.Context, dynClient *dynamic.DynamicClient, pod *v1.Pod) ([]string, error) {
6268
// Example:
6369
// k8s.ovn.org/pod-networks: '{"default":{"ip_addresses":["10.128.2.20/23"],"mac_address":"0a:58:0a:80:02:14","routes":[{"dest":"10.128.0.0/14","nextHop":"10.128.2.1"},{"dest":"100.64.0.0/16","nextHop":"10.128.2.1"}],"ip_address":"10.128.2.20/23","role":"infrastructure-locked"},"mesh-arena/primary-udn":{"ip_addresses":["10.200.200.12/24"],"mac_address":"0a:58:0a:c8:c8:0c","gateway_ips":["10.200.200.1"],"routes":[{"dest":"172.30.0.0/16","nextHop":"10.200.200.1"},{"dest":"100.65.0.0/16","nextHop":"10.200.200.1"}],"ip_address":"10.200.200.12/24","gateway_ip":"10.200.200.1","tunnel_id":16,"role":"primary"}}'
6470
if statusAnnotationJSON, ok := pod.Annotations[ovnAnnotation]; ok {
@@ -74,6 +80,9 @@ func (m *UDNHandler) GetPodUniqueKeys(pod *v1.Pod) ([]string, error) {
7480
// IP has a CIDR prefix (bug??)
7581
parts := strings.SplitN(ip, "/", 2)
7682
if len(parts) > 0 {
83+
if dynClient != nil {
84+
label = disambiguateClusterUDN(ctx, dynClient, label)
85+
}
7786
key := UDNKey(label, parts[0])
7887
keys = append(keys, key.Key)
7988
}
@@ -86,3 +95,44 @@ func (m *UDNHandler) GetPodUniqueKeys(pod *v1.Pod) ([]string, error) {
8695
// Annotation not present => just ignore, no error
8796
return nil, nil
8897
}
98+
99+
func disambiguateClusterUDN(ctx context.Context, dynClient *dynamic.DynamicClient, name string) string {
100+
// "name" can look like this: "my-namespace/my-udn"; namespace included even for Cluster UDN
101+
parts := strings.SplitN(name, "/", 2)
102+
if len(parts) < 2 {
103+
// no disambiguation
104+
return name
105+
}
106+
ns := parts[0]
107+
udnName := parts[1]
108+
// Does it exist as a namespaced-udn?
109+
_, err := dynClient.
110+
Resource(schema.GroupVersionResource{
111+
Group: "k8s.ovn.org",
112+
Resource: "userdefinednetworks",
113+
Version: "v1",
114+
}).
115+
Namespace(ns).
116+
Get(ctx, udnName, metav1.GetOptions{})
117+
if err == nil {
118+
// found => return as is
119+
return name
120+
} else if !errors.IsNotFound(err) {
121+
log.Errorf("could not fetch UDN %s: %v", name, err)
122+
}
123+
// Does it exist as a cluster-udn?
124+
_, err = dynClient.
125+
Resource(schema.GroupVersionResource{
126+
Group: "k8s.ovn.org",
127+
Resource: "clusteruserdefinednetworks",
128+
Version: "v1",
129+
}).
130+
Get(ctx, udnName, metav1.GetOptions{})
131+
if err == nil {
132+
// found => return just the udn name part
133+
return udnName
134+
} else if !errors.IsNotFound(err) {
135+
log.Errorf("could not fetch CUDN %s: %v", udnName, err)
136+
}
137+
return name
138+
}

pkg/pipeline/transform/kubernetes/cni/udn_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cni
22

33
import (
4+
"context"
45
"fmt"
56
"testing"
67

@@ -29,39 +30,39 @@ func udnConfigAnnotation(ip string) string {
2930

3031
func TestExtractUDNStatusKeys(t *testing.T) {
3132
// Annotation not found => no error, no key
32-
keys, err := udnHandler.GetPodUniqueKeys(&udnPod)
33+
keys, err := udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod)
3334
require.NoError(t, err)
3435
require.Empty(t, keys)
3536

3637
// Valid annotation => no error, valid key
3738
udnPod.Annotations = map[string]string{
3839
ovnAnnotation: udnConfigAnnotation("10.200.200.12"),
3940
}
40-
keys, err = udnHandler.GetPodUniqueKeys(&udnPod)
41+
keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod)
4142
require.NoError(t, err)
4243
require.Equal(t, []string{"mesh-arena/primary-udn~10.200.200.12"}, keys)
4344

4445
// Same check with a somewhat surprising CIDR found here as an IP, but it's really the IP part that should be used
4546
udnPod.Annotations = map[string]string{
4647
ovnAnnotation: udnConfigAnnotation("10.200.200.12/24"),
4748
}
48-
keys, err = udnHandler.GetPodUniqueKeys(&udnPod)
49+
keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod)
4950
require.NoError(t, err)
5051
require.Equal(t, []string{"mesh-arena/primary-udn~10.200.200.12"}, keys)
5152

5253
// Same with IPv6
5354
udnPod.Annotations = map[string]string{
5455
ovnAnnotation: udnConfigAnnotation("2001:0db8::1111"),
5556
}
56-
keys, err = udnHandler.GetPodUniqueKeys(&udnPod)
57+
keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod)
5758
require.NoError(t, err)
5859
require.Equal(t, []string{"mesh-arena/primary-udn~2001:0db8::1111"}, keys)
5960

6061
// Same with IPv6 as a CIDR
6162
udnPod.Annotations = map[string]string{
6263
ovnAnnotation: udnConfigAnnotation("2001:0db8::1111/24"),
6364
}
64-
keys, err = udnHandler.GetPodUniqueKeys(&udnPod)
65+
keys, err = udnHandler.GetPodUniqueKeys(context.TODO(), nil, &udnPod)
6566
require.NoError(t, err)
6667
require.Equal(t, []string{"mesh-arena/primary-udn~2001:0db8::1111"}, keys)
6768
}

pkg/pipeline/transform/kubernetes/informers/informers.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package informers
1919

2020
import (
21+
"context"
2122
"fmt"
2223
"net"
2324
"time"
@@ -26,12 +27,13 @@ import (
2627
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni"
2728
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model"
2829
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
29-
"github.com/sirupsen/logrus"
3030

3131
"github.com/prometheus/client_golang/prometheus"
32+
"github.com/sirupsen/logrus"
3233
v1 "k8s.io/api/core/v1"
3334
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3435
"k8s.io/apimachinery/pkg/runtime/schema"
36+
"k8s.io/client-go/dynamic"
3537
inf "k8s.io/client-go/informers"
3638
"k8s.io/client-go/kubernetes"
3739
"k8s.io/client-go/metadata"
@@ -253,7 +255,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory,
253255
return nil
254256
}
255257

256-
func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config) error {
258+
func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config, dynClient *dynamic.DynamicClient) error {
257259
pods := informerFactory.Core().V1().Pods().Informer()
258260
// Transform any *v1.Pod instance into a *Info instance to save space
259261
// in the informer's cache
@@ -280,7 +282,7 @@ func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, c
280282
}
281283
}
282284
if cfg.hasUDN {
283-
if udnKeys, err := udn.GetPodUniqueKeys(pod); err != nil {
285+
if udnKeys, err := udn.GetPodUniqueKeys(context.Background(), dynClient, pod); err != nil {
284286
// Log the error as Info, do not block other ips indexing
285287
log.WithError(err).Infof("UDNs cannot be identified")
286288
} else {
@@ -395,23 +397,28 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric
395397
return err
396398
}
397399

400+
dynClient, err := dynamic.NewForConfig(kconf)
401+
if err != nil {
402+
return err
403+
}
404+
398405
k.indexerHitMetric = opMetrics.CreateIndexerHitCounter()
399-
err = k.initInformers(kubeClient, metaKubeClient, infConfig)
406+
err = k.initInformers(kubeClient, metaKubeClient, dynClient, infConfig)
400407
if err != nil {
401408
return err
402409
}
403410

404411
return nil
405412
}
406413

407-
func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, cfg Config) error {
414+
func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg Config) error {
408415
informerFactory := inf.NewSharedInformerFactory(client, syncTime)
409416
metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
410417
err := k.initNodeInformer(informerFactory, cfg)
411418
if err != nil {
412419
return err
413420
}
414-
err = k.initPodInformer(informerFactory, cfg)
421+
err = k.initPodInformer(informerFactory, cfg, dynClient)
415422
if err != nil {
416423
return err
417424
}

0 commit comments

Comments
 (0)