Skip to content

Commit 19f7e33

Browse files
Add netflow classification for internet, private, and cloud destinations (#621)
1 parent 1c52b79 commit 19f7e33

File tree

8 files changed

+643
-27
lines changed

8 files changed

+643
-27
lines changed

cmd/agent/daemon/pipeline/netflow_pipeline.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
kubepb "github.com/castai/kvisor/api/v1/kube"
1414
castaipb "github.com/castai/kvisor/api/v1/runtime"
1515
"github.com/castai/kvisor/cmd/agent/daemon/metrics"
16+
"github.com/castai/kvisor/pkg/cloudprovider"
1617
"github.com/castai/kvisor/pkg/containers"
1718
"github.com/castai/kvisor/pkg/ebpftracer"
1819
"github.com/castai/kvisor/pkg/ebpftracer/types"
@@ -29,6 +30,7 @@ type clusterInfo struct {
2930
serviceCidr []netip.Prefix
3031
nodeCidr []netip.Prefix
3132
vpcCidr []netip.Prefix
33+
cloudCidr []netip.Prefix
3234
clusterCidr []netip.Prefix
3335
}
3436

@@ -84,6 +86,7 @@ func (c *Controller) getClusterInfo(ctx context.Context) (*clusterInfo, error) {
8486
if err != nil {
8587
return nil, fmt.Errorf("parsing other cidr: %w", err)
8688
}
89+
res.cloudCidr = append(res.cloudCidr, subnet)
8790
res.clusterCidr = append(res.clusterCidr, subnet)
8891
}
8992
return &res, nil
@@ -306,10 +309,17 @@ func (c *Controller) enrichKubeDestinations(ctx context.Context, groups map[uint
306309
flowDest.Region = info.Region
307310
flowDest.NodeName = info.NodeName
308311

309-
// set cloud domain as dns question when it's empty
310-
// i.e. googleapis.com or amazonaws.com
311-
if flowDest.DnsQuestion == "" && info.CloudDomain != "" {
312-
flowDest.DnsQuestion = info.CloudDomain
312+
// CloudDomain will be non-empty if this flow is traffic within the cloud
313+
if info.CloudDomain != "" {
314+
// set cloud domain as dns question when it's empty
315+
// i.e. googleapis.com or amazonaws.com
316+
if flowDest.DnsQuestion == "" {
317+
flowDest.DnsQuestion = info.CloudDomain
318+
}
319+
320+
// For cloud IPs, set the workload kind to "cloud" and the workload name to the provider type
321+
flowDest.WorkloadName = cloudprovider.DomainToProviderType(info.CloudDomain)
322+
flowDest.WorkloadKind = "cloud"
313323
}
314324
}
315325
}
@@ -320,9 +330,13 @@ func (c *Controller) enrichKubeDestinations(ctx context.Context, groups map[uint
320330
func (c *Controller) addNetflowDestination(netflow *netflowVal, dest *castaipb.NetflowDestination, destAddr netip.Addr) {
321331
isPublicDest := !iputil.IsPrivateNetwork(destAddr)
322332
netflow.updatedAt = time.Now()
333+
var isCloudDest bool
334+
if c.clusterInfo != nil {
335+
isCloudDest = c.clusterInfo.cloudCidrContains(destAddr)
336+
}
323337
// To reduce cardinality we merge destinations to 0.0.0.0 range if
324338
// it's a public ip and doesn't have dns domain.
325-
maybeMerge := isNetflowDestCandidateForMerge(dest, isPublicDest, c.cfg.Netflow.MaxPublicIPs)
339+
maybeMerge := isNetflowDestCandidateForMerge(dest, isPublicDest, isCloudDest, c.cfg.Netflow.MaxPublicIPs)
326340
if maybeMerge && netflow.mergeThreshold >= int(c.cfg.Netflow.MaxPublicIPs) {
327341
if netflow.mergedDestIndex == 0 {
328342
netflow.pb.Destinations = append(netflow.pb.Destinations, &castaipb.NetflowDestination{
@@ -357,11 +371,21 @@ func (c *Controller) addNetflowDestination(netflow *netflowVal, dest *castaipb.N
357371
}
358372
}
359373

360-
func isNetflowDestCandidateForMerge(dest *castaipb.NetflowDestination, isPublic bool, maxPublicIPs int16) bool {
374+
func isNetflowDestCandidateForMerge(
375+
dest *castaipb.NetflowDestination,
376+
isPublic bool,
377+
isCloud bool,
378+
maxPublicIPs int16,
379+
) bool {
361380
// No merge for private destinations.
362381
if !isPublic {
363382
return false
364383
}
384+
385+
// No merge for cloud destinations.
386+
if isCloud {
387+
return false
388+
}
365389
// Do not merge if the destination has DNS context.
366390
if dest.DnsQuestion != "" {
367391
return false
@@ -452,6 +476,11 @@ func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebp
452476
}
453477
}
454478

479+
flowKind := "private"
480+
if iputil.IsPublicNetwork(remote.Addr()) {
481+
flowKind = "internet"
482+
}
483+
455484
destination := &castaipb.NetflowDestination{
456485
DnsQuestion: dns,
457486
Addr: remote.Addr().AsSlice(),
@@ -460,6 +489,11 @@ func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebp
460489
RxBytes: summary.RxBytes,
461490
TxPackets: summary.TxPackets,
462491
RxPackets: summary.RxPackets,
492+
493+
// Mark workload kind as private or internet,
494+
// but it could later be overridden by IP info from kube client
495+
// within `enrichKubeDestinations` method
496+
WorkloadKind: flowKind,
463497
}
464498

465499
return destination, remote.Addr(), nil
@@ -507,6 +541,15 @@ func (c *clusterInfo) clusterCidrContains(ip netip.Addr) bool {
507541
return false
508542
}
509543

544+
// cloudCidrContains reports whether ip is contained in any of the prefixes
// stored in c.cloudCidr. It returns false when no prefix matches, which
// includes the case where c.cloudCidr is empty.
func (c *clusterInfo) cloudCidrContains(ip netip.Addr) bool {
545+
for _, cidr := range c.cloudCidr {
546+
if cidr.Contains(ip) {
547+
return true
548+
}
549+
}
550+
return false
551+
}
552+
510553
func toProtoProtocol(proto uint8) castaipb.NetflowProtocol {
511554
switch proto {
512555
case unix.IPPROTO_TCP:

0 commit comments

Comments
 (0)