Skip to content

Commit c7bfa5a

Browse files
authored
Fix bug where hostNetwork pods could be falsely detected as incoming traffic from outside of the cluster; Support resolution of LoadBalancer typed services using their node ports (#240)
1 parent 2a98e7d commit c7bfa5a

File tree

4 files changed

+261
-39
lines changed

4 files changed

+261
-39
lines changed

src/mapper/pkg/kubefinder/kubefinder.go

Lines changed: 84 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@ import (
2121
)
2222

2323
const (
24-
podIPIndexField = "ip"
25-
endpointIPPortIndexField = "ipPort"
26-
serviceIPIndexField = "spec.ip"
27-
externalIPIndexField = "spec.externalIPs"
28-
portNumberIndexField = "service.spec.ports.nodePort"
29-
nodeIPIndexField = "node.status.Addresses.ExternalIP"
30-
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
31-
apiServerName = "kubernetes"
32-
apiServerNamespace = "default"
24+
podIPIndexField = "ip"
25+
podIPIncludingHostNetworkIndexField = "ipAndHostNetwork"
26+
endpointIPPortIndexField = "ipPort"
27+
serviceIPIndexField = "spec.ip"
28+
externalIPIndexField = "spec.externalIPs"
29+
nodePortNumberIndexField = "service.spec.ports.nodePort"
30+
nodeIPIndexField = "node.status.Addresses.ExternalIP"
31+
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
32+
apiServerName = "kubernetes"
33+
apiServerNamespace = "default"
3334
)
3435

3536
type KubeFinder struct {
@@ -70,6 +71,23 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
7071
return errors.Wrap(err)
7172
}
7273

74+
err = k.mgr.GetCache().IndexField(ctx, &corev1.Pod{}, podIPIncludingHostNetworkIndexField, func(object client.Object) []string {
75+
res := make([]string, 0)
76+
pod := object.(*corev1.Pod)
77+
78+
if pod.DeletionTimestamp != nil {
79+
return res
80+
}
81+
82+
for _, ip := range pod.Status.PodIPs {
83+
res = append(res, ip.IP)
84+
}
85+
return res
86+
})
87+
if err != nil {
88+
return errors.Wrap(err)
89+
}
90+
7391
err = k.mgr.GetCache().IndexField(ctx, &corev1.Service{}, serviceIPIndexField, func(object client.Object) []string {
7492
res := make([]string, 0)
7593
svc := object.(*corev1.Service)
@@ -99,13 +117,15 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
99117
return errors.Wrap(err)
100118
}
101119

102-
err = k.mgr.GetCache().IndexField(ctx, &corev1.Service{}, portNumberIndexField, func(object client.Object) []string {
120+
err = k.mgr.GetCache().IndexField(ctx, &corev1.Service{}, nodePortNumberIndexField, func(object client.Object) []string {
121+
// node ports are unique per service - so it can be used for indexing services
103122
ports := sets.New[string]()
104123
svc := object.(*corev1.Service)
105124
if svc.DeletionTimestamp != nil {
106125
return nil
107126
}
108-
if svc.Spec.Type != corev1.ServiceTypeNodePort {
127+
// Only node port and load balancer typed services use node ports
128+
if svc.Spec.Type != corev1.ServiceTypeNodePort && svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
109129
return nil
110130
}
111131

@@ -252,22 +272,22 @@ func (k *KubeFinder) ResolveIPToControlPlane(ctx context.Context, ip string) (*c
252272
}
253273

254274
func (k *KubeFinder) ResolveIPToExternalAccessService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
255-
nodePortService, ok, err := k.resolveNodePortService(ctx, ip, port)
275+
nodePortService, ok, err := k.resolveServiceByNodeIPAndPort(ctx, ip, port)
256276
if err != nil {
257277
return nil, false, errors.Wrap(err)
258278
}
259279
if ok {
260280
return nodePortService, true, nil
261281
}
262282

263-
loadBalancerService, ok, err := k.resolveLoadBalancerService(ctx, ip, port)
283+
loadBalancerService, ok, err := k.resolveLoadBalancerServiceByExternalIP(ctx, ip, port)
264284
if err != nil {
265285
return nil, false, errors.Wrap(err)
266286
}
267287
return loadBalancerService, ok, nil
268288
}
269289

270-
func (k *KubeFinder) resolveLoadBalancerService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
290+
func (k *KubeFinder) resolveLoadBalancerServiceByExternalIP(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
271291
var services corev1.ServiceList
272292
err := k.client.List(ctx, &services, client.MatchingFields{externalIPIndexField: ip})
273293
if err != nil {
@@ -288,7 +308,7 @@ func (k *KubeFinder) resolveLoadBalancerService(ctx context.Context, ip string,
288308
return &service, true, nil
289309
}
290310

291-
func (k *KubeFinder) resolveNodePortService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
311+
func (k *KubeFinder) resolveServiceByNodeIPAndPort(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
292312
var nodes corev1.NodeList
293313
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
294314
if err != nil {
@@ -304,7 +324,7 @@ func (k *KubeFinder) resolveNodePortService(ctx context.Context, ip string, port
304324

305325
portString := fmt.Sprintf("%d", port)
306326
var services corev1.ServiceList
307-
err = k.client.List(ctx, &services, client.MatchingFields{portNumberIndexField: portString})
327+
err = k.client.List(ctx, &services, client.MatchingFields{nodePortNumberIndexField: portString})
308328
if err != nil {
309329
return nil, false, errors.Wrap(err)
310330
}
@@ -442,3 +462,51 @@ func (k *KubeFinder) ResolveOtterizeIdentityForService(ctx context.Context, svc
442462
dstSvcIdentity.KubernetesService = lo.ToPtr(svc.Name)
443463
return dstSvcIdentity, true, nil
444464
}
465+
466+
func (k *KubeFinder) IsSrcIpClusterInternal(ctx context.Context, ip string) (bool, error) {
467+
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR, and pods that were deleted.
468+
469+
isNode, err := k.IsNodeIP(ctx, ip)
470+
if err != nil {
471+
return false, errors.Wrap(err)
472+
}
473+
if isNode {
474+
return true, nil
475+
}
476+
477+
isPod, err := k.IsPodIp(ctx, ip)
478+
if err != nil {
479+
return false, errors.Wrap(err)
480+
}
481+
if isPod {
482+
return true, nil
483+
}
484+
485+
_, isControlPlane, err := k.ResolveIPToControlPlane(ctx, ip)
486+
if err != nil {
487+
return false, errors.Wrap(err)
488+
}
489+
if isControlPlane {
490+
return true, nil
491+
}
492+
493+
return false, nil
494+
}
495+
496+
func (k *KubeFinder) IsPodIp(ctx context.Context, ip string) (bool, error) {
497+
var pods corev1.PodList
498+
err := k.client.List(ctx, &pods, client.MatchingFields{podIPIncludingHostNetworkIndexField: ip})
499+
if err != nil {
500+
return false, errors.Wrap(err)
501+
}
502+
return len(pods.Items) > 0, nil
503+
}
504+
505+
func (k *KubeFinder) IsNodeIP(ctx context.Context, ip string) (bool, error) {
506+
var nodes corev1.NodeList
507+
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
508+
if err != nil {
509+
return false, errors.Wrap(err)
510+
}
511+
return len(nodes.Items) > 0, nil
512+
}

src/mapper/pkg/resolvers/resolver_test.go

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1224,7 +1224,7 @@ func (s *ResolverTestSuite) TestResolveOtterizeIdentityIgnoreHostNetworkPods() {
12241224
podIP := "1.1.1.3"
12251225

12261226
pod3 := s.AddPodWithHostNetwork("pod3", podIP, map[string]string{"app": "test"}, nil, true)
1227-
s.AddService(serviceName, map[string]string{"app": "test"}, serviceIP, []*v1.Pod{pod3})
1227+
s.AddClusterIPService(serviceName, map[string]string{"app": "test"}, serviceIP, []*v1.Pod{pod3})
12281228
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
12291229

12301230
service := &v1.Service{}
@@ -1238,6 +1238,133 @@ func (s *ResolverTestSuite) TestResolveOtterizeIdentityIgnoreHostNetworkPods() {
12381238

12391239
}
12401240

1241+
func (s *ResolverTestSuite) TestTCPResultsFromHostNetworkPodsIgnored() {
1242+
// Add external service
1243+
internalServiceIp := "10.0.0.16"
1244+
externalServiceIP := "34.10.0.12"
1245+
servicePort := 9090
1246+
s.AddDeploymentWithIngressService("service1", []string{"1.1.1.1"}, map[string]string{"app": "service1"}, internalServiceIp, externalServiceIP, servicePort)
1247+
1248+
// Add host network pod
1249+
hostNetworkServiceName := "test-service"
1250+
hostNetworkServiceIP := "10.0.0.10"
1251+
hostNetworkPodIP := "1.1.1.3"
1252+
pod := s.AddPodWithHostNetwork("pod", hostNetworkPodIP, map[string]string{"app": "test"}, nil, true)
1253+
s.AddClusterIPService(hostNetworkServiceName, map[string]string{"app": "test"}, hostNetworkServiceIP, []*v1.Pod{pod})
1254+
1255+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
1256+
1257+
// Report TCP results of traffic from host network pod to external service
1258+
packetTime := time.Now().Add(time.Minute)
1259+
_, err := test_gql_client.ReportTCPCaptureResults(context.Background(), s.client, test_gql_client.CaptureTCPResults{
1260+
Results: []test_gql_client.RecordedDestinationsForSrc{
1261+
{
1262+
SrcIp: hostNetworkPodIP,
1263+
Destinations: []test_gql_client.Destination{
1264+
{
1265+
Destination: externalServiceIP,
1266+
DestinationIP: nilable.From(externalServiceIP),
1267+
DestinationPort: nilable.From(servicePort),
1268+
LastSeen: packetTime,
1269+
},
1270+
},
1271+
},
1272+
},
1273+
})
1274+
s.Require().NoError(err)
1275+
1276+
s.waitForCaptureResultsProcessed(10 * time.Second)
1277+
1278+
// Verify that the traffic from host network pod to external service is ignored
1279+
res, err := test_gql_client.ServiceIntents(context.Background(), s.client, nil)
1280+
s.Require().NoError(err)
1281+
s.Require().ElementsMatch(res.ServiceIntents, []test_gql_client.ServiceIntentsServiceIntents{})
1282+
1283+
intents := s.resolver.incomingTrafficHolder.GetNewIntentsSinceLastGet()
1284+
s.Require().Empty(intents)
1285+
}
1286+
1287+
func (s *ResolverTestSuite) TestTCPResultsFromExternalToPodSavedAsIncoming() {
1288+
// Create deployment
1289+
deploymentName := "coolz"
1290+
podIP := "1.1.1.3"
1291+
dep, _ := s.AddDeployment(deploymentName, []string{podIP}, map[string]string{"app": "coolz"})
1292+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
1293+
1294+
// Report TCP results of traffic from external ip to pod
1295+
packetTime := time.Now().Add(time.Minute)
1296+
_, err := test_gql_client.ReportTCPCaptureResults(context.Background(), s.client, test_gql_client.CaptureTCPResults{
1297+
Results: []test_gql_client.RecordedDestinationsForSrc{
1298+
{
1299+
SrcIp: "8.8.8.8",
1300+
Destinations: []test_gql_client.Destination{
1301+
{
1302+
Destination: podIP,
1303+
DestinationIP: nilable.From(podIP),
1304+
DestinationPort: nilable.From(80),
1305+
LastSeen: packetTime,
1306+
},
1307+
},
1308+
},
1309+
},
1310+
})
1311+
s.Require().NoError(err)
1312+
1313+
s.waitForCaptureResultsProcessed(10 * time.Second)
1314+
1315+
// Verify that the traffic from external ip to pod is saved as incoming
1316+
intents := s.resolver.incomingTrafficHolder.GetNewIntentsSinceLastGet()
1317+
s.Require().Len(intents, 1)
1318+
s.Require().Equal(dep.Name, intents[0].Intent.Server.Name)
1319+
s.Require().Equal(dep.Namespace, intents[0].Intent.Server.Namespace)
1320+
s.Require().Equal("8.8.8.8", intents[0].Intent.IP)
1321+
}
1322+
1323+
func (s *ResolverTestSuite) TestTCPResultsFromExternalToLoadBalancerServiceUsingNodeIpAndPortSavedAsIncoming() {
1324+
// Create deployment
1325+
deploymentName := "coolz"
1326+
podIP := "1.1.1.3"
1327+
serviceIP := "10.0.0.10"
1328+
dep, pods := s.AddDeployment(deploymentName, []string{podIP}, map[string]string{"app": "coolz"})
1329+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
1330+
svc := s.AddLoadBalancerService("coolz", map[string]string{"app": "coolz"}, serviceIP, pods)
1331+
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
1332+
nodePort := svc.Spec.Ports[0].NodePort
1333+
1334+
nodes := v1.NodeList{}
1335+
err := s.Mgr.GetClient().List(context.Background(), &nodes)
1336+
s.Require().NoError(err)
1337+
s.Require().NotEmpty(nodes.Items)
1338+
nodeIP := nodes.Items[0].Status.Addresses[0].Address
1339+
1340+
// Report TCP results of traffic from external ip to nodeIp:nodePort
1341+
packetTime := time.Now().Add(time.Minute)
1342+
_, err = test_gql_client.ReportTCPCaptureResults(context.Background(), s.client, test_gql_client.CaptureTCPResults{
1343+
Results: []test_gql_client.RecordedDestinationsForSrc{
1344+
{
1345+
SrcIp: "8.8.8.8",
1346+
Destinations: []test_gql_client.Destination{
1347+
{
1348+
Destination: nodeIP,
1349+
DestinationIP: nilable.From(nodeIP),
1350+
DestinationPort: nilable.From(int(nodePort)),
1351+
LastSeen: packetTime,
1352+
},
1353+
},
1354+
},
1355+
},
1356+
})
1357+
s.Require().NoError(err)
1358+
s.waitForCaptureResultsProcessed(10 * time.Second)
1359+
1360+
// Verify that the traffic from external ip to nodeIp:nodePort is saved as incoming
1361+
intents := s.resolver.incomingTrafficHolder.GetNewIntentsSinceLastGet()
1362+
s.Require().Len(intents, 1)
1363+
s.Require().Equal(dep.Name, intents[0].Intent.Server.Name)
1364+
s.Require().Equal(dep.Namespace, intents[0].Intent.Server.Namespace)
1365+
s.Require().Equal("8.8.8.8", intents[0].Intent.IP)
1366+
}
1367+
12411368
func TestRunSuite(t *testing.T) {
12421369
suite.Run(t, new(ResolverTestSuite))
12431370
}

src/mapper/pkg/resolvers/schema.helpers.resolvers.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -376,31 +376,40 @@ func (r *Resolver) handleReportTCPCaptureResults(ctx context.Context, results mo
376376
}
377377

378378
for _, captureItem := range results.Results {
379-
logrus.Debugf("Handling TCP capture result from %s to %s:%d", captureItem.SrcIP, captureItem.Destinations[0].Destination, lo.FromPtr(captureItem.Destinations[0].DestinationPort))
380-
381-
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
382-
if errors.Is(err, kubefinder.ErrNoPodFound) {
383-
err := r.reportIncomingInternetTraffic(ctx, captureItem.SrcIP, captureItem.Destinations)
384-
if err != nil {
385-
logrus.WithError(err).Error("could not report incoming internet traffic")
386-
continue
387-
}
388-
}
389-
379+
err := r.handleTCPCaptureResult(ctx, captureItem)
390380
if err != nil {
391-
logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP)
392-
continue
393-
}
394-
395-
for _, dest := range captureItem.Destinations {
396-
r.handleExternalIncomingTrafficTCPResult(ctx, srcSvcIdentity, dest)
381+
logrus.WithError(err).
382+
WithField("srcIp", captureItem.SrcIP).
383+
WithField("srcHostname", captureItem.SrcHostname).
384+
Error("could not handle TCP capture result")
397385
}
398386
}
399387
telemetrysender.SendNetworkMapper(telemetriesgql.EventTypeIntentsDiscoveredCapture, len(results.Results))
400388
r.gotResultsSignal()
401389
return nil
402390
}
403391

392+
func (r *Resolver) handleTCPCaptureResult(ctx context.Context, captureItem model.RecordedDestinationsForSrc) error {
393+
logrus.Debugf("Handling TCP capture result from %s to %s:%d", captureItem.SrcIP, captureItem.Destinations[0].Destination, lo.FromPtr(captureItem.Destinations[0].DestinationPort))
394+
isSrcInCluster, err := r.kubeFinder.IsSrcIpClusterInternal(ctx, captureItem.SrcIP)
395+
if err != nil {
396+
return errors.Wrap(err)
397+
}
398+
if !isSrcInCluster {
399+
return errors.Wrap(r.reportIncomingInternetTraffic(ctx, captureItem.SrcIP, captureItem.Destinations))
400+
}
401+
402+
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
403+
if err != nil {
404+
logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP)
405+
return nil
406+
}
407+
for _, dest := range captureItem.Destinations {
408+
r.handleInternalTrafficTCPResult(ctx, srcSvcIdentity, dest)
409+
}
410+
return nil
411+
}
412+
404413
func (r *Resolver) reportIncomingInternetTraffic(ctx context.Context, srcIP string, destinations []model.Destination) error {
405414
for _, dest := range destinations {
406415
destSvcIdentity, ok, err := r.resolveOtterizeIdentityForExternalAccessDestination(ctx, dest)
@@ -423,7 +432,7 @@ func (r *Resolver) reportIncomingInternetTraffic(ctx context.Context, srcIP stri
423432
return nil
424433
}
425434

426-
func (r *Resolver) handleExternalIncomingTrafficTCPResult(ctx context.Context, srcIdentity model.OtterizeServiceIdentity, dest model.Destination) {
435+
func (r *Resolver) handleInternalTrafficTCPResult(ctx context.Context, srcIdentity model.OtterizeServiceIdentity, dest model.Destination) {
427436
lastSeen := dest.LastSeen
428437
destIdentity, ok, err := r.resolveDestIdentity(ctx, dest, lastSeen)
429438
if err != nil {

0 commit comments

Comments
 (0)