Skip to content

Commit 9eed37b

Browse files
authored
Merge pull request #57 from harisudarsan1/informers
feat: Add network metadata to telemetry events using K8s informers
2 parents 011fc76 + ae17a0a commit 9eed37b

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed

deployments/relay-deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ rules:
2525
- ""
2626
resources:
2727
- pods
28+
- services
2829
verbs:
2930
- get
3031
- list

relay-server/server/relayServer.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"math/rand"
1212
"net"
13+
"strings"
1314
"sync"
1415
"time"
1516

@@ -27,6 +28,10 @@ import (
2728
"github.com/kubearmor/kubearmor-relay-server/relay-server/elasticsearch"
2829
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
2930
"github.com/kubearmor/kubearmor-relay-server/relay-server/opensearch"
31+
32+
corev1 "k8s.io/api/core/v1"
33+
"k8s.io/client-go/informers"
34+
"k8s.io/client-go/tools/cache"
3035
)
3136

3237
// ============ //
@@ -93,6 +98,9 @@ type LogService struct {
9398
//
9499
}
95100

101+
// IP to K8sResource Map
102+
var Ipcache sync.Map
103+
96104
// HealthCheck Function
97105
func (ls *LogService) HealthCheck(ctx context.Context, nonce *pb.NonceMessage) (*pb.ReplyMessage, error) {
98106
replyMessage := pb.ReplyMessage{Retval: nonce.Nonce}
@@ -536,6 +544,7 @@ func (lc *LogClient) WatchLogs(wg *sync.WaitGroup, stop chan struct{}, errCh cha
536544
errCh <- fmt.Errorf("failed to receive a log (%s) %s", lc.Server, err.Error())
537545
return
538546
}
547+
addRemoteHostInfo(res)
539548

540549
select {
541550
case LogBufferChannel <- res:
@@ -781,6 +790,9 @@ func connectToKubeArmor(nodeID, port string) error {
781790
go client.WatchLogs(&wg, stop, errCh)
782791
kg.Print("Started to watch logs from " + server)
783792

793+
//start IP informers
794+
kg.Print("Started to watch k8s IP's for resources")
795+
784796
// Wait for an error or all goroutines to finish
785797
select {
786798
case err := <-errCh:
@@ -825,6 +837,8 @@ func (rs *RelayServer) GetFeedsFromNodes() {
825837
defer cancel()
826838
rs.WgServer.Add(1)
827839
go K8s.WatchKubeArmorPods(ctx, &rs.WgServer, ipsChan)
840+
rs.WgServer.Add(1)
841+
go startIPInformers(ctx, &rs.WgServer)
828842
} else {
829843
close(ipsChan)
830844
}
@@ -844,3 +858,138 @@ func (rs *RelayServer) GetFeedsFromNodes() {
844858
}
845859
}
846860
}
861+
862+
func startIPInformers(ctx context.Context, wg *sync.WaitGroup) {
863+
kg.Printf("Started IP informers")
864+
defer wg.Done()
865+
866+
// Check if context is already canceled
867+
if ctx.Err() != nil {
868+
kg.Errf("Context canceled before starting informers")
869+
return
870+
}
871+
872+
factory := informers.NewSharedInformerFactory(K8s.K8sClient, 30*time.Second)
873+
podInformer := factory.Core().V1().Pods().Informer()
874+
svcInformer := factory.Core().V1().Services().Informer()
875+
876+
updateIP := func(kind, namespace, name, ip string) {
877+
878+
if ip == "" {
879+
return
880+
}
881+
resource := ""
882+
switch strings.ToUpper(kind) {
883+
884+
case "POD":
885+
resource = fmt.Sprintf("pod/%s/%s", namespace, name)
886+
case "SERVICE":
887+
resource = fmt.Sprintf("svc/%s/%s", namespace, name)
888+
}
889+
890+
Ipcache.Store(ip, resource)
891+
}
892+
893+
// 4. Attach event handlers
894+
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
895+
AddFunc: func(obj interface{}) {
896+
pod, ok := obj.(*corev1.Pod)
897+
if !ok {
898+
return
899+
}
900+
updateIP("POD", pod.Namespace, pod.Name, pod.Status.PodIP)
901+
},
902+
UpdateFunc: func(oldObj, newObj interface{}) {
903+
pod, ok := newObj.(*corev1.Pod)
904+
if !ok {
905+
return
906+
}
907+
908+
updateIP("POD", pod.Namespace, pod.Name, pod.Status.PodIP)
909+
},
910+
DeleteFunc: func(obj interface{}) {
911+
pod, ok := obj.(*corev1.Pod)
912+
if !ok {
913+
return
914+
}
915+
Ipcache.Delete(pod.Status.PodIP)
916+
917+
},
918+
})
919+
920+
if err != nil {
921+
kg.Errf("Failed to add pod event handler: %v", err)
922+
}
923+
924+
// Service handlers
925+
_, err = svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
926+
AddFunc: func(obj interface{}) {
927+
svc, ok := obj.(*corev1.Service)
928+
if !ok {
929+
return
930+
}
931+
932+
updateIP("SERVICE", svc.Namespace, svc.Name, svc.Spec.ClusterIP)
933+
},
934+
UpdateFunc: func(oldObj, newObj interface{}) {
935+
svc, ok := newObj.(*corev1.Service)
936+
if !ok {
937+
return
938+
}
939+
940+
updateIP("SERVICE", svc.Namespace, svc.Name, svc.Spec.ClusterIP)
941+
},
942+
DeleteFunc: func(obj interface{}) {
943+
svc, ok := obj.(*corev1.Service)
944+
if !ok {
945+
return
946+
}
947+
Ipcache.Delete(svc.Spec.ClusterIP)
948+
},
949+
})
950+
951+
if err != nil {
952+
kg.Errf("Failed to add service event handler: %v", err)
953+
}
954+
955+
factory.Start(ctx.Done())
956+
if !cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced, svcInformer.HasSynced) {
957+
kg.Errf("Timed out waiting for cache sync")
958+
return
959+
}
960+
961+
<-ctx.Done()
962+
kg.Printf("Shutting down informers…")
963+
}
964+
965+
func extractIPfromLog(resource string) string {
966+
for _, field := range strings.Fields(resource) {
967+
if strings.Contains(field, "remoteip=") {
968+
parts := strings.SplitN(field, "=", 2)
969+
if len(parts) == 2 && net.ParseIP(parts[1]) != nil {
970+
kg.Debugf("remote ip: %s", parts[1])
971+
return parts[1]
972+
}
973+
}
974+
}
975+
return ""
976+
}
977+
978+
func addRemoteHostInfo(log *pb.Log) {
979+
if (log != nil) && strings.Contains(log.Data, "tcp_") {
980+
ip := extractIPfromLog(log.Resource)
981+
rawIface, found := Ipcache.Load(ip)
982+
if !found {
983+
kg.Printf("Host not found for this IP")
984+
return
985+
}
986+
987+
k8sRes, ok := rawIface.(string)
988+
if !ok {
989+
return
990+
}
991+
992+
log.Resource = fmt.Sprintf("%s remotehost=%s", log.Resource, k8sRes)
993+
kg.Printf("%s", log.Resource)
994+
}
995+
}

0 commit comments

Comments
 (0)