Skip to content

Commit a8117b0

Browse files
committed
[poc][processor/k8sattributeprocessor]: modify test to use different addresses
Signed-off-by: RealAnna <[email protected]>
1 parent 6ea6290 commit a8117b0

File tree

4 files changed

+99
-48
lines changed

4 files changed

+99
-48
lines changed

internal/k8sconfig/config.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"k8s.io/apimachinery/pkg/watch"
2222
"k8s.io/client-go/dynamic"
2323
k8s "k8s.io/client-go/kubernetes"
24+
"k8s.io/client-go/metadata"
2425
"k8s.io/client-go/rest"
2526
"k8s.io/client-go/tools/cache"
2627
"k8s.io/client-go/tools/clientcmd"
@@ -168,6 +169,25 @@ func MakeDynamicClient(apiConf APIConfig) (dynamic.Interface, error) {
168169
return client, nil
169170
}
170171

172+
// MakeMetadataClient constructs a client-go metadata.Interface using the same APIConfig flow.
173+
// It mirrors MakeDynamicClient, but uses metadata.NewForConfig.
174+
func MakeMetadataClient(apiConf APIConfig) (metadata.Interface, error) {
175+
if err := apiConf.Validate(); err != nil {
176+
return nil, err
177+
}
178+
179+
authConf, err := CreateRestConfig(apiConf)
180+
if err != nil {
181+
return nil, err
182+
}
183+
184+
mc, err := metadata.NewForConfig(authConf)
185+
if err != nil {
186+
return nil, err
187+
}
188+
return mc, nil
189+
}
190+
171191
// MakeOpenShiftQuotaClient can take configuration if needed for other types of auth
172192
// and return an OpenShift quota API client
173193
func MakeOpenShiftQuotaClient(apiConf APIConfig) (quotaclientset.Interface, error) {

processor/k8sattributesprocessor/internal/kube/client.go

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -249,22 +249,9 @@ func New(
249249

250250
if rules.DeploymentName || rules.DeploymentUID {
251251
if informersFactory.newReplicaSetInformer == nil {
252-
informersFactory.newReplicaSetInformer = newReplicaSetSharedInformer
252+
informersFactory.newReplicaSetInformer = NewReplicaSetMetaInformerProvider(apiCfg)
253253
}
254254
c.replicasetInformer = informersFactory.newReplicaSetInformer(c.kc, c.Filters.Namespace)
255-
err = c.replicasetInformer.SetTransform(
256-
func(object any) (any, error) {
257-
originalReplicaset, success := object.(*apps_v1.ReplicaSet)
258-
if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
259-
return object, nil
260-
}
261-
262-
return removeUnnecessaryReplicaSetData(originalReplicaset), nil
263-
},
264-
)
265-
if err != nil {
266-
return nil, err
267-
}
268255
}
269256

270257
if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
@@ -1813,19 +1800,6 @@ func (c *WatchClient) addOrUpdateReplicaSet(replicaset *apps_v1.ReplicaSet) {
18131800
c.m.Unlock()
18141801
}
18151802

1816-
// This function removes all data from the ReplicaSet except what is required by extraction rules
1817-
func removeUnnecessaryReplicaSetData(replicaset *apps_v1.ReplicaSet) *apps_v1.ReplicaSet {
1818-
transformedReplicaset := apps_v1.ReplicaSet{
1819-
ObjectMeta: meta_v1.ObjectMeta{
1820-
Name: replicaset.GetName(),
1821-
Namespace: replicaset.GetNamespace(),
1822-
UID: replicaset.GetUID(),
1823-
},
1824-
}
1825-
transformedReplicaset.SetOwnerReferences(replicaset.GetOwnerReferences())
1826-
return &transformedReplicaset
1827-
}
1828-
18291803
// runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete
18301804
// before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer
18311805
// to be finished to correctly establish the connection to the replicaset/deployment it belongs to.

processor/k8sattributesprocessor/internal/kube/client_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,8 +1081,7 @@ func TestExtractionRules(t *testing.T) {
10811081
maps.Copy(podCopy.Annotations, tc.additionalAnnotations)
10821082
maps.Copy(podCopy.Labels, tc.additionalLabels)
10831083
transformedPod := removeUnnecessaryPodData(podCopy, c.Rules)
1084-
transformedReplicaset := removeUnnecessaryReplicaSetData(replicaset)
1085-
c.handleReplicaSetAdd(transformedReplicaset)
1084+
c.handleReplicaSetAdd(replicaset)
10861085
c.handlePodAdd(transformedPod)
10871086
p, ok := c.GetPod(newPodIdentifier("connection", "", podCopy.Status.PodIP))
10881087
require.True(t, ok)
@@ -1237,8 +1236,7 @@ func TestReplicaSetExtractionRules(t *testing.T) {
12371236
// manually call the data removal functions here
12381237
// normally the informer does this, but fully emulating the informer in this test is annoying
12391238
transformedPod := removeUnnecessaryPodData(pod, c.Rules)
1240-
transformedReplicaset := removeUnnecessaryReplicaSetData(replicaset)
1241-
c.handleReplicaSetAdd(transformedReplicaset)
1239+
c.handleReplicaSetAdd(replicaset)
12421240
c.handlePodAdd(transformedPod)
12431241
p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP))
12441242
require.True(t, ok)

processor/k8sattributesprocessor/internal/kube/informer.go

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri
66
import (
77
"context"
88

9+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
910
apps_v1 "k8s.io/api/apps/v1"
1011
batch_v1 "k8s.io/api/batch/v1"
1112
api_v1 "k8s.io/api/core/v1"
1213
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1314
"k8s.io/apimachinery/pkg/fields"
1415
"k8s.io/apimachinery/pkg/labels"
1516
"k8s.io/apimachinery/pkg/runtime"
17+
"k8s.io/apimachinery/pkg/runtime/schema"
1618
"k8s.io/apimachinery/pkg/watch"
1719
"k8s.io/client-go/kubernetes"
20+
"k8s.io/client-go/metadata"
1821
"k8s.io/client-go/tools/cache"
1922
)
2023

@@ -88,11 +91,11 @@ func newKubeSystemSharedInformer(
8891
) cache.SharedInformer {
8992
informer := cache.NewSharedInformer(
9093
&cache.ListWatch{
91-
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
94+
ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
9295
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String()
9396
return client.CoreV1().Namespaces().List(context.Background(), opts)
9497
},
95-
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
98+
WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
9699
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String()
97100
return client.CoreV1().Namespaces().Watch(context.Background(), opts)
98101
},
@@ -129,21 +132,6 @@ func namespaceInformerWatchFunc(client kubernetes.Interface) cache.WatchFuncWith
129132
}
130133
}
131134

132-
func newReplicaSetSharedInformer(
133-
client kubernetes.Interface,
134-
namespace string,
135-
) cache.SharedInformer {
136-
informer := cache.NewSharedInformer(
137-
&cache.ListWatch{
138-
ListWithContextFunc: replicasetListFuncWithSelectors(client, namespace),
139-
WatchFuncWithContext: replicasetWatchFuncWithSelectors(client, namespace),
140-
},
141-
&apps_v1.ReplicaSet{},
142-
watchSyncPeriod,
143-
)
144-
return informer
145-
}
146-
147135
func replicasetListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListWithContextFunc {
148136
return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
149137
return client.AppsV1().ReplicaSets(namespace).List(ctx, opts)
@@ -210,6 +198,77 @@ func statefulsetWatchFuncWithSelectors(client kubernetes.Interface, namespace st
210198
}
211199
}
212200

201+
// NewReplicaSetMetaInformerProvider returns a provider with the SAME signature
202+
// as your existing helpers, but emits metav1.PartialObjectMetadata.
203+
// It uses ListWithContextFunc and WatchFuncWithContext.
204+
func NewReplicaSetMetaInformerProvider(apiCfg k8sconfig.APIConfig) func(client kubernetes.Interface, namespace string) cache.SharedInformer {
205+
return func(client kubernetes.Interface, namespace string) cache.SharedInformer {
206+
// Build metadata client from APIConfig (mirrors your MakeDynamicClient flow)
207+
if err := apiCfg.Validate(); err != nil {
208+
return cache.NewSharedIndexInformer(
209+
&cache.ListWatch{
210+
ListWithContextFunc: func(ctx context.Context, _ metav1.ListOptions) (runtime.Object, error) { return nil, err },
211+
WatchFuncWithContext: func(ctx context.Context, _ metav1.ListOptions) (watch.Interface, error) { return nil, err },
212+
},
213+
&metav1.PartialObjectMetadata{},
214+
watchSyncPeriod,
215+
cache.Indexers{},
216+
)
217+
}
218+
219+
restCfg, err := k8sconfig.CreateRestConfig(apiCfg)
220+
if err != nil {
221+
return cache.NewSharedIndexInformer(
222+
&cache.ListWatch{
223+
ListWithContextFunc: func(ctx context.Context, _ metav1.ListOptions) (runtime.Object, error) { return nil, err },
224+
WatchFuncWithContext: func(ctx context.Context, _ metav1.ListOptions) (watch.Interface, error) { return nil, err },
225+
},
226+
&metav1.PartialObjectMetadata{},
227+
watchSyncPeriod,
228+
cache.Indexers{},
229+
)
230+
}
231+
232+
mc, err := metadata.NewForConfig(restCfg)
233+
if err != nil {
234+
return cache.NewSharedIndexInformer(
235+
&cache.ListWatch{
236+
ListWithContextFunc: func(ctx context.Context, _ metav1.ListOptions) (runtime.Object, error) { return nil, err },
237+
WatchFuncWithContext: func(ctx context.Context, _ metav1.ListOptions) (watch.Interface, error) { return nil, err },
238+
},
239+
&metav1.PartialObjectMetadata{},
240+
watchSyncPeriod,
241+
cache.Indexers{},
242+
)
243+
}
244+
245+
gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
246+
247+
lw := &cache.ListWatch{
248+
ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
249+
// Populate selectors here if you mirror your existing helpers
250+
if namespace == "" {
251+
return mc.Resource(gvr).List(ctx, opts)
252+
}
253+
return mc.Resource(gvr).Namespace(namespace).List(ctx, opts)
254+
},
255+
WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
256+
if namespace == "" {
257+
return mc.Resource(gvr).Watch(ctx, opts)
258+
}
259+
return mc.Resource(gvr).Namespace(namespace).Watch(ctx, opts)
260+
},
261+
}
262+
263+
return cache.NewSharedIndexInformer(
264+
lw,
265+
&metav1.PartialObjectMetadata{},
266+
watchSyncPeriod,
267+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
268+
)
269+
}
270+
}
271+
213272
func newDaemonSetSharedInformer(
214273
client kubernetes.Interface,
215274
namespace string,

0 commit comments

Comments
 (0)