Skip to content

Commit 25aa7fa

Browse files
NETOBSERV-2146: Add the ability to track ownership beyond ReplicaSet (#1125)
* improvements
1 parent c86352b commit 25aa7fa

File tree

8 files changed

+323
-25
lines changed

8 files changed

+323
-25
lines changed

pkg/api/transform_network.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type NetworkTransformKubeConfig struct {
5252
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
5353
SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"`
5454
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"`
55+
TrackedKinds []string `yaml:"trackedKinds,omitempty" json:"trackedKinds,omitempty" doc:"list of Kubernetes resource kinds to track for ownership chain (e.g., Deployment, Gateway, VirtualMachine). If a resource's owner is in this list, FLP will continue tracking up the ownership chain."`
5556
}
5657

5758
type TransformNetworkOperationEnum string

pkg/pipeline/transform/kubernetes/datasource/datasource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Datasource struct {
1111
Informers informers.Interface
1212
}
1313

14-
func NewInformerDatasource(kubeconfig string, infConfig informers.Config, opMetrics *operational.Metrics) (*Datasource, error) {
14+
func NewInformerDatasource(kubeconfig string, infConfig *informers.Config, opMetrics *operational.Metrics) (*Datasource, error) {
1515
inf := &informers.Informers{}
1616
if err := inf.InitFromConfig(kubeconfig, infConfig, opMetrics); err != nil {
1717
return nil, err

pkg/pipeline/transform/kubernetes/enrich.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ const (
2121

2222
// For testing
2323
func MockInformers() {
24-
infConfig = informers.NewConfig(api.NetworkTransformKubeConfig{})
24+
infConfig = informers.NewConfig(&api.NetworkTransformKubeConfig{})
2525
ds = &datasource.Datasource{Informers: informers.NewInformersMock()}
2626
}
2727

28-
func InitInformerDatasource(config api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error {
28+
func InitInformerDatasource(config *api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error {
2929
var err error
3030
infConfig = informers.NewConfig(config)
3131
if ds == nil {
32-
ds, err = datasource.NewInformerDatasource(config.ConfigPath, infConfig, opMetrics)
32+
ds, err = datasource.NewInformerDatasource(config.ConfigPath, &infConfig, opMetrics)
3333
}
3434
return err
3535
}

pkg/pipeline/transform/kubernetes/informers/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ type Config struct {
1919
secondaryNetworks []api.SecondaryNetwork
2020
hasMultus bool
2121
hasUDN bool
22+
trackedKinds []string
2223
}
2324

24-
func NewConfig(cfg api.NetworkTransformKubeConfig) Config {
25+
func NewConfig(cfg *api.NetworkTransformKubeConfig) Config {
2526
c := Config{
2627
managedCNI: cfg.ManagedCNI,
2728
secondaryNetworks: cfg.SecondaryNetworks,
29+
trackedKinds: cfg.TrackedKinds,
2830
}
2931
if c.managedCNI == nil {
3032
c.managedCNI = []string{api.OVN}

pkg/pipeline/transform/kubernetes/informers/informers-mock.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func NewInformersMock() *Mock {
3636
return inf
3737
}
3838

39-
func (o *Mock) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error {
39+
func (o *Mock) InitFromConfig(kubeconfig string, infConfig *Config, opMetrics *operational.Metrics) error {
4040
args := o.Called(kubeconfig, infConfig, opMetrics)
4141
return args.Error(0)
4242
}
@@ -135,6 +135,24 @@ func (m *IndexerMock) MockReplicaSet(name, namespace, ownerName, ownerKind strin
135135
}, true, nil)
136136
}
137137

138+
func (m *IndexerMock) MockDeployment(name, namespace, ownerName, ownerKind string) {
139+
if ownerName == "" {
140+
// No owner
141+
m.On("GetByKey", namespace+"/"+name).Return(&metav1.ObjectMeta{
142+
Name: name,
143+
OwnerReferences: []metav1.OwnerReference{},
144+
}, true, nil)
145+
} else {
146+
m.On("GetByKey", namespace+"/"+name).Return(&metav1.ObjectMeta{
147+
Name: name,
148+
OwnerReferences: []metav1.OwnerReference{{
149+
Kind: ownerKind,
150+
Name: ownerName,
151+
}},
152+
}, true, nil)
153+
}
154+
}
155+
138156
func (m *IndexerMock) FallbackNotFound() {
139157
m.On("ByIndex", IndexIP, mock.Anything).Return([]interface{}{}, nil)
140158
}
@@ -163,6 +181,26 @@ func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) {
163181
return
164182
}
165183

184+
func SetupIndexerMocksWithTrackedKinds(kd *Informers, trackedKinds []string) (pods, nodes, svc, rs, deploy *IndexerMock) {
185+
// Setup base informers
186+
pods, nodes, svc, rs = SetupIndexerMocks(kd)
187+
188+
// Setup additional informers based on trackedKinds
189+
for _, kind := range trackedKinds {
190+
switch kind {
191+
case "Deployment", "Gateway":
192+
// Gateway requires Deployment informer, so we initialize it for both
193+
if deploy == nil {
194+
deploy = &IndexerMock{}
195+
dim := InformerMock{}
196+
dim.On("GetIndexer").Return(deploy)
197+
kd.deployments = &dim
198+
}
199+
}
200+
}
201+
return
202+
}
203+
166204
type FakeInformers struct {
167205
Interface
168206
ipInfo map[string]*model.ResourceMetaData
@@ -171,15 +209,15 @@ type FakeInformers struct {
171209
}
172210

173211
func SetupStubs(ipInfo, customKeysInfo, nodes map[string]*model.ResourceMetaData) (Config, *FakeInformers) {
174-
cfg := NewConfig(api.NetworkTransformKubeConfig{SecondaryNetworks: secondaryNetConfig})
212+
cfg := NewConfig(&api.NetworkTransformKubeConfig{SecondaryNetworks: secondaryNetConfig})
175213
return cfg, &FakeInformers{
176214
ipInfo: ipInfo,
177215
customKeysInfo: customKeysInfo,
178216
nodes: nodes,
179217
}
180218
}
181219

182-
func (f *FakeInformers) InitFromConfig(_ string, _ Config, _ *operational.Metrics) error {
220+
func (f *FakeInformers) InitFromConfig(_ string, _ *Config, _ *operational.Metrics) error {
183221
return nil
184222
}
185223

pkg/pipeline/transform/kubernetes/informers/informers.go

Lines changed: 156 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var (
5555
type Interface interface {
5656
IndexLookup([]cni.SecondaryNetKey, string) *model.ResourceMetaData
5757
GetNodeByName(string) (*model.ResourceMetaData, error)
58-
InitFromConfig(string, Config, *operational.Metrics) error
58+
InitFromConfig(string, *Config, *operational.Metrics) error
5959
}
6060

6161
type Informers struct {
@@ -65,7 +65,11 @@ type Informers struct {
6565
nodes cache.SharedIndexInformer
6666
services cache.SharedIndexInformer
6767
// replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers
68-
replicaSets cache.SharedIndexInformer
68+
replicaSets cache.SharedIndexInformer
69+
// New informers for ownership tracking
70+
deployments cache.SharedIndexInformer
71+
// Config and channels
72+
config Config
6973
stopChan chan struct{}
7074
mdStopChan chan struct{}
7175
indexerHitMetric *prometheus.CounterVec
@@ -178,19 +182,120 @@ func (k *Informers) GetNodeByName(name string) (*model.ResourceMetaData, error)
178182
}
179183

180184
func (k *Informers) checkParent(info *model.ResourceMetaData) {
181-
if info.OwnerKind == "ReplicaSet" {
182-
item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + info.OwnerName)
183-
if err != nil {
184-
log.WithError(err).WithField("key", info.Namespace+"/"+info.OwnerName).
185-
Debug("can't get ReplicaSet info from informer. Ignoring")
186-
} else if ok {
187-
rsInfo := item.(*metav1.ObjectMeta)
188-
if len(rsInfo.OwnerReferences) > 0 {
189-
info.OwnerKind = rsInfo.OwnerReferences[0].Kind
190-
info.OwnerName = rsInfo.OwnerReferences[0].Name
185+
// Maximum 3 ownership hops: Pod → ReplicaSet → Deployment → Gateway
186+
// This allows tracking up to 3 levels beyond the initial resource
187+
const maxHops = 3
188+
189+
// If trackedKinds is empty, use legacy behavior (stop after ReplicaSet resolution)
190+
if len(k.config.trackedKinds) == 0 {
191+
// Legacy behavior: only resolve ReplicaSet
192+
if info.OwnerKind == "ReplicaSet" {
193+
item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + info.OwnerName)
194+
if err != nil {
195+
log.WithError(err).WithField("key", info.Namespace+"/"+info.OwnerName).
196+
Debug("can't get ReplicaSet info from informer. Ignoring")
197+
return
198+
}
199+
if ok {
200+
rsInfo := item.(*metav1.ObjectMeta)
201+
if len(rsInfo.OwnerReferences) > 0 {
202+
info.OwnerKind = rsInfo.OwnerReferences[0].Kind
203+
info.OwnerName = rsInfo.OwnerReferences[0].Name
204+
}
205+
}
206+
}
207+
return
208+
}
209+
210+
// New behavior with trackedKinds: traverse ownership chain until we find a tracked kind or hit max depth
211+
for i := 0; i < maxHops; i++ {
212+
// Check if current owner is in trackedKinds
213+
if k.isTracked(info.OwnerKind) {
214+
// This owner IS tracked. Try to get its parent to see if we can go higher.
215+
parent := k.getOwnerFromInformer(info.OwnerKind, info.Namespace, info.OwnerName)
216+
if parent == nil {
217+
// No parent exists → STOP at current tracked kind
218+
break
219+
}
220+
// Parent exists - check if parent is ALSO tracked
221+
if k.isTracked(parent.Kind) {
222+
// Parent is also tracked → update and continue (prefer higher level)
223+
info.OwnerKind = parent.Kind
224+
info.OwnerName = parent.Name
225+
continue
191226
}
227+
// Parent exists but is NOT tracked → STOP at current tracked kind
228+
break
229+
}
230+
231+
// Current owner is NOT tracked → try to find a tracked parent
232+
parent := k.getOwnerFromInformer(info.OwnerKind, info.Namespace, info.OwnerName)
233+
if parent == nil {
234+
// No parent found → STOP at current (untracked) owner
235+
break
236+
}
237+
238+
// Update to parent and continue
239+
info.OwnerKind = parent.Kind
240+
info.OwnerName = parent.Name
241+
}
242+
}
243+
244+
// isTracked returns true if the given kind is in the trackedKinds configuration
245+
func (k *Informers) isTracked(kind string) bool {
246+
for _, tracked := range k.config.trackedKinds {
247+
if tracked == kind {
248+
return true
192249
}
193250
}
251+
return false
252+
}
253+
254+
// OwnerInfo contains basic ownership information
255+
type OwnerInfo struct {
256+
Kind string
257+
Name string
258+
}
259+
260+
// getOwnerFromInformer retrieves the owner of a resource from the appropriate informer
261+
func (k *Informers) getOwnerFromInformer(kind, namespace, name string) *OwnerInfo {
262+
var informer cache.SharedIndexInformer
263+
264+
switch kind {
265+
case "ReplicaSet":
266+
informer = k.replicaSets
267+
case "Deployment":
268+
informer = k.deployments
269+
default:
270+
return nil
271+
}
272+
273+
if informer == nil {
274+
log.WithField("kind", kind).Debug("informer not initialized for this kind")
275+
return nil
276+
}
277+
278+
item, ok, err := informer.GetIndexer().GetByKey(namespace + "/" + name)
279+
if err != nil {
280+
log.WithError(err).
281+
WithField("kind", kind).
282+
WithField("key", namespace+"/"+name).
283+
Debug("can't get resource info from informer")
284+
return nil
285+
}
286+
if !ok {
287+
return nil
288+
}
289+
290+
meta := item.(*metav1.ObjectMeta)
291+
if len(meta.OwnerReferences) == 0 {
292+
return nil
293+
}
294+
295+
return &OwnerInfo{
296+
Kind: meta.OwnerReferences[0].Kind,
297+
Name: meta.OwnerReferences[0].Name,
298+
}
194299
}
195300

196301
func (k *Informers) getHostName(hostIP string) string {
@@ -202,7 +307,7 @@ func (k *Informers) getHostName(hostIP string) string {
202307
return ""
203308
}
204309

205-
func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg Config) error {
310+
func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, cfg *Config) error {
206311
nodes := informerFactory.Core().V1().Nodes().Informer()
207312
// Transform any *v1.Node instance into a *Info instance to save space
208313
// in the informer's cache
@@ -255,7 +360,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory,
255360
return nil
256361
}
257362

258-
func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg Config, dynClient *dynamic.DynamicClient) error {
363+
func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, cfg *Config, dynClient *dynamic.DynamicClient) error {
259364
pods := informerFactory.Core().V1().Pods().Informer()
260365
// Transform any *v1.Pod instance into a *Info instance to save space
261366
// in the informer's cache
@@ -379,8 +484,32 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar
379484
return nil
380485
}
381486

382-
func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error {
487+
func (k *Informers) initDeploymentInformer(informerFactory metadatainformer.SharedInformerFactory) error {
488+
k.deployments = informerFactory.ForResource(
489+
schema.GroupVersionResource{
490+
Group: "apps",
491+
Version: "v1",
492+
Resource: "deployments",
493+
}).Informer()
494+
if err := k.deployments.SetTransform(func(i interface{}) (interface{}, error) {
495+
deploy, ok := i.(*metav1.PartialObjectMetadata)
496+
if !ok {
497+
return nil, fmt.Errorf("was expecting a Deployment. Got: %T", i)
498+
}
499+
return &metav1.ObjectMeta{
500+
Name: deploy.Name,
501+
Namespace: deploy.Namespace,
502+
OwnerReferences: deploy.OwnerReferences,
503+
}, nil
504+
}); err != nil {
505+
return fmt.Errorf("can't set Deployments transform: %w", err)
506+
}
507+
return nil
508+
}
509+
510+
func (k *Informers) InitFromConfig(kubeconfig string, infConfig *Config, opMetrics *operational.Metrics) error {
383511
// Initialization variables
512+
k.config = *infConfig
384513
k.stopChan = make(chan struct{})
385514
k.mdStopChan = make(chan struct{})
386515

@@ -413,7 +542,7 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric
413542
return nil
414543
}
415544

416-
func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg Config) error {
545+
func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, dynClient *dynamic.DynamicClient, cfg *Config) error {
417546
informerFactory := inf.NewSharedInformerFactory(client, syncTime)
418547
metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
419548
err := k.initNodeInformer(informerFactory, cfg)
@@ -433,6 +562,17 @@ func (k *Informers) initInformers(client kubernetes.Interface, metaClient metada
433562
return err
434563
}
435564

565+
// Initialize additional informers based on trackedKinds configuration
566+
for _, kind := range cfg.trackedKinds {
567+
if kind == "Deployment" {
568+
// Gateway requires Deployment informer to navigate ownership chain
569+
log.Debugf("initializing Deployment informer (trackedKinds)")
570+
if err := k.initDeploymentInformer(metadataInformerFactory); err != nil {
571+
return err
572+
}
573+
}
574+
}
575+
436576
// Informers expose an indexer
437577
log.Debugf("adding indexers")
438578
byIP := cache.Indexers{IndexIP: ipIndexer}

0 commit comments

Comments
 (0)