Skip to content

Commit 35157de

Browse files
authored
Fix bug where discovered traffic from a pod wasn't filtered properly by its creation timestamp (#243)
1 parent c7bfa5a commit 35157de

File tree

3 files changed

+69
-7
lines changed

3 files changed

+69
-7
lines changed

src/mapper/pkg/resolvers/helpers.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
corev1 "k8s.io/api/core/v1"
1010
)
1111

12-
func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) {
12+
func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src *model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) {
1313
svc, ok, err := r.kubeFinder.ResolveIPToControlPlane(ctx, src.SrcIP)
1414
if err != nil {
1515
return model.OtterizeServiceIdentity{}, errors.Errorf("could not resolve %s to service: %w", src.SrcIP, err)
@@ -34,6 +34,15 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re
3434
return model.OtterizeServiceIdentity{}, errors.Errorf("found pod %s (by ip %s) doesn't match captured hostname %s, ignoring", srcPod.Name, src.SrcIP, src.SrcHostname)
3535
}
3636

37+
// This function requires "src" to be a pointer.
38+
// If at some point this function will be called with a non-pointer "src"
39+
// It may cause a bug because the function will not be able to modify the "src" object of the caller.
40+
r.filterTargetsAccordingToPodCreationTime(src, srcPod)
41+
42+
return r.resolveInClusterIdentity(ctx, srcPod)
43+
}
44+
45+
func (r *Resolver) filterTargetsAccordingToPodCreationTime(src *model.RecordedDestinationsForSrc, srcPod *corev1.Pod) {
3746
filteredDestinations := make([]model.Destination, 0)
3847
for _, dest := range src.Destinations {
3948
if srcPod.CreationTimestamp.After(dest.LastSeen) {
@@ -43,8 +52,6 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re
4352
filteredDestinations = append(filteredDestinations, dest)
4453
}
4554
src.Destinations = filteredDestinations
46-
47-
return r.resolveInClusterIdentity(ctx, srcPod)
4855
}
4956

5057
func (r *Resolver) resolveInClusterIdentity(ctx context.Context, pod *corev1.Pod) (model.OtterizeServiceIdentity, error) {

src/mapper/pkg/resolvers/resolver_test.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1019,6 +1019,7 @@ func (s *ResolverTestSuite) TestIntentsToApiServerDNS() {
10191019
Destinations: []test_gql_client.Destination{
10201020
{
10211021
Destination: fmt.Sprintf("%s.%s.svc.cluster.local", service.GetName(), service.GetNamespace()),
1022+
LastSeen: time.Now().Add(time.Minute),
10221023
},
10231024
},
10241025
},
@@ -1070,7 +1071,7 @@ func (s *ResolverTestSuite) TestIntentsToApiServerSocketScan() {
10701071
Destinations: []test_gql_client.Destination{
10711072
{
10721073
Destination: service.Spec.ClusterIP,
1073-
LastSeen: time.Now(),
1074+
LastSeen: time.Now().Add(time.Minute),
10741075
},
10751076
},
10761077
},
@@ -1365,6 +1366,60 @@ func (s *ResolverTestSuite) TestTCPResultsFromExternalToLoadBalancerServiceUsing
13651366
s.Require().Equal("8.8.8.8", intents[0].Intent.IP)
13661367
}
13671368

1369+
func (s *ResolverTestSuite) TestResolveOtterizeIdentityFilterSrcDestinationsByCreationTimestamp() {
1370+
podIP := "1.1.1.3"
1371+
pod := s.AddPod("pod3", podIP, nil, nil)
1372+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
1373+
recorededDestinationForSrc := &model.RecordedDestinationsForSrc{
1374+
SrcIP: podIP,
1375+
Destinations: []model.Destination{
1376+
{
1377+
Destination: "target-on-time",
1378+
LastSeen: pod.CreationTimestamp.Add(time.Minute),
1379+
},
1380+
{
1381+
Destination: "target-before-time",
1382+
LastSeen: pod.CreationTimestamp.Add(-time.Minute),
1383+
},
1384+
},
1385+
}
1386+
srcIdentity, err := s.resolver.discoverInternalSrcIdentity(context.Background(), recorededDestinationForSrc)
1387+
s.Require().NoError(err)
1388+
s.Require().Equal("pod3", srcIdentity.Name)
1389+
1390+
s.Require().Len(recorededDestinationForSrc.Destinations, 1)
1391+
s.Require().Equal("target-on-time", recorededDestinationForSrc.Destinations[0].Destination)
1392+
1393+
}
1394+
1395+
func (s *ResolverTestSuite) TestPoop() {
1396+
//serviceIP := "10.0.0.10"
1397+
podIP := "1.1.1.3"
1398+
1399+
pod3 := s.AddPod("pod3", podIP, nil, nil)
1400+
//s.AddService(serviceName, map[string]string{"app": "test"}, serviceIP, []*v1.Pod{pod3})
1401+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
1402+
1403+
pod := &v1.Pod{}
1404+
err := s.Mgr.GetClient().Get(context.Background(), types.NamespacedName{Name: pod3.Name, Namespace: pod3.Namespace}, pod)
1405+
s.Require().NoError(err)
1406+
podlist1 := &v1.PodList{}
1407+
err = s.Mgr.GetClient().List(context.Background(), podlist1, client.MatchingFields{"ip": pod.Status.PodIP})
1408+
s.Require().NoError(err)
1409+
s.Require().Len(podlist1.Items, 1)
1410+
1411+
err = s.Mgr.GetClient().Delete(context.Background(), pod)
1412+
s.Require().NoError(err)
1413+
1414+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
1415+
1416+
podlist := &v1.PodList{}
1417+
err = s.Mgr.GetClient().List(context.Background(), podlist, client.MatchingFields{"ip": pod.Status.PodIP})
1418+
s.Require().NoError(err)
1419+
s.Require().Empty(podlist.Items)
1420+
1421+
}
1422+
13681423
func TestRunSuite(t *testing.T) {
13691424
suite.Run(t, new(ResolverTestSuite))
13701425
}

src/mapper/pkg/resolvers/schema.helpers.resolvers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ func (r *Resolver) handleTCPCaptureResult(ctx context.Context, captureItem model
399399
return errors.Wrap(r.reportIncomingInternetTraffic(ctx, captureItem.SrcIP, captureItem.Destinations))
400400
}
401401

402-
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
402+
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &captureItem)
403403
if err != nil {
404404
logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP)
405405
return nil
@@ -462,7 +462,7 @@ func (r *Resolver) handleReportCaptureResults(ctx context.Context, results model
462462

463463
var newResults int
464464
for _, captureItem := range results.Results {
465-
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
465+
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &captureItem)
466466
if err != nil {
467467
logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP)
468468
continue
@@ -499,7 +499,7 @@ func (r *Resolver) handleReportSocketScanResults(ctx context.Context, results mo
499499
return nil
500500
}
501501
for _, socketScanItem := range results.Results {
502-
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, socketScanItem)
502+
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &socketScanItem)
503503
if err != nil {
504504
logrus.WithError(err).Debugf("could not discover src identity for '%s'", socketScanItem.SrcIP)
505505
continue

0 commit comments

Comments
 (0)