Skip to content

Commit aed57ba

Browse files
Improve netflow zone/region discovery (#614)
1 parent 28148bd commit aed57ba

File tree

16 files changed

+1318
-318
lines changed

16 files changed

+1318
-318
lines changed

api/v1/kube/kube_api.pb.go

Lines changed: 25 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v1/kube/kube_api.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ message GetClusterInfoRequest {}
1818
message GetClusterInfoResponse {
1919
repeated string pods_cidr = 1;
2020
repeated string service_cidr = 2;
21+
repeated string node_cidr = 4;
22+
repeated string vpc_cidr = 5;
23+
// other_cidr is a list of known cloud service CIDRs
2124
repeated string other_cidr = 3;
2225
}
2326

cmd/agent/daemon/pipeline/netflow_pipeline.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import (
2727
type clusterInfo struct {
2828
podCidr []netip.Prefix
2929
serviceCidr []netip.Prefix
30-
otherCidr []netip.Prefix
30+
nodeCidr []netip.Prefix
31+
vpcCidr []netip.Prefix
3132
clusterCidr []netip.Prefix
3233
}
3334

@@ -62,12 +63,27 @@ func (c *Controller) getClusterInfo(ctx context.Context) (*clusterInfo, error) {
6263
res.serviceCidr = append(res.serviceCidr, subnet)
6364
res.clusterCidr = append(res.clusterCidr, subnet)
6465
}
66+
for _, cidr := range resp.NodeCidr {
67+
subnet, err := netip.ParsePrefix(cidr)
68+
if err != nil {
69+
return nil, fmt.Errorf("parsing node cidr: %w", err)
70+
}
71+
res.nodeCidr = append(res.nodeCidr, subnet)
72+
res.clusterCidr = append(res.clusterCidr, subnet)
73+
}
74+
for _, cidr := range resp.VpcCidr {
75+
subnet, err := netip.ParsePrefix(cidr)
76+
if err != nil {
77+
return nil, fmt.Errorf("parsing vpc cidr: %w", err)
78+
}
79+
res.vpcCidr = append(res.vpcCidr, subnet)
80+
res.clusterCidr = append(res.clusterCidr, subnet)
81+
}
6582
for _, cidr := range resp.OtherCidr {
6683
subnet, err := netip.ParsePrefix(cidr)
6784
if err != nil {
6885
return nil, fmt.Errorf("parsing other cidr: %w", err)
6986
}
70-
res.otherCidr = append(res.otherCidr, subnet)
7187
res.clusterCidr = append(res.clusterCidr, subnet)
7288
}
7389
return &res, nil
@@ -78,6 +94,9 @@ func (c *Controller) runNetflowPipeline(ctx context.Context) error {
7894
c.log.Info("running netflow pipeline")
7995
defer c.log.Info("netflow pipeline done")
8096

97+
// FIXME: we fetch cluster network ranges only once, when the agent pod starts,
98+
// which could lead to a problem where some CIDRs are not yet indexed in the controller pod
99+
// and some flows can be missed.
81100
if c.cfg.Netflow.CheckClusterNetworkRanges {
82101
clusterInfoCtx, clusterInfoCancel := context.WithTimeout(ctx, time.Second*60)
83102
defer clusterInfoCancel()
@@ -87,7 +106,10 @@ func (c *Controller) runNetflowPipeline(ctx context.Context) error {
87106
}
88107
c.clusterInfo = clusterInfo
89108
if clusterInfo != nil {
90-
c.log.Infof("fetched cluster info, pod_cidr=%s, service_cidr=%s", clusterInfo.podCidr, clusterInfo.serviceCidr)
109+
c.log.Infof(
110+
"fetched cluster info, pod_cidr=%s, service_cidr=%s, node_cidr=%s, vpc_cidr=%s",
111+
clusterInfo.podCidr, clusterInfo.serviceCidr, clusterInfo.nodeCidr, clusterInfo.vpcCidr,
112+
)
91113
}
92114
}
93115

cmd/controller/controllers/vpc_metadata_controller.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type VPCMetadataConfig struct {
1515
Type string `json:"type"`
1616
NetworkName string `json:"networkName"`
1717
RefreshInterval time.Duration `json:"refreshInterval"`
18+
CacheSize uint32 `json:"CacheSize"`
1819
CredentialsFile string `json:"credentialsFile"`
1920
GCPProjectID string `json:"gcpProjectID"`
2021
AWSAccountID string `json:"awsAccountID"`
@@ -59,7 +60,7 @@ func (c *VPCMetadataController) Run(ctx context.Context) error {
5960

6061
c.log.Infof("cloud provider %s initialized successfully", provider.Type())
6162

62-
vpcIndex := kube.NewVPCIndex(c.log, c.cfg.RefreshInterval)
63+
vpcIndex := kube.NewVPCIndex(c.log, c.cfg.RefreshInterval, c.cfg.CacheSize)
6364

6465
if err := c.fetchInitialMetadata(ctx, provider, vpcIndex); err != nil {
6566
c.log.Errorf("failed to fetch initial VPC metadata: %v", err)

cmd/controller/kube/client.go

Lines changed: 81 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,20 @@ func NewClient(
8989
kvisorControllerContainerName: "controller",
9090
client: client,
9191
index: NewIndex(),
92-
// TODO: set default
93-
vpcIndex: nil,
94-
version: version,
95-
ipInfoTTL: 30 * time.Second,
92+
vpcIndex: NewVPCIndex(log, 0, 0),
93+
version: version,
94+
ipInfoTTL: 30 * time.Second,
9695
}
9796
}
9897

9998
// SetVPCIndex sets the VPC index for enriching external IPs with VPC metadata.
100-
func (i *Client) SetVPCIndex(vpcIndex *VPCIndex) {
101-
i.vpcIndex = vpcIndex
99+
func (c *Client) SetVPCIndex(vpcIndex *VPCIndex) {
100+
c.vpcIndex = vpcIndex
102101
}
103102

104103
// GetVPCIndex returns the VPC index if available.
105-
func (i *Client) GetVPCIndex() *VPCIndex {
106-
return i.vpcIndex
104+
func (c *Client) GetVPCIndex() *VPCIndex {
105+
return c.vpcIndex
107106
}
108107

109108
func (c *Client) RegisterHandlers(factory informers.SharedInformerFactory) {
@@ -173,6 +172,11 @@ func (c *Client) runCleanup() {
173172
if deleted > 0 {
174173
c.log.Debugf("ips index cleanup done, removed %d ips", deleted)
175174
}
175+
176+
deletedCidrs := c.index.nodesCIDRIndex.Cleanup(c.ipInfoTTL)
177+
if deletedCidrs > 0 {
178+
c.log.Debugf("nodes cidr index cleanup done, removed %d cidrs", deletedCidrs)
179+
}
176180
}
177181

178182
func (c *Client) eventHandler() cache.ResourceEventHandler {
@@ -259,22 +263,30 @@ func (c *Client) GetIPInfo(ip netip.Addr) (IPInfo, bool) {
259263
c.mu.RLock()
260264
defer c.mu.RUnlock()
261265

262-
val, found := c.index.ipsDetails.find(ip)
263-
return val, found
264-
}
265-
266-
func (c *Client) GetIPsInfo(ips []netip.Addr) []IPInfo {
267-
c.mu.RLock()
268-
defer c.mu.RUnlock()
266+
// step 1: check IPs from kube client first
267+
// all known pods/services/endpoints/nodes
268+
val, kubeIPFound := c.index.ipsDetails.find(ip)
269+
if kubeIPFound && val.zone != "" {
270+
return val, true
271+
}
272+
// step 2: check IPs from nodes pod CIDRs
273+
// in case when IP is not found within known k8s resources
274+
// i.e. CNI bridge gateway IP
275+
cidrInfo, nodeCIDRFound := c.index.nodesCIDRIndex.Lookup(ip)
276+
if nodeCIDRFound && cidrInfo.Metadata != "" {
277+
val.Node = c.index.nodesByName[cidrInfo.Metadata]
278+
return val, true
279+
}
269280

270-
var res []IPInfo
271-
for _, ip := range ips {
272-
val, found := c.index.ipsDetails.find(ip)
273-
if found {
274-
res = append(res, val)
275-
}
281+
// step 3: check IPs from VPC index
282+
if vpcInfo, vpcCIDRFound := c.vpcIndex.LookupIP(ip); vpcCIDRFound {
283+
val.zone = vpcInfo.Zone
284+
val.region = vpcInfo.Region
285+
val.cloudDomain = vpcInfo.CloudDomain
286+
return val, true
276287
}
277-
return res
288+
289+
return val, kubeIPFound || nodeCIDRFound
278290
}
279291

280292
func (c *Client) GetPodInfo(uid string) (*PodInfo, bool) {
@@ -317,6 +329,7 @@ func (c *Client) GetOwnerUID(obj Object) string {
317329
type ClusterInfo struct {
318330
PodCidr []string
319331
ServiceCidr []string
332+
NodeCidr []string
320333
}
321334

322335
func (c *Client) GetClusterInfo(ctx context.Context) (*ClusterInfo, error) {
@@ -327,11 +340,27 @@ func (c *Client) GetClusterInfo(ctx context.Context) (*ClusterInfo, error) {
327340
}
328341

329342
var res ClusterInfo
343+
// Find nodes cidr.
344+
// This is needed when the `netflow-check-cluster-network-ranges` flag is enabled
345+
// to allow enriching destination flows with the node CIDR
346+
nodeCidrMap := make(map[string]struct{})
347+
for _, node := range c.index.nodesByName {
348+
nodeCidr, err := getNodeCidrFromNodeSpec(node)
349+
if err != nil {
350+
continue
351+
}
352+
_, found := nodeCidrMap[nodeCidr]
353+
if nodeCidr != "" && !found {
354+
res.NodeCidr = append(res.NodeCidr, nodeCidr)
355+
nodeCidrMap[nodeCidr] = struct{}{}
356+
}
357+
}
358+
330359
// Try to find pods cidr from nodes.
331360
for _, node := range c.index.nodesByName {
332361
podCidr, err := getPodCidrFromNodeSpec(node)
333362
if err != nil {
334-
return nil, err
363+
continue
335364
}
336365
if len(podCidr) > 0 {
337366
res.PodCidr = podCidr
@@ -373,6 +402,22 @@ func (c *Client) GetClusterInfo(ctx context.Context) (*ClusterInfo, error) {
373402
return &res, nil
374403
}
375404

405+
func getPodCidrsFromNodeSpec(node *corev1.Node) ([]netip.Prefix, error) {
406+
nodeCidrs := node.Spec.PodCIDRs
407+
if len(nodeCidrs) == 0 && node.Spec.PodCIDR != "" {
408+
nodeCidrs = []string{node.Spec.PodCIDR}
409+
}
410+
var podCidrs []netip.Prefix
411+
for _, cidr := range nodeCidrs {
412+
subnet, err := netip.ParsePrefix(cidr)
413+
if err != nil {
414+
return nil, fmt.Errorf("parsing pod cidr: %w", err)
415+
}
416+
podCidrs = append(podCidrs, subnet)
417+
}
418+
return podCidrs, nil
419+
}
420+
376421
func getPodCidrFromNodeSpec(node *corev1.Node) ([]string, error) {
377422
nodeCidrs := node.Spec.PodCIDRs
378423
if len(nodeCidrs) == 0 && node.Spec.PodCIDR != "" {
@@ -407,6 +452,19 @@ func getPodCidrFromPod(pod *corev1.Pod) ([]string, error) {
407452
return podCidr, nil
408453
}
409454

455+
func getNodeCidrFromNodeSpec(node *corev1.Node) (string, error) {
456+
for _, address := range node.Status.Addresses {
457+
if address.Type == corev1.NodeInternalIP {
458+
cidr, err := parseIPToCidr(address.Address)
459+
if err != nil {
460+
return "", err
461+
}
462+
return cidr, nil
463+
}
464+
}
465+
return "", nil
466+
}
467+
410468
func getServiceCidr(ctx context.Context, client kubernetes.Interface, namespace string, ipDetails ipsDetails) ([]string, error) {
411469
res, err := getServiceCidrFromServiceCreation(ctx, client, namespace)
412470
if err == nil {

0 commit comments

Comments
 (0)