Skip to content

Commit 7c792c6

Browse files
authored
Merge pull request #91 from eranra/add_k8s_owner
add k8s owner
2 parents db0d557 + 91fcce6 commit 7c792c6

File tree

5 files changed

+185
-43
lines changed

5 files changed

+185
-43
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ The fifth rule `add_kubernetes` generates new fields with kubernetes information
307307
matching the `input` value (`srcIP` in the example above) with k8s `nodes`, `pods` and `services` IPs.
308308
All the kubernetes fields will be named by appending `output` value
309309
(`srcK8S` in the example above) to the kubernetes metadata field names
310-
(e.g., `Type`, `Name` and `Namespace`)
310+
(e.g., `Namespace`, `Name`, `Type`, `OwnerName`, `OwnerType` )
311+
311312
> Note: kubernetes connection is done using the first available method:
312313
> 1. configuration parameter `KubeConfigPath` (in the example above `/tmp/config`) or
313314
> 2. using `KUBECONFIG` environment variable

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ require (
2323
gopkg.in/yaml.v2 v2.4.0
2424
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d
2525
k8s.io/api v0.23.2
26+
k8s.io/apimachinery v0.23.2
2627
k8s.io/client-go v0.23.2
2728
)
2829

@@ -75,7 +76,6 @@ require (
7576
gopkg.in/inf.v0 v0.9.1 // indirect
7677
gopkg.in/ini.v1 v1.66.2 // indirect
7778
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
78-
k8s.io/apimachinery v0.23.2 // indirect
7979
k8s.io/klog/v2 v2.30.0 // indirect
8080
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
8181
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect

pkg/pipeline/transform/kubernetes/kubernetes.go

Lines changed: 89 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ package kubernetes
2020
import (
2121
"fmt"
2222
log "github.com/sirupsen/logrus"
23+
appsv1 "k8s.io/api/apps/v1"
2324
v1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2426
"k8s.io/client-go/informers"
2527
"k8s.io/client-go/kubernetes"
2628
"k8s.io/client-go/rest"
@@ -37,60 +39,104 @@ const (
3739
kubeConfigEnvVariable = "KUBECONFIG"
3840
syncTime = 10 * time.Minute
3941
IndexIP = "byIP"
40-
typeNode = "node"
41-
typePod = "pod"
42-
typeService = "service"
42+
typeNode = "Node"
43+
typePod = "Pod"
44+
typeService = "Service"
4345
)
4446

4547
type KubeData struct {
46-
informers map[string]cache.SharedIndexInformer
47-
stopChan chan struct{}
48+
ipInformers map[string]cache.SharedIndexInformer
49+
replicaSetInformer cache.SharedIndexInformer
50+
stopChan chan struct{}
51+
}
52+
53+
type Owner struct {
54+
Type string
55+
Name string
4856
}
4957

5058
type Info struct {
51-
Type string
52-
Name string
53-
Namespace string
54-
Labels map[string]string
59+
Type string
60+
Name string
61+
Namespace string
62+
Labels map[string]string
63+
OwnerReferences []metav1.OwnerReference
64+
Owner Owner
5565
}
5666

57-
func (k KubeData) GetInfo(ip string) (*Info, error) {
58-
for objType, informer := range k.informers {
67+
func (k *KubeData) GetInfo(ip string) (*Info, error) {
68+
for objType, informer := range k.ipInformers {
5969
objs, err := informer.GetIndexer().ByIndex(IndexIP, ip)
6070
if err == nil && len(objs) > 0 {
71+
var info *Info
6172
switch objType {
6273
case typePod:
6374
pod := objs[0].(*v1.Pod)
64-
return &Info{
65-
Type: typePod,
66-
Name: pod.Name,
67-
Namespace: pod.Namespace,
68-
Labels: pod.Labels,
69-
}, nil
75+
info = &Info{
76+
Type: typePod,
77+
Name: pod.Name,
78+
Namespace: pod.Namespace,
79+
Labels: pod.Labels,
80+
OwnerReferences: pod.OwnerReferences,
81+
}
7082
case typeNode:
7183
node := objs[0].(*v1.Node)
72-
return &Info{
84+
info = &Info{
7385
Type: typeNode,
7486
Name: node.Name,
7587
Namespace: node.Namespace,
7688
Labels: node.Labels,
77-
}, nil
89+
}
7890
case typeService:
7991
service := objs[0].(*v1.Service)
80-
return &Info{
92+
info = &Info{
8193
Type: typeService,
8294
Name: service.Name,
8395
Namespace: service.Namespace,
8496
Labels: service.Labels,
85-
}, nil
97+
}
8698
}
99+
100+
info.Owner = k.getOwner(info)
101+
return info, nil
87102
}
88103
}
89104

90105
return nil, fmt.Errorf("can't find ip")
91106
}
92107

93-
func (k KubeData) NewNodeInformer(informerFactory informers.SharedInformerFactory) error {
108+
func (k *KubeData) getOwner(info *Info) Owner {
109+
if info.OwnerReferences != nil && len(info.OwnerReferences) > 0 {
110+
ownerReference := info.OwnerReferences[0]
111+
if ownerReference.Kind == "ReplicaSet" {
112+
item, ok, err := k.replicaSetInformer.GetIndexer().GetByKey(info.Namespace + "/" + ownerReference.Name)
113+
if err != nil {
114+
panic(err)
115+
}
116+
if ok {
117+
replicaSet := item.(*appsv1.ReplicaSet)
118+
if len(replicaSet.OwnerReferences) > 0 {
119+
return Owner{
120+
Name: replicaSet.OwnerReferences[0].Name,
121+
Type: replicaSet.OwnerReferences[0].Kind,
122+
}
123+
}
124+
}
125+
} else {
126+
return Owner{
127+
Name: ownerReference.Name,
128+
Type: ownerReference.Kind,
129+
}
130+
}
131+
}
132+
133+
return Owner{
134+
Name: info.Name,
135+
Type: info.Type,
136+
}
137+
}
138+
139+
func (k *KubeData) NewNodeInformer(informerFactory informers.SharedInformerFactory) error {
94140
nodes := informerFactory.Core().V1().Nodes().Informer()
95141
err := nodes.AddIndexers(map[string]cache.IndexFunc{
96142
IndexIP: func(obj interface{}) ([]string, error) {
@@ -106,11 +152,11 @@ func (k KubeData) NewNodeInformer(informerFactory informers.SharedInformerFactor
106152
},
107153
})
108154

109-
k.informers[typeNode] = nodes
155+
k.ipInformers[typeNode] = nodes
110156
return err
111157
}
112158

113-
func (k KubeData) NewPodInformer(informerFactory informers.SharedInformerFactory) error {
159+
func (k *KubeData) NewPodInformer(informerFactory informers.SharedInformerFactory) error {
114160
pods := informerFactory.Core().V1().Pods().Informer()
115161
err := pods.AddIndexers(map[string]cache.IndexFunc{
116162
IndexIP: func(obj interface{}) ([]string, error) {
@@ -126,11 +172,11 @@ func (k KubeData) NewPodInformer(informerFactory informers.SharedInformerFactory
126172
},
127173
})
128174

129-
k.informers[typePod] = pods
175+
k.ipInformers[typePod] = pods
130176
return err
131177
}
132178

133-
func (k KubeData) NewServiceInformer(informerFactory informers.SharedInformerFactory) error {
179+
func (k *KubeData) NewServiceInformer(informerFactory informers.SharedInformerFactory) error {
134180
services := informerFactory.Core().V1().Services().Informer()
135181
err := services.AddIndexers(map[string]cache.IndexFunc{
136182
IndexIP: func(obj interface{}) ([]string, error) {
@@ -143,14 +189,23 @@ func (k KubeData) NewServiceInformer(informerFactory informers.SharedInformerFac
143189
},
144190
})
145191

146-
k.informers[typeService] = services
192+
k.ipInformers[typeService] = services
147193
return err
148194
}
149195

150-
func (k KubeData) InitFromConfig(kubeConfigPath string) error {
196+
func (k *KubeData) NewReplicaSetInformer(informerFactory informers.SharedInformerFactory) error {
197+
k.replicaSetInformer = informerFactory.Apps().V1().ReplicaSets().Informer()
198+
return nil
199+
}
200+
201+
func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
151202
var config *rest.Config
152203
var err error
153204

205+
// Initialization variables
206+
k.stopChan = make(chan struct{})
207+
k.ipInformers = map[string]cache.SharedIndexInformer{}
208+
154209
if kubeConfigPath != "" {
155210
config, err = clientcmd.BuildConfigFromFlags("", kubeConfigPath)
156211
if err != nil {
@@ -189,9 +244,7 @@ func (k KubeData) InitFromConfig(kubeConfigPath string) error {
189244
return nil
190245
}
191246

192-
func (k KubeData) initInformers(client kubernetes.Interface) error {
193-
//defer close(stopChan)
194-
247+
func (k *KubeData) initInformers(client kubernetes.Interface) error {
195248
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
196249
err := k.NewNodeInformer(informerFactory)
197250
if err != nil {
@@ -205,18 +258,15 @@ func (k KubeData) initInformers(client kubernetes.Interface) error {
205258
if err != nil {
206259
return err
207260
}
261+
err = k.NewReplicaSetInformer(informerFactory)
262+
if err != nil {
263+
return err
264+
}
208265

209-
log.Debugf("starting kubernetes informer, waiting for syncronization")
266+
log.Debugf("starting kubernetes informers, waiting for syncronization")
210267
informerFactory.Start(k.stopChan)
211268
informerFactory.WaitForCacheSync(k.stopChan)
212269
log.Debugf("kubernetes informers started")
213270

214271
return nil
215272
}
216-
217-
func init() {
218-
Data = KubeData{
219-
informers: map[string]cache.SharedIndexInformer{},
220-
stopChan: make(chan struct{}),
221-
}
222-
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package kubernetes
19+
20+
import (
21+
"github.com/stretchr/testify/mock"
22+
"github.com/stretchr/testify/require"
23+
v1 "k8s.io/api/core/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/client-go/tools/cache"
26+
"testing"
27+
)
28+
29+
type IndexerMock struct {
30+
mock.Mock
31+
cache.Indexer
32+
}
33+
34+
type InformerMock struct {
35+
mock.Mock
36+
InformerInterface
37+
}
38+
39+
type InformerInterface interface {
40+
cache.SharedInformer
41+
AddIndexers(indexers cache.Indexers) error
42+
GetIndexer() cache.Indexer
43+
}
44+
45+
func (indexMock *IndexerMock) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
46+
pod := fakePod("podName", "podNamespace", "podHost")
47+
podInterface := interface{}(pod)
48+
return []interface{}{podInterface}, nil
49+
}
50+
51+
func fakePod(name, ns, host string) *v1.Pod {
52+
return &v1.Pod{
53+
ObjectMeta: metav1.ObjectMeta{
54+
Name: name,
55+
Namespace: ns,
56+
},
57+
Status: v1.PodStatus{
58+
HostIP: host,
59+
},
60+
}
61+
}
62+
63+
func (informerMock *InformerMock) GetIndexer() cache.Indexer {
64+
informerMock.Mock.Called()
65+
return &IndexerMock{}
66+
}
67+
68+
func TestKubeData_getInfo(t *testing.T) {
69+
// Test with no informer
70+
kubeData := KubeData{}
71+
info, err := kubeData.GetInfo("1.2.3.4")
72+
require.EqualError(t, err, "can't find ip")
73+
require.Nil(t, info)
74+
75+
// Test with mock pod informer
76+
expectedInfo := &Info{
77+
Type: "Pod",
78+
Name: "podName",
79+
Namespace: "podNamespace",
80+
Owner: Owner{Name: "podName", Type: "Pod"},
81+
}
82+
kubeData = KubeData{ipInformers: map[string]cache.SharedIndexInformer{}}
83+
informerMock := &InformerMock{}
84+
informerMock.On("GetIndexer", mock.Anything, mock.Anything, mock.Anything).Return(nil)
85+
kubeData.ipInformers["Pod"] = informerMock
86+
info, err = kubeData.GetInfo("1.2.3.4")
87+
require.NoError(t, err)
88+
require.Equal(t, info, expectedInfo)
89+
}

pkg/pipeline/transform/transform_network.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,11 @@ func (n *Network) Transform(inputEntry config.GenericMap) config.GenericMap {
130130
log.Infof("Can't find kubernetes info for IP %v err %v", outputEntries[rule.Input], err)
131131
continue
132132
}
133-
outputEntries[rule.Output+"_Type"] = kubeInfo.Type
134-
outputEntries[rule.Output+"_Name"] = kubeInfo.Name
135133
outputEntries[rule.Output+"_Namespace"] = kubeInfo.Namespace
134+
outputEntries[rule.Output+"_Name"] = kubeInfo.Name
135+
outputEntries[rule.Output+"_Type"] = kubeInfo.Type
136+
outputEntries[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
137+
outputEntries[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
136138
for labelKey, labelValue := range kubeInfo.Labels {
137139
outputEntries[rule.Output+"_Labels_"+labelKey] = labelValue
138140
}

0 commit comments

Comments
 (0)