diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go
index 68a2e99c0..848784fdf 100644
--- a/pkg/api/transform_network.go
+++ b/pkg/api/transform_network.go
@@ -52,6 +52,9 @@ type NetworkTransformKubeConfig struct {
 	ConfigPath        string             `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
 	SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"`
 	ManagedCNI        []string           `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"`
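+	// Illustrative snippet (the kubeConfig parent key is assumed, matching the yaml tag below):
+	//   kubeConfig: { trackedKinds: [Deployment, Gateway] }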
+	TrackedKinds      []string           `yaml:"trackedKinds,omitempty" json:"trackedKinds,omitempty" doc:"list of Kubernetes resource kinds to track for ownership chain (e.g., Deployment, Gateway, VirtualMachine). If a resource's owner is in this list, FLP will continue tracking up the ownership chain."`
 }
 
 type TransformNetworkOperationEnum string
diff --git a/pkg/pipeline/transform/kubernetes/datasource/datasource.go b/pkg/pipeline/transform/kubernetes/datasource/datasource.go
index bf52fd5eb..4b3803474 100644
--- a/pkg/pipeline/transform/kubernetes/datasource/datasource.go
+++ b/pkg/pipeline/transform/kubernetes/datasource/datasource.go
@@ -11,7 +11,7 @@ type Datasource struct {
 	Informers informers.Interface
 }
 
-func NewInformerDatasource(kubeconfig string, infConfig informers.Config, opMetrics *operational.Metrics) (*Datasource, error) {
+func NewInformerDatasource(kubeconfig string, infConfig *informers.Config, opMetrics *operational.Metrics) (*Datasource, error) {
 	inf := &informers.Informers{}
 	if err := inf.InitFromConfig(kubeconfig, infConfig, opMetrics); err != nil {
 		return nil, err
diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go
index 34b9570a0..c71c2cda6 100644
--- a/pkg/pipeline/transform/kubernetes/enrich.go
+++ b/pkg/pipeline/transform/kubernetes/enrich.go
@@ -17,15 +17,15 @@ var infConfig informers.Config
 
 // For testing
 func MockInformers() {
-	infConfig = informers.NewConfig(api.NetworkTransformKubeConfig{})
+	infConfig = informers.NewConfig(&api.NetworkTransformKubeConfig{})
 	ds = &datasource.Datasource{Informers: informers.NewInformersMock()}
 }
 
-func InitInformerDatasource(config api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error {
+func InitInformerDatasource(config *api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error {
 	var err error
 	infConfig = informers.NewConfig(config)
 	if ds == nil {
-		ds, err = datasource.NewInformerDatasource(config.ConfigPath, infConfig, opMetrics)
+		ds, err = datasource.NewInformerDatasource(config.ConfigPath, &infConfig, opMetrics)
 	}
 	return err
 }
diff --git a/pkg/pipeline/transform/kubernetes/informers/config.go b/pkg/pipeline/transform/kubernetes/informers/config.go
index 80bc35be3..c773f9e26 100644
--- a/pkg/pipeline/transform/kubernetes/informers/config.go
+++ b/pkg/pipeline/transform/kubernetes/informers/config.go
@@ -19,12 +19,14 @@ type Config struct {
 	secondaryNetworks []api.SecondaryNetwork
 	hasMultus         bool
 	hasUDN            bool
+	trackedKinds      []string
 }
 
-func NewConfig(cfg api.NetworkTransformKubeConfig) Config {
+func NewConfig(cfg *api.NetworkTransformKubeConfig) Config {
 	c := Config{
 		managedCNI:        cfg.ManagedCNI,
 		secondaryNetworks: cfg.SecondaryNetworks,
+		trackedKinds:      cfg.TrackedKinds,
 	}
 	if c.managedCNI == nil {
 		c.managedCNI = []string{api.OVN}
diff --git a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go
index cad81c48f..0ef89e7f7 100644
--- a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go
+++ b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go
@@ -36,7 +36,7 @@ func NewInformersMock() *Mock {
 	return inf
 }
 
-func (o *Mock) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error {
+func (o *Mock) InitFromConfig(kubeconfig string, infConfig *Config, opMetrics *operational.Metrics) error {
 	args := o.Called(kubeconfig, infConfig, opMetrics)
 	return args.Error(0)
 }
@@ -135,6 +135,24 @@ func (m *IndexerMock) MockReplicaSet(name, namespace, ownerName, ownerKind strin
 	}, true, nil)
 }
 
+func (m *IndexerMock) MockDeployment(name, namespace, ownerName, ownerKind string) {
+	if ownerName == "" {
+		// No owner
+		m.On("GetByKey", namespace+"/"+name).Return(&metav1.ObjectMeta{
+			Name:            name,
+			OwnerReferences: []metav1.OwnerReference{},
+		}, true, nil)
+	} else {
+		m.On("GetByKey", namespace+"/"+name).Return(&metav1.ObjectMeta{
+			Name: name,
+			OwnerReferences: []metav1.OwnerReference{{
+				Kind: ownerKind,
+				Name: ownerName,
+			}},
+		}, true, nil)
+	}
+}
+
 func (m *IndexerMock) FallbackNotFound() {
 	m.On("ByIndex", IndexIP, mock.Anything).Return([]interface{}{}, nil)
 }
@@ -163,6 +181,26 @@ func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) {
 	return
 }
 
+func SetupIndexerMocksWithTrackedKinds(kd *Informers, trackedKinds []string) (pods, nodes, svc, rs, deploy *IndexerMock) {
+	// Setup base informers
+	pods, nodes, svc, rs = SetupIndexerMocks(kd)
+
+	// Setup additional informers based on trackedKinds
+	for _, kind := range trackedKinds {
+		switch kind {
+		case "Deployment", "Gateway":
+			// Gateway requires Deployment informer, so we initialize it for both
+			if deploy == nil {
+				deploy = &IndexerMock{}
+				dim := InformerMock{}
+				dim.On("GetIndexer").Return(deploy)
+				kd.deployments = &dim
+			}
+		}
+	}
+	return
+}
+
 type FakeInformers struct {
 	Interface
 	ipInfo         map[string]*model.ResourceMetaData
@@ -171,7 +209,7 @@ type FakeInformers struct {
 }
 
 func SetupStubs(ipInfo, customKeysInfo, nodes map[string]*model.ResourceMetaData) (Config, *FakeInformers) {
-	cfg := NewConfig(api.NetworkTransformKubeConfig{SecondaryNetworks: secondaryNetConfig})
+	cfg := NewConfig(&api.NetworkTransformKubeConfig{SecondaryNetworks: secondaryNetConfig})
 	return cfg, &FakeInformers{
 		ipInfo:         ipInfo,
 		customKeysInfo: customKeysInfo,
@@ -179,7 +217,7 @@ func SetupStubs(ipInfo, customKeysInfo, nodes map[string]*model.ResourceMetaData
 	}
 }
 
-func (f *FakeInformers) InitFromConfig(_ string, _ Config, _ *operational.Metrics) error {
+func (f *FakeInformers) InitFromConfig(_ string, _ *Config, _ *operational.Metrics) error {
 	return nil
 }
diff --git a/pkg/pipeline/transform/kubernetes/informers/informers.go b/pkg/pipeline/transform/kubernetes/informers/informers.go
index 546b65b8a..db5ea1b61 100644
--- a/pkg/pipeline/transform/kubernetes/informers/informers.go
+++ b/pkg/pipeline/transform/kubernetes/informers/informers.go
@@ -55,7 +55,7 @@ var (
 type Interface interface {
 	IndexLookup([]cni.SecondaryNetKey, string) *model.ResourceMetaData
 	GetNodeByName(string) (*model.ResourceMetaData, error)
-	InitFromConfig(string, Config, *operational.Metrics) error
+	InitFromConfig(string, *Config, *operational.Metrics) error
 }
 
 type Informers struct {
@@ -65,7 +65,13 @@ type Informers struct {
 	nodes    cache.SharedIndexInformer
 	services cache.SharedIndexInformer
 	// replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers
-	replicaSets cache.SharedIndexInformer
+	replicaSets cache.SharedIndexInformer
+	// New informers for ownership tracking
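+	// deployments is only initialized when trackedKinds requires it (see initInformers);
+	// getOwnerFromInformer checks it for nil before use.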
+	deployments cache.SharedIndexInformer
+	// Config and channels
+	config           Config
 	stopChan         chan struct{}
 	mdStopChan       chan struct{}
 	indexerHitMetric *prometheus.CounterVec
@@ -178,19 +184,124 @@ func (k *Informers) GetNodeByName(name string) (*model.ResourceMetaData, error)
 }
 
 func (k *Informers) checkParent(info *model.ResourceMetaData) {
-	if info.OwnerKind == "ReplicaSet" {
-		item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + info.OwnerName)
-		if err != nil {
-			log.WithError(err).WithField("key", info.Namespace+"/"+info.OwnerName).
-				Debug("can't get ReplicaSet info from informer. Ignoring")
-		} else if ok {
-			rsInfo := item.(*metav1.ObjectMeta)
-			if len(rsInfo.OwnerReferences) > 0 {
-				info.OwnerKind = rsInfo.OwnerReferences[0].Kind
-				info.OwnerName = rsInfo.OwnerReferences[0].Name
-			}
-		}
+	// Maximum 3 ownership hops: Pod → ReplicaSet → Deployment → Gateway
+	// This allows tracking up to 3 levels beyond the initial resource
+	const maxHops = 3
+
+	// If trackedKinds is empty, use legacy behavior (stop after ReplicaSet resolution)
+	if len(k.config.trackedKinds) == 0 {
+		// Legacy behavior: only resolve ReplicaSet
+		if info.OwnerKind == "ReplicaSet" {
+			item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + info.OwnerName)
+			if err != nil {
+				log.WithError(err).WithField("key", info.Namespace+"/"+info.OwnerName).
+					Debug("can't get ReplicaSet info from informer. Ignoring")
+				return
+			}
+			if ok {
+				rsInfo := item.(*metav1.ObjectMeta)
+				if len(rsInfo.OwnerReferences) > 0 {
+					info.OwnerKind = rsInfo.OwnerReferences[0].Kind
+					info.OwnerName = rsInfo.OwnerReferences[0].Name
+				}
+			}
+		}
+		return
+	}
+
+	// New behavior with trackedKinds: traverse ownership chain until we find a tracked kind or hit max depth
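+	// Worked example (hypothetical names): with trackedKinds=[Deployment, Gateway], a pod owned
+	// by ReplicaSet "web-7d9f" → Deployment "web" → Gateway "edge" resolves as follows:
+	// ReplicaSet is untracked so we hop to its parent; Deployment is tracked and its parent is
+	// also tracked, so we hop again; Gateway has no resolvable parent, so we stop there.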
+	for i := 0; i < maxHops; i++ {
+		// Check if current owner is in trackedKinds
+		if k.isTracked(info.OwnerKind) {
+			// This owner IS tracked. Try to get its parent to see if we can go higher.
+			parent := k.getOwnerFromInformer(info.OwnerKind, info.Namespace, info.OwnerName)
+			if parent == nil {
+				// No parent exists → STOP at current tracked kind
+				break
+			}
+			// Parent exists - check if parent is ALSO tracked
+			if k.isTracked(parent.Kind) {
+				// Parent is also tracked → update and continue (prefer higher level)
+				info.OwnerKind = parent.Kind
+				info.OwnerName = parent.Name
+				continue
+			}
+			// Parent exists but is NOT tracked → STOP at current tracked kind
+			break
+		}
+
+		// Current owner is NOT tracked → try to find a tracked parent
+		parent := k.getOwnerFromInformer(info.OwnerKind, info.Namespace, info.OwnerName)
+		if parent == nil {
+			// No parent found → STOP at current (untracked) owner
+			break
+		}
+
+		// Update to parent and continue
+		info.OwnerKind = parent.Kind
+		info.OwnerName = parent.Name
+	}
+}
+
+// isTracked returns true if the given kind is in the trackedKinds configuration
+func (k *Informers) isTracked(kind string) bool {
+	for _, tracked := range k.config.trackedKinds {
+		if tracked == kind {
+			return true
+		}
+	}
+	return false
+}
+
+// OwnerInfo contains basic ownership information
+type OwnerInfo struct {
+	Kind string
+	Name string
+}
+
+// getOwnerFromInformer retrieves the owner of a resource from the appropriate informer
+func (k *Informers) getOwnerFromInformer(kind, namespace, name string) *OwnerInfo {
+	var informer cache.SharedIndexInformer
+
+	switch kind {
+	case "ReplicaSet":
+		informer = k.replicaSets
+	case "Deployment":
+		informer = k.deployments
+	default:
+		return nil
+	}
+
+	if informer == nil {
+		log.WithField("kind", kind).Debug("informer not initialized for this kind")
+		return nil
+	}
+
+	item, ok, err := informer.GetIndexer().GetByKey(namespace + "/" + name)
+	if err != nil {
+		log.WithError(err).
+			WithField("kind", kind).
+			WithField("key", namespace+"/"+name).
+			Debug("can't get resource info from informer")
+		return nil
+	}
+	if !ok {
+		return nil
+	}
+
+	meta := item.(*metav1.ObjectMeta)
+	if len(meta.OwnerReferences) == 0 {
+		return nil
+	}
+
+	return &OwnerInfo{
+		Kind: meta.OwnerReferences[0].Kind,
+		Name: meta.OwnerReferences[0].Name,
 	}
 }
 
 func (k *Informers) getHostName(hostIP string) string {
@@ -202,7 +313,7 @@ func (k *Informers) getHostName(hostIP string) string {
 	return ""
 }
 
-func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg Config) error {
+func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg *Config) error {
 	nodes := informerFactory.Core().V1().Nodes().Informer()
 	// Transform any *v1.Node instance into a *Info instance to save space
 	// in the informer's cache
@@ -254,7 +365,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory,
 	return nil
 }
 
-func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config, dynClient *dynamic.DynamicClient) error {
+func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg *Config, dynClient *dynamic.DynamicClient) error {
 	pods := informerFactory.Core().V1().Pods().Informer()
 	// Transform any *v1.Pod instance into a *Info instance to save space
 	// in the informer's cache
@@ -376,8 +487,35 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar
 	return nil
 }
 
-func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error {
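+// initDeploymentInformer mirrors initReplicaSetInformer: it uses the metadata informer factory,
+// so only PartialObjectMetadata is received, and the transform below keeps just the name,
+// namespace and owner references rather than caching full Deployment objects.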
+ Debug("can't get resource info from informer") + return nil + } + if !ok { + return nil + } + + meta := item.(*metav1.ObjectMeta) + if len(meta.OwnerReferences) == 0 { + return nil + } + + return &OwnerInfo{ + Kind: meta.OwnerReferences[0].Kind, + Name: meta.OwnerReferences[0].Name, + } } func (k *Informers) getHostName(hostIP string) string { @@ -202,7 +307,7 @@ func (k *Informers) getHostName(hostIP string) string { return "" } -func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg Config) error { +func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg *Config) error { nodes := informerFactory.Core().V1().Nodes().Informer() // Transform any *v1.Node instance into a *Info instance to save space // in the informer's cache @@ -254,7 +359,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, return nil } -func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config, dynClient *dynamic.DynamicClient) error { +func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg *Config, dynClient *dynamic.DynamicClient) error { pods := informerFactory.Core().V1().Pods().Informer() // Transform any *v1.Pod instance into a *Info instance to save space // in the informer's cache @@ -376,8 +481,32 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar return nil } -func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error { +func (k *Informers) initDeploymentInformer(informerFactory metadatainformer.SharedInformerFactory) error { + k.deployments = informerFactory.ForResource( + schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + }).Informer() + if err := k.deployments.SetTransform(func(i interface{}) (interface{}, error) { + deploy, ok := i.(*metav1.PartialObjectMetadata) + if !ok { + return nil, fmt.Errorf("was expecting a Deployment. 
@@ -410,7 +548,7 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric
 	return nil
 }
 
-func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg Config) error {
+func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg *Config) error {
 	informerFactory := inf.NewSharedInformerFactory(client, syncTime)
 	metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
 	err := k.initNodeInformer(informerFactory, cfg)
@@ -430,6 +568,17 @@ func (k *Informers) initInformers(client kubernetes.Interface, metaClient metada
 		return err
 	}
 
+	// Initialize additional informers based on trackedKinds configuration
+	for _, kind := range cfg.trackedKinds {
+		if (kind == "Deployment" || kind == "Gateway") && k.deployments == nil {
+			// Gateway requires the Deployment informer to navigate the ownership chain
+			log.Debugf("initializing Deployment informer (trackedKinds)")
+			if err := k.initDeploymentInformer(metadataInformerFactory); err != nil {
+				return err
+			}
+		}
+	}
+
 	// Informers expose an indexer
 	log.Debugf("adding indexers")
 	byIP := cache.Indexers{IndexIP: ipIndexer}
diff --git a/pkg/pipeline/transform/kubernetes/informers/informers_test.go b/pkg/pipeline/transform/kubernetes/informers/informers_test.go
index d28f20305..dd7c18ae2 100644
--- a/pkg/pipeline/transform/kubernetes/informers/informers_test.go
+++ b/pkg/pipeline/transform/kubernetes/informers/informers_test.go
@@ -121,3 +121,124 @@ func TestGetInfo(t *testing.T) {
 	info = kubeData.IndexLookup(nil, "1.2.3.200")
 	require.Nil(t, info)
 }
+
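+// The tests below build variations of one ownership chain (pod1 with IP 1.2.3.4 → ReplicaSet
+// rs1 → Deployment deploy1 → Gateway gateway1), varying the trackedKinds configuration to
+// verify where owner resolution stops in each case.
+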
+// TestOwnershipTracking_GatewayAPI tests the ownership chain: Pod → ReplicaSet → Deployment → Gateway
+func TestOwnershipTracking_GatewayAPI(t *testing.T) {
+	metrics := operational.NewMetrics(&config.MetricsSettings{})
+	kubeData := Informers{
+		indexerHitMetric: metrics.CreateIndexerHitCounter(),
+		config: Config{
+			trackedKinds: []string{"Deployment", "Gateway"},
+		},
+	}
+
+	pidx, hidx, sidx, ridx, didx := SetupIndexerMocksWithTrackedKinds(&kubeData, []string{"Deployment", "Gateway"})
+
+	// Setup mocks for the ownership chain
+	ridx.MockReplicaSet("rs1", "test-ns", "deploy1", "Deployment")
+	ridx.FallbackNotFound()
+	didx.MockDeployment("deploy1", "test-ns", "gateway1", "Gateway")
+
+	pidx.MockPod("1.2.3.4", "", "", "pod1", "test-ns", "10.0.0.1", "rs1", "ReplicaSet")
+	pidx.FallbackNotFound()
+	hidx.MockNode("10.0.0.1", "node1")
+	hidx.FallbackNotFound()
+	sidx.FallbackNotFound()
+
+	// Test: Pod should resolve to Gateway as final owner
+	info := kubeData.IndexLookup(nil, "1.2.3.4")
+	require.NotNil(t, info)
+	require.Equal(t, "Gateway", info.OwnerKind)
+	require.Equal(t, "gateway1", info.OwnerName)
+}
+
+// TestOwnershipTracking_OnlyDeployment tests when only Deployment is tracked (not Gateway)
+func TestOwnershipTracking_OnlyDeployment(t *testing.T) {
+	metrics := operational.NewMetrics(&config.MetricsSettings{})
+	kubeData := Informers{
+		indexerHitMetric: metrics.CreateIndexerHitCounter(),
+		config: Config{
+			trackedKinds: []string{"Deployment"}, // Gateway NOT tracked
+		},
+	}
+
+	pidx, hidx, sidx, ridx, didx := SetupIndexerMocksWithTrackedKinds(&kubeData, []string{"Deployment"})
+
+	ridx.MockReplicaSet("rs1", "test-ns", "deploy1", "Deployment")
+	ridx.FallbackNotFound()
+	didx.MockDeployment("deploy1", "test-ns", "gateway1", "Gateway")
+
+	pidx.MockPod("1.2.3.4", "", "", "pod1", "test-ns", "10.0.0.1", "rs1", "ReplicaSet")
+	pidx.FallbackNotFound()
+	hidx.MockNode("10.0.0.1", "node1")
+	hidx.FallbackNotFound()
+	sidx.FallbackNotFound()
+
+	// Test: Pod should resolve to Deployment (stops there because Gateway is not tracked)
+	info := kubeData.IndexLookup(nil, "1.2.3.4")
+	require.NotNil(t, info)
+	require.Equal(t, "Deployment", info.OwnerKind)
+	require.Equal(t, "deploy1", info.OwnerName)
+}
+
+// TestOwnershipTracking_NoTrackedKinds tests backward compatibility (no trackedKinds configured)
+func TestOwnershipTracking_NoTrackedKinds(t *testing.T) {
+	metrics := operational.NewMetrics(&config.MetricsSettings{})
+	kubeData := Informers{
+		indexerHitMetric: metrics.CreateIndexerHitCounter(),
+		config: Config{
+			trackedKinds: []string{}, // Empty list
+		},
+	}
+
+	pidx, hidx, sidx, ridx := SetupIndexerMocks(&kubeData)
+
+	ridx.MockReplicaSet("rs1", "test-ns", "deploy1", "Deployment")
+	ridx.FallbackNotFound()
+
+	pidx.MockPod("1.2.3.4", "", "", "pod1", "test-ns", "10.0.0.1", "rs1", "ReplicaSet")
+	pidx.FallbackNotFound()
+	hidx.MockNode("10.0.0.1", "node1")
+	hidx.FallbackNotFound()
+	sidx.FallbackNotFound()
+
+	// Test: Pod should resolve to Deployment (ReplicaSet is always processed)
+	info := kubeData.IndexLookup(nil, "1.2.3.4")
+	require.NotNil(t, info)
+	require.Equal(t, "Deployment", info.OwnerKind)
+	require.Equal(t, "deploy1", info.OwnerName)
+}
+
+// TestOwnershipTracking_MaxDepth tests that ownership tracking stops at 3 levels
+func TestOwnershipTracking_MaxDepth(t *testing.T) {
+	metrics := operational.NewMetrics(&config.MetricsSettings{})
+	kubeData := Informers{
+		indexerHitMetric: metrics.CreateIndexerHitCounter(),
+		config: Config{
+			trackedKinds: []string{"Deployment", "Gateway"},
+		},
+	}
+
+	pidx, hidx, sidx, ridx, didx := SetupIndexerMocksWithTrackedKinds(&kubeData, []string{"Deployment", "Gateway"})
+
+	ridx.MockReplicaSet("rs1", "test-ns", "deploy1", "Deployment")
+	ridx.FallbackNotFound()
+	// Deployment owned by Gateway (which we can't traverse further without Gateway informer)
+	didx.MockDeployment("deploy1", "test-ns", "gateway1", "Gateway")
+
+	pidx.MockPod("1.2.3.4", "", "", "pod1", "test-ns", "10.0.0.1", "rs1", "ReplicaSet")
+	pidx.FallbackNotFound()
+	hidx.MockNode("10.0.0.1", "node1")
+	hidx.FallbackNotFound()
+	sidx.FallbackNotFound()
+
+	// Test: Should stop at Gateway (3rd level), not continue to 4th level
+	info := kubeData.IndexLookup(nil, "1.2.3.4")
+	require.NotNil(t, info)
+	require.Equal(t, "Gateway", info.OwnerKind)
+	require.Equal(t, "gateway1", info.OwnerName)
+}
diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go
index 45daba68e..9e806f780 100644
--- a/pkg/pipeline/transform/transform_network.go
+++ b/pkg/pipeline/transform/transform_network.go
@@ -219,7 +219,7 @@ func NewTransformNetwork(params config.StageParam, opMetrics *operational.Metric
 	}
 
 	if needToInitKubeData {
-		err := kubernetes.InitInformerDatasource(jsonNetworkTransform.KubeConfig, opMetrics)
+		err := kubernetes.InitInformerDatasource(&jsonNetworkTransform.KubeConfig, opMetrics)
 		if err != nil {
 			return nil, err
 		}