Skip to content

Commit 8bd1dce

Browse files
Periodically refresh cluster network ranges (#622)
1 parent 7b48d84 commit 8bd1dce

File tree

11 files changed

+462
-157
lines changed

11 files changed

+462
-157
lines changed

api/v1/kube/kube_api.pb.go

Lines changed: 14 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v1/kube/kube_api.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ service KubeAPI {
1616
rpc GetCloudVolumes(GetCloudVolumesRequest) returns (GetCloudVolumesResponse);
1717
}
1818

19-
message GetClusterInfoRequest {}
19+
message GetClusterInfoRequest {
20+
bool exclude_other_cidr = 1;
21+
}
22+
2023
message GetClusterInfoResponse {
2124
repeated string pods_cidr = 1;
2225
repeated string service_cidr = 2;

cmd/agent/daemon/config/config.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,13 @@ type EnricherConfig struct {
7878
}
7979

8080
type NetflowConfig struct {
81-
Enabled bool `json:"enabled"`
82-
ExportInterval time.Duration `json:"exportInterval"`
83-
Grouping ebpftracer.NetflowGrouping `json:"grouping"`
84-
MaxPublicIPs int16 `json:"maxPublicIPs"`
85-
CheckClusterNetworkRanges bool `json:"checkClusterNetworkRanges"`
86-
CgroupDNSCacheMaxEntries uint32 `json:"cgroupDNSCacheMaxEntries"`
81+
Enabled bool `json:"enabled"`
82+
ExportInterval time.Duration `json:"exportInterval"`
83+
Grouping ebpftracer.NetflowGrouping `json:"grouping"`
84+
MaxPublicIPs int16 `json:"maxPublicIPs"`
85+
CheckClusterNetworkRanges bool `json:"checkClusterNetworkRanges"`
86+
ClusterInfoRefreshInterval time.Duration `json:"clusterInfoRefreshInterval"`
87+
CgroupDNSCacheMaxEntries uint32 `json:"cgroupDNSCacheMaxEntries"`
8788
}
8889

8990
type FileAccessConfig struct {

cmd/agent/daemon/daemon.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,13 @@ func NewRunCommand(version string) *cobra.Command {
8282
gitCloneDetectionSignatureRedactPasswords = command.Flags().Bool("signature-git-clone-detection-redact-password", true, "If enabled, any password passed via the URL gets redacted")
8383
ingressNightmareExploitDetectionSignatureEnabled = command.Flags().Bool("signature-ingress-nightmare-exploit-detection-enabled", true, "Enables the detection signature to detect exploits of ingress nightmare")
8484

85-
netflowEnabled = command.Flags().Bool("netflow-enabled", false, "Enables netflow tracking")
86-
netflowCheckClusterNetworkRanges = command.Flags().Bool("netflow-check-cluster-network-ranges", true, "Check cluster network ranges before enriching destinations")
87-
netflowExportInterval = command.Flags().Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
88-
netflowMaxPublicIPsBucket = command.Flags().Int16("netflow-max-public-ips-bucket", -1, "Maximum number of unique public IPs destination before aggregating into 0.0.0.0 range")
89-
netflowCgroupDnsCacheMaxEntries = command.Flags().Uint32("netflow-cgroup-dns-cache-max-entries", 1024, "Number of dns cache entries per cgroup")
90-
netflowGrouping = ebpftracer.NetflowGroupingDropSrcPort
85+
netflowEnabled = command.Flags().Bool("netflow-enabled", false, "Enables netflow tracking")
86+
netflowCheckClusterNetworkRanges = command.Flags().Bool("netflow-check-cluster-network-ranges", true, "Check cluster network ranges before enriching destinations")
87+
netflowClusterInfoRefreshInterval = command.Flags().Duration("netflow-cluster-network-ranges-refresh-interval", 0, "Cluster network ranges refresh interval (0 to disable periodic refresh)")
88+
netflowExportInterval = command.Flags().Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
89+
netflowMaxPublicIPsBucket = command.Flags().Int16("netflow-max-public-ips-bucket", -1, "Maximum number of unique public IPs destination before aggregating into 0.0.0.0 range")
90+
netflowCgroupDnsCacheMaxEntries = command.Flags().Uint32("netflow-cgroup-dns-cache-max-entries", 1024, "Number of dns cache entries per cgroup")
91+
netflowGrouping = ebpftracer.NetflowGroupingDropSrcPort
9192

9293
processTreeEnabled = command.Flags().Bool("process-tree-enabled", false, "Enables process tree tracking")
9394

@@ -199,12 +200,13 @@ func NewRunCommand(version string) *cobra.Command {
199200
RedactSensitiveValuesRegex: redactSensitiveValuesRegex,
200201
},
201202
Netflow: config.NetflowConfig{
202-
Enabled: *netflowEnabled,
203-
Grouping: netflowGrouping,
204-
ExportInterval: *netflowExportInterval,
205-
MaxPublicIPs: *netflowMaxPublicIPsBucket,
206-
CheckClusterNetworkRanges: *netflowCheckClusterNetworkRanges,
207-
CgroupDNSCacheMaxEntries: *netflowCgroupDnsCacheMaxEntries,
203+
Enabled: *netflowEnabled,
204+
Grouping: netflowGrouping,
205+
ExportInterval: *netflowExportInterval,
206+
MaxPublicIPs: *netflowMaxPublicIPsBucket,
207+
CheckClusterNetworkRanges: *netflowCheckClusterNetworkRanges,
208+
ClusterInfoRefreshInterval: *netflowClusterInfoRefreshInterval,
209+
CgroupDNSCacheMaxEntries: *netflowCgroupDnsCacheMaxEntries,
208210
},
209211
Clickhouse: config.ClickhouseConfig{
210212
Addr: *clickhouseAddr,
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
package pipeline
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/netip"
7+
"sync"
8+
"time"
9+
10+
kubepb "github.com/castai/kvisor/api/v1/kube"
11+
"github.com/castai/kvisor/pkg/logging"
12+
)
13+
14+
// Retry policy used when fetching cluster info from the kube API.
const (
	// maxRetries caps the number of GetClusterInfo attempts made by one sync.
	maxRetries = 10
	// initialBackoff is the wait after the first failed attempt; the wait
	// doubles after every subsequent failure.
	initialBackoff = 2 * time.Second
	// maxBackoff is the ceiling for the doubled wait. It is also used as the
	// timeout for a whole sync run.
	maxBackoff = time.Minute
)
19+
20+
type clusterInfo struct {
21+
mu sync.RWMutex
22+
kubeClient kubepb.KubeAPIClient
23+
log *logging.Logger
24+
podCidr []netip.Prefix
25+
serviceCidr []netip.Prefix
26+
nodeCidr []netip.Prefix
27+
vpcCidr []netip.Prefix
28+
cloudCidr []netip.Prefix
29+
clusterCidr []netip.Prefix
30+
}
31+
32+
func (c *Controller) runClusterInfoPipeline(ctx context.Context) {
33+
c.clusterInfo = newClusterInfo(c.kubeClient, c.log)
34+
35+
clusterInfoCtx, clusterInfoCancel := context.WithTimeout(ctx, maxBackoff)
36+
defer clusterInfoCancel()
37+
38+
// initial sync of cluster CIDRs info
39+
if err := c.clusterInfo.sync(clusterInfoCtx); err != nil {
40+
c.log.Errorf("syncing cluster info: %v", err)
41+
}
42+
43+
// Start periodic refresh if interval is configured
44+
// this is needed to keep cluster CIDRs up to date
45+
if c.cfg.Netflow.ClusterInfoRefreshInterval == 0 {
46+
return
47+
}
48+
49+
ticker := time.NewTicker(c.cfg.Netflow.ClusterInfoRefreshInterval)
50+
defer ticker.Stop()
51+
52+
c.log.Infof("starting cluster info refresh with interval %s", c.cfg.Netflow.ClusterInfoRefreshInterval)
53+
54+
for {
55+
select {
56+
case <-ctx.Done():
57+
c.log.Info("stopping cluster info refresh")
58+
return
59+
case <-ticker.C:
60+
c.log.Debug("refreshing cluster info")
61+
62+
clusterInfoCtx, clusterInfoCancel := context.WithTimeout(ctx, maxBackoff)
63+
defer clusterInfoCancel()
64+
65+
if err := c.clusterInfo.sync(clusterInfoCtx); err != nil {
66+
c.log.Errorf("syncing cluster info failed: %v, retry in %s", err, c.cfg.Netflow.ClusterInfoRefreshInterval)
67+
}
68+
}
69+
}
70+
}
71+
72+
func newClusterInfo(kubeClient kubepb.KubeAPIClient, log *logging.Logger) *clusterInfo {
73+
return &clusterInfo{
74+
kubeClient: kubeClient,
75+
log: log,
76+
}
77+
}
78+
79+
func (c *clusterInfo) sync(ctx context.Context) error {
80+
var resp *kubepb.GetClusterInfoResponse
81+
var err error
82+
83+
backoff := initialBackoff
84+
for attempt := 1; attempt <= maxRetries; attempt++ {
85+
// There is no point to refersh cloud CIDRs as they are changed very infrequently
86+
resp, err = c.kubeClient.GetClusterInfo(ctx, &kubepb.GetClusterInfoRequest{ExcludeOtherCidr: len(c.cloudCidr) > 0})
87+
if err == nil {
88+
break
89+
}
90+
91+
if attempt < maxRetries {
92+
c.log.Warnf("getting cluster info (attempt %d/%d): %v, retrying in %v", attempt, maxRetries, err, backoff)
93+
94+
select {
95+
case <-ctx.Done():
96+
return fmt.Errorf("context cancelled during retry: %w", ctx.Err())
97+
case <-time.After(backoff):
98+
backoff *= 2
99+
if backoff > maxBackoff {
100+
backoff = maxBackoff
101+
}
102+
}
103+
} else {
104+
return fmt.Errorf("getting cluster info after %d attempts: %w", maxRetries, err)
105+
}
106+
}
107+
108+
var podCidr, serviceCidr, nodeCidr, vpcCidr, cloudCidr, clusterCidr []netip.Prefix
109+
110+
for _, cidr := range resp.GetPodsCidr() {
111+
subnet, err := netip.ParsePrefix(cidr)
112+
if err != nil {
113+
c.log.Errorf("parsing pods cidr: %v", err)
114+
continue
115+
}
116+
podCidr = append(podCidr, subnet)
117+
clusterCidr = append(clusterCidr, subnet)
118+
}
119+
for _, cidr := range resp.GetServiceCidr() {
120+
subnet, err := netip.ParsePrefix(cidr)
121+
if err != nil {
122+
c.log.Errorf("parsing service cidr: %v", err)
123+
continue
124+
}
125+
serviceCidr = append(serviceCidr, subnet)
126+
clusterCidr = append(clusterCidr, subnet)
127+
}
128+
for _, cidr := range resp.GetNodeCidr() {
129+
subnet, err := netip.ParsePrefix(cidr)
130+
if err != nil {
131+
c.log.Errorf("parsing node cidr: %v", err)
132+
continue
133+
}
134+
nodeCidr = append(nodeCidr, subnet)
135+
clusterCidr = append(clusterCidr, subnet)
136+
}
137+
for _, cidr := range resp.GetVpcCidr() {
138+
subnet, err := netip.ParsePrefix(cidr)
139+
if err != nil {
140+
c.log.Errorf("parsing vpc cidr: %v", err)
141+
continue
142+
}
143+
vpcCidr = append(vpcCidr, subnet)
144+
clusterCidr = append(clusterCidr, subnet)
145+
}
146+
for _, cidr := range resp.GetOtherCidr() {
147+
subnet, err := netip.ParsePrefix(cidr)
148+
if err != nil {
149+
c.log.Errorf("parsing other cidr: %v", err)
150+
continue
151+
}
152+
cloudCidr = append(cloudCidr, subnet)
153+
clusterCidr = append(clusterCidr, subnet)
154+
}
155+
156+
c.mu.Lock()
157+
c.podCidr = podCidr
158+
c.serviceCidr = serviceCidr
159+
c.nodeCidr = nodeCidr
160+
c.vpcCidr = vpcCidr
161+
c.clusterCidr = clusterCidr
162+
if len(cloudCidr) > 0 {
163+
c.cloudCidr = cloudCidr
164+
}
165+
c.log.Infof(
166+
"fetched cluster info, pod_cidr=%s, service_cidr=%s, node_cidr=%s, vpc_cidr=%s, cloud_cidr_count=%d",
167+
c.podCidr, c.serviceCidr, c.nodeCidr, c.vpcCidr, len(c.cloudCidr),
168+
)
169+
c.mu.Unlock()
170+
171+
return nil
172+
}
173+
174+
func (c *clusterInfo) cloudCidrContains(ip netip.Addr) bool {
175+
if c == nil {
176+
// happens when netflow-check-cluster-network-ranges=false
177+
return false
178+
}
179+
180+
c.mu.RLock()
181+
defer c.mu.RUnlock()
182+
for _, cidr := range c.cloudCidr {
183+
if cidr.Contains(ip) {
184+
return true
185+
}
186+
}
187+
return false
188+
}
189+
190+
func (c *clusterInfo) serviceCidrContains(ip netip.Addr) bool {
191+
if c == nil {
192+
// happens when netflow-check-cluster-network-ranges=false
193+
return false
194+
}
195+
196+
c.mu.RLock()
197+
defer c.mu.RUnlock()
198+
for _, cidr := range c.serviceCidr {
199+
if cidr.Contains(ip) {
200+
return true
201+
}
202+
}
203+
return false
204+
}
205+
206+
func (c *clusterInfo) clusterCidrContains(ip netip.Addr) bool {
207+
if c == nil {
208+
// happens when netflow-check-cluster-network-ranges=false
209+
return false
210+
}
211+
212+
c.mu.RLock()
213+
defer c.mu.RUnlock()
214+
for _, cidr := range c.clusterCidr {
215+
if cidr.Contains(ip) {
216+
return true
217+
}
218+
}
219+
220+
return false
221+
}

0 commit comments

Comments
 (0)