diff --git a/internal/k8sconfig/config.go b/internal/k8sconfig/config.go index b99c14173c02e..f52f11d272df2 100644 --- a/internal/k8sconfig/config.go +++ b/internal/k8sconfig/config.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" @@ -168,6 +169,25 @@ func MakeDynamicClient(apiConf APIConfig) (dynamic.Interface, error) { return client, nil } +// MakeMetadataClient constructs a client-go metadata.Interface using the same APIConfig flow. +// It mirrors MakeDynamicClient, but uses metadata.NewForConfig. +func MakeMetadataClient(apiConf APIConfig) (metadata.Interface, error) { + if err := apiConf.Validate(); err != nil { + return nil, err + } + + authConf, err := CreateRestConfig(apiConf) + if err != nil { + return nil, err + } + + mc, err := metadata.NewForConfig(authConf) + if err != nil { + return nil, err + } + return mc, nil +} + // MakeOpenShiftQuotaClient can take configuration if needed for other types of auth // and return an OpenShift quota API client func MakeOpenShiftQuotaClient(apiConf APIConfig) (quotaclientset.Interface, error) { diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index aafe458b12d38..a8c16fcf36719 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -249,22 +249,9 @@ func New( if rules.DeploymentName || rules.DeploymentUID { if informersFactory.newReplicaSetInformer == nil { - informersFactory.newReplicaSetInformer = newReplicaSetSharedInformer + informersFactory.newReplicaSetInformer = NewReplicaSetMetaInformerProvider(apiCfg) } c.replicasetInformer = informersFactory.newReplicaSetInformer(c.kc, c.Filters.Namespace) - err = c.replicasetInformer.SetTransform( - func(object any) (any, 
error) {
-			originalReplicaset, success := object.(*apps_v1.ReplicaSet)
-			if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
-				return object, nil
-			}
-
-			return removeUnnecessaryReplicaSetData(originalReplicaset), nil
-		},
-	)
-	if err != nil {
-		return nil, err
-	}
 	}
 
 	if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
@@ -1760,17 +1747,70 @@ func needContainerAttributes(rules ExtractionRules) bool {
 }
 
 func (c *WatchClient) handleReplicaSetAdd(obj any) {
-	c.telemetryBuilder.OtelsvcK8sReplicasetAdded.Add(context.Background(), 1)
-	if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok {
-		c.addOrUpdateReplicaSet(replicaset)
-	} else {
-		c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj))
+	c.telemetryBuilder.OtelsvcK8sReplicasetAdded.Add(context.Background(), 1)
+	switch rs := obj.(type) {
+	case *apps_v1.ReplicaSet:
+		c.addOrUpdateReplicaSetTyped(rs)
+	case *meta_v1.PartialObjectMetadata:
+		c.addOrUpdateReplicaSetMeta(rs)
+	case cache.DeletedFinalStateUnknown:
+		if inner, ok := rs.Obj.(*apps_v1.ReplicaSet); ok {
+			c.addOrUpdateReplicaSetTyped(inner)
+		} else if inner, ok := rs.Obj.(*meta_v1.PartialObjectMetadata); ok {
+			c.addOrUpdateReplicaSetMeta(inner)
+		}
+	default: c.logger.Warn("object received was not ReplicaSet", zap.Any("obj", obj))
+	}
+}
+
+func (c *WatchClient) addOrUpdateReplicaSetTyped(rs *apps_v1.ReplicaSet) {
+	newRS := &ReplicaSet{
+		Name:      rs.GetName(),
+		Namespace: rs.GetNamespace(),
+		UID:       string(rs.GetUID()),
+	}
+	for _, owner := range rs.OwnerReferences {
+		if owner.Kind == "Deployment" && owner.Controller != nil && *owner.Controller {
+			newRS.Deployment = Deployment{
+				Name: owner.Name,
+				UID:  string(owner.UID),
+			}
+			break
+		}
+	}
+	c.m.Lock()
+	if rs.GetUID() != "" {
+		c.ReplicaSets[string(rs.GetUID())] = newRS
+	}
+	c.m.Unlock()
+}
+
+func (c *WatchClient) addOrUpdateReplicaSetMeta(rs *meta_v1.PartialObjectMetadata) {
+	newRS := &ReplicaSet{
+		Name:      rs.GetName(),
+		Namespace: rs.GetNamespace(),
+		UID:       string(rs.GetUID()),
} + for _, owner := range rs.GetOwnerReferences() { + if owner.Kind == "Deployment" && owner.Controller != nil && *owner.Controller { + newRS.Deployment = Deployment{ + Name: owner.Name, + UID: string(owner.UID), + } + break + } } + + c.m.Lock() + if uid := rs.GetUID(); uid != "" { + c.ReplicaSets[string(uid)] = newRS + } + c.m.Unlock() } func (c *WatchClient) handleReplicaSetUpdate(_, newRS any) { c.telemetryBuilder.OtelsvcK8sReplicasetUpdated.Add(context.Background(), 1) - if replicaset, ok := newRS.(*apps_v1.ReplicaSet); ok { + if replicaset, ok := newRS.(*meta_v1.PartialObjectMetadata); ok { c.addOrUpdateReplicaSet(replicaset) } else { c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", newRS)) @@ -1779,7 +1819,7 @@ func (c *WatchClient) handleReplicaSetUpdate(_, newRS any) { func (c *WatchClient) handleReplicaSetDelete(obj any) { c.telemetryBuilder.OtelsvcK8sReplicasetDeleted.Add(context.Background(), 1) - if replicaset, ok := ignoreDeletedFinalStateUnknown(obj).(*apps_v1.ReplicaSet); ok { + if replicaset, ok := ignoreDeletedFinalStateUnknown(obj).(*meta_v1.PartialObjectMetadata); ok { c.m.Lock() key := string(replicaset.UID) delete(c.ReplicaSets, key) @@ -1789,7 +1829,7 @@ func (c *WatchClient) handleReplicaSetDelete(obj any) { } } -func (c *WatchClient) addOrUpdateReplicaSet(replicaset *apps_v1.ReplicaSet) { +func (c *WatchClient) addOrUpdateReplicaSet(replicaset *meta_v1.PartialObjectMetadata) { newReplicaSet := &ReplicaSet{ Name: replicaset.Name, Namespace: replicaset.Namespace, @@ -1813,19 +1853,6 @@ func (c *WatchClient) addOrUpdateReplicaSet(replicaset *apps_v1.ReplicaSet) { c.m.Unlock() } -// This function removes all data from the ReplicaSet except what is required by extraction rules -func removeUnnecessaryReplicaSetData(replicaset *apps_v1.ReplicaSet) *apps_v1.ReplicaSet { - transformedReplicaset := apps_v1.ReplicaSet{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: replicaset.GetName(), - Namespace: 
replicaset.GetNamespace(), - UID: replicaset.GetUID(), - }, - } - transformedReplicaset.SetOwnerReferences(replicaset.GetOwnerReferences()) - return &transformedReplicaset -} - // runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete // before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer // to be finished to correctly establish the connection to the replicaset/deployment it belongs to. diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index d29ef1cf490f0..5dc10ceae030f 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -232,12 +232,12 @@ func TestReplicaSetHandler(t *testing.T) { c, _ := newTestClient(t) assert.Empty(t, c.ReplicaSets) - replicaset := &apps_v1.ReplicaSet{} + replicaset := &meta_v1.PartialObjectMetadata{} c.handleReplicaSetAdd(replicaset) assert.Empty(t, c.ReplicaSets) // test add replicaset - replicaset = &apps_v1.ReplicaSet{} + replicaset = &meta_v1.PartialObjectMetadata{} replicaset.Name = "deployment-aaa" replicaset.Namespace = "namespaceA" replicaset.UID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" @@ -685,7 +685,7 @@ func TestExtractionRules(t *testing.T) { } isController := true - replicaset := &apps_v1.ReplicaSet{ + replicaset := &meta_v1.PartialObjectMetadata{ ObjectMeta: meta_v1.ObjectMeta{ Name: "auth-service-66f5996c7c", Namespace: "ns1", @@ -1081,8 +1081,7 @@ func TestExtractionRules(t *testing.T) { maps.Copy(podCopy.Annotations, tc.additionalAnnotations) maps.Copy(podCopy.Labels, tc.additionalLabels) transformedPod := removeUnnecessaryPodData(podCopy, c.Rules) - transformedReplicaset := removeUnnecessaryReplicaSetData(replicaset) - c.handleReplicaSetAdd(transformedReplicaset) + c.handleReplicaSetAdd(replicaset) 
c.handlePodAdd(transformedPod) p, ok := c.GetPod(newPodIdentifier("connection", "", podCopy.Status.PodIP)) require.True(t, ok) @@ -1237,8 +1236,7 @@ func TestReplicaSetExtractionRules(t *testing.T) { // manually call the data removal functions here // normally the informer does this, but fully emulating the informer in this test is annoying transformedPod := removeUnnecessaryPodData(pod, c.Rules) - transformedReplicaset := removeUnnecessaryReplicaSetData(replicaset) - c.handleReplicaSetAdd(transformedReplicaset) + c.handleReplicaSetAdd(replicaset) c.handlePodAdd(transformedPod) p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP)) require.True(t, ok) diff --git a/processor/k8sattributesprocessor/internal/kube/informer.go b/processor/k8sattributesprocessor/internal/kube/informer.go index 825fcc0dbf4a8..265a67119e1cb 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer.go +++ b/processor/k8sattributesprocessor/internal/kube/informer.go @@ -6,6 +6,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" apps_v1 "k8s.io/api/apps/v1" batch_v1 "k8s.io/api/batch/v1" api_v1 "k8s.io/api/core/v1" @@ -13,8 +14,10 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" "k8s.io/client-go/tools/cache" ) @@ -88,11 +91,11 @@ func newKubeSystemSharedInformer( ) cache.SharedInformer { informer := cache.NewSharedInformer( &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String() return 
client.CoreV1().Namespaces().List(context.Background(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String() return client.CoreV1().Namespaces().Watch(context.Background(), opts) }, @@ -129,21 +132,6 @@ func namespaceInformerWatchFunc(client kubernetes.Interface) cache.WatchFuncWith } } -func newReplicaSetSharedInformer( - client kubernetes.Interface, - namespace string, -) cache.SharedInformer { - informer := cache.NewSharedInformer( - &cache.ListWatch{ - ListWithContextFunc: replicasetListFuncWithSelectors(client, namespace), - WatchFuncWithContext: replicasetWatchFuncWithSelectors(client, namespace), - }, - &apps_v1.ReplicaSet{}, - watchSyncPeriod, - ) - return informer -} - func replicasetListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListWithContextFunc { return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return client.AppsV1().ReplicaSets(namespace).List(ctx, opts) @@ -210,6 +198,77 @@ func statefulsetWatchFuncWithSelectors(client kubernetes.Interface, namespace st } } +// NewReplicaSetMetaInformerProvider returns a provider with the SAME signature +// as your existing helpers, but emits metav1.PartialObjectMetadata. +// It uses ListWithContextFunc and WatchFuncWithContext. 
+func NewReplicaSetMetaInformerProvider(apiCfg k8sconfig.APIConfig) func(client kubernetes.Interface, namespace string) cache.SharedInformer {
+	return func(client kubernetes.Interface, namespace string) cache.SharedInformer {
+		// Build a metadata client from the APIConfig. TODO(review): reuse k8sconfig.MakeMetadataClient instead of duplicating its flow.
+		if err := apiCfg.Validate(); err != nil {
+			return cache.NewSharedIndexInformer(
+				&cache.ListWatch{
+					ListWithContextFunc:  func(context.Context, metav1.ListOptions) (runtime.Object, error) { return nil, err },
+					WatchFuncWithContext: func(context.Context, metav1.ListOptions) (watch.Interface, error) { return nil, err },
+				},
+				&metav1.PartialObjectMetadata{},
+				watchSyncPeriod,
+				cache.Indexers{},
+			)
+		}
+
+		restCfg, err := k8sconfig.CreateRestConfig(apiCfg)
+		if err != nil {
+			return cache.NewSharedIndexInformer(
+				&cache.ListWatch{
+					ListWithContextFunc:  func(context.Context, metav1.ListOptions) (runtime.Object, error) { return nil, err },
+					WatchFuncWithContext: func(context.Context, metav1.ListOptions) (watch.Interface, error) { return nil, err },
+				},
+				&metav1.PartialObjectMetadata{},
+				watchSyncPeriod,
+				cache.Indexers{},
+			)
+		}
+
+		mc, err := metadata.NewForConfig(restCfg)
+		if err != nil {
+			return cache.NewSharedIndexInformer(
+				&cache.ListWatch{
+					ListWithContextFunc:  func(context.Context, metav1.ListOptions) (runtime.Object, error) { return nil, err },
+					WatchFuncWithContext: func(context.Context, metav1.ListOptions) (watch.Interface, error) { return nil, err },
+				},
+				&metav1.PartialObjectMetadata{},
+				watchSyncPeriod,
+				cache.Indexers{},
+			)
+		}
+
+		gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
+
+		lw := &cache.ListWatch{
+			ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+				// Field/label selectors could be applied to opts here, mirroring the typed list/watch helpers.
+				if namespace == "" {
+					return mc.Resource(gvr).List(ctx, opts)
+				}
+				return
mc.Resource(gvr).Namespace(namespace).List(ctx, opts) + }, + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + if namespace == "" { + return mc.Resource(gvr).Watch(ctx, opts) + } + return mc.Resource(gvr).Namespace(namespace).Watch(ctx, opts) + }, + } + + return cache.NewSharedIndexInformer( + lw, + &metav1.PartialObjectMetadata{}, + watchSyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + } +} + func newDaemonSetSharedInformer( client kubernetes.Interface, namespace string,