Skip to content

Commit b372d5d

Browse files
authored
Fix a bug where detected TCP traffic from pods short time before their deletion time could be labeled as incoming traffic from the internet (#245)
1 parent 153f669 commit b372d5d

File tree

2 files changed

+66
-8
lines changed

2 files changed

+66
-8
lines changed

src/mapper/pkg/kubefinder/kubefinder.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kubefinder
33
import (
44
"context"
55
"fmt"
6+
"github.com/hashicorp/golang-lru/v2/expirable"
67
"github.com/otterize/intents-operator/src/shared/errors"
78
"github.com/otterize/intents-operator/src/shared/serviceidresolver"
89
"github.com/otterize/network-mapper/src/mapper/pkg/config"
@@ -31,26 +32,36 @@ const (
3132
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
3233
apiServerName = "kubernetes"
3334
apiServerNamespace = "default"
35+
seenIPsCacheSize = 2000
36+
seenIPsCacheTTL = time.Minute * 10
3437
)
3538

3639
type KubeFinder struct {
3740
mgr manager.Manager
3841
client client.Client
3942
serviceIdResolver *serviceidresolver.Resolver
43+
seenIPsTTLCache *expirable.LRU[string, struct{}]
4044
}
4145

42-
var ErrNoPodFound = errors.NewSentinelError("no pod found")
43-
var ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod")
44-
var ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service")
45-
var ErrServiceNotFound = errors.NewSentinelError("service not found")
46+
var (
47+
ErrNoPodFound = errors.NewSentinelError("no pod found")
48+
ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod")
49+
ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service")
50+
ErrServiceNotFound = errors.NewSentinelError("service not found")
51+
)
4652

4753
func NewKubeFinder(ctx context.Context, mgr manager.Manager) (*KubeFinder, error) {
48-
indexer := &KubeFinder{client: mgr.GetClient(), mgr: mgr, serviceIdResolver: serviceidresolver.NewResolver(mgr.GetClient())}
49-
err := indexer.initIndexes(ctx)
54+
finder := &KubeFinder{client: mgr.GetClient(), mgr: mgr, serviceIdResolver: serviceidresolver.NewResolver(mgr.GetClient())}
55+
finder.initSeenIPsCache()
56+
err := finder.initIndexes(ctx)
5057
if err != nil {
5158
return nil, errors.Wrap(err)
5259
}
53-
return indexer, nil
60+
return finder, nil
61+
}
62+
63+
func (k *KubeFinder) initSeenIPsCache() {
64+
k.seenIPsTTLCache = expirable.NewLRU[string, struct{}](seenIPsCacheSize, nil, seenIPsCacheTTL)
5465
}
5566

5667
func (k *KubeFinder) initIndexes(ctx context.Context) error {
@@ -80,6 +91,7 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
8091
}
8192

8293
for _, ip := range pod.Status.PodIPs {
94+
k.seenIPsTTLCache.Add(ip.IP, struct{}{})
8395
res = append(res, ip.IP)
8496
}
8597
return res
@@ -464,7 +476,12 @@ func (k *KubeFinder) ResolveOtterizeIdentityForService(ctx context.Context, svc
464476
}
465477

466478
func (k *KubeFinder) IsSrcIpClusterInternal(ctx context.Context, ip string) (bool, error) {
467-
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR, and pods that were deleted.
479+
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR
480+
481+
wasPodIp := k.WasPodIP(ip)
482+
if wasPodIp {
483+
return true, nil
484+
}
468485

469486
isNode, err := k.IsNodeIP(ctx, ip)
470487
if err != nil {
@@ -502,6 +519,10 @@ func (k *KubeFinder) IsPodIp(ctx context.Context, ip string) (bool, error) {
502519
return len(pods.Items) > 0, nil
503520
}
504521

522+
func (k *KubeFinder) WasPodIP(ip string) bool {
523+
return k.seenIPsTTLCache.Contains(ip)
524+
}
525+
505526
func (k *KubeFinder) IsNodeIP(ctx context.Context, ip string) (bool, error) {
506527
var nodes corev1.NodeList
507528
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})

src/mapper/pkg/kubefinder/kubefinder_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,43 @@ func (s *KubeFinderTestSuite) TestResolveServiceAddressToIps() {
7575
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(pods4444, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))
7676
}
7777

78+
func (s *KubeFinderTestSuite) TestIsSrcIpClusterInternal() {
79+
pod := s.AddPod("test-pod", "1.1.1.1", nil, nil)
80+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
81+
82+
// Check with existing pod's ip
83+
isInternal, err := s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1")
84+
s.Require().NoError(err)
85+
s.Require().True(isInternal)
86+
87+
// Check with non-existing pod's ip
88+
isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "8.8.8.8")
89+
s.Require().NoError(err)
90+
s.Require().False(isInternal)
91+
92+
err = s.Mgr.GetClient().Delete(context.Background(), pod)
93+
s.Require().NoError(err)
94+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
95+
96+
// Check pod doesn't exist in the manager's cache
97+
pod, err = s.kubeFinder.ResolveIPToPod(context.Background(), "1.1.1.1")
98+
s.Require().Nil(pod)
99+
s.Require().Error(err)
100+
101+
// Check isInternal with the deleted pod's ip
102+
isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1")
103+
s.Require().NoError(err)
104+
s.Require().True(isInternal)
105+
106+
// Reset the cache
107+
s.kubeFinder.initSeenIPsCache()
108+
109+
// Check isInternal with the deleted pod's ip after cache reset
110+
isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1")
111+
s.Require().NoError(err)
112+
s.Require().False(isInternal)
113+
}
114+
78115
func TestKubeFinderTestSuite(t *testing.T) {
79116
suite.Run(t, new(KubeFinderTestSuite))
80117
}

0 commit comments

Comments
 (0)