Skip to content

Commit 0de8ebb

Browse files
fix: auto-fetch cloud public CIDRs without cloud sync permissions (#660)
1 parent b8679d2 commit 0de8ebb

File tree

15 files changed

+649
-401
lines changed

15 files changed

+649
-401
lines changed

cmd/controller/app/app.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (a *App) Run(ctx context.Context) error {
137137
k8sVersion,
138138
clientset,
139139
cfg.CloudProviderConfig.CloudProvider.Type,
140-
cfg.CloudProviderConfig.VPCStateController.UseZoneID,
140+
cfg.CloudProviderConfig.NetworkStateController.UseZoneID,
141141
)
142142

143143
kubeClient.RegisterHandlers(informersFactory)
@@ -146,23 +146,35 @@ func (a *App) Run(ctx context.Context) error {
146146
return kubeClient.Run(ctx)
147147
})
148148

149-
vpcCfg := cfg.CloudProviderConfig.VPCStateController
149+
vpcCfg := cfg.CloudProviderConfig.NetworkStateController
150150

151151
// Create and populate VPC index upfront (no cloud provider required for this)
152-
var vpcIndex *kube.VPCIndex
153-
if vpcCfg.Enabled || vpcCfg.StaticCIDRsFile != "" {
154-
vpcIndex = kube.NewVPCIndex(
152+
var networkIndex *kube.NetworkIndex
153+
cloudProviderType := cfg.CloudProviderConfig.CloudProvider.Type
154+
if vpcCfg.Enabled || vpcCfg.StaticCIDRsFile != "" || cloudProviderType != "" {
155+
networkIndex = kube.NewNetworkIndex(
155156
log,
156-
kube.VPCConfig{
157-
RefreshInterval: vpcCfg.RefreshInterval,
158-
CacheSize: vpcCfg.CacheSize,
159-
UseAwsZoneId: vpcCfg.UseZoneID,
157+
kube.NetworkConfig{
158+
NetworkRefreshInterval: vpcCfg.NetworkRefreshInterval,
159+
PublicCIDRsRefreshInterval: vpcCfg.PublicCIDRsRefreshInterval,
160+
CacheSize: vpcCfg.CacheSize,
161+
UseAwsZoneId: vpcCfg.UseZoneID,
160162
},
161163
)
162-
if err := controllers.LoadStaticCIDRsFromFile(log, vpcCfg.StaticCIDRsFile, vpcIndex); err != nil {
164+
if err := controllers.LoadStaticCIDRsFromFile(log, vpcCfg.StaticCIDRsFile, networkIndex); err != nil {
163165
log.Warnf("failed to load static CIDRs: %v", err)
164166
}
165-
kubeClient.SetVPCIndex(vpcIndex)
167+
kubeClient.SetVPCIndex(networkIndex)
168+
169+
// Launch cloud public CIDR controller (fetches from public endpoints, no auth needed).
170+
if cloudProviderType != "" {
171+
errg.Go(func() error {
172+
return controllers.NewCloudPublicCIDRController(log, controllers.CloudPublicCIDRControllerConfig{
173+
CloudProviderType: cloudProviderType,
174+
RefreshInterval: vpcCfg.PublicCIDRsRefreshInterval,
175+
}, networkIndex).Run(ctx)
176+
})
177+
}
166178
}
167179

168180
// Initialize cloud provider for cloud-sync features
@@ -171,9 +183,9 @@ func (a *App) Run(ctx context.Context) error {
171183
if err == nil {
172184
log.Infof("cloud provider %s initialized successfully", provider.Type())
173185

174-
if vpcCfg.Enabled && vpcIndex != nil {
186+
if vpcCfg.Enabled && networkIndex != nil {
175187
errg.Go(func() error {
176-
return controllers.NewVPCStateController(log, vpcCfg, provider, vpcIndex).Run(ctx)
188+
return controllers.NewVPCStateController(log, vpcCfg, provider, networkIndex).Run(ctx)
177189
})
178190
}
179191

cmd/controller/config/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ type AgentConfig struct {
5656
}
5757

5858
type CloudProviderConfig struct {
59-
CloudProvider cloudtypes.ProviderConfig `json:"cloudProvider"`
60-
VPCStateController controllers.VPCStateControllerConfig `json:"vpcStateController"`
61-
VolumeStateController controllers.VolumeStateControllerConfig `json:"volumeStateController"`
59+
CloudProvider cloudtypes.ProviderConfig `json:"cloudProvider"`
60+
NetworkStateController controllers.NetworkStateControllerConfig `json:"vpcStateController"`
61+
VolumeStateController controllers.VolumeStateControllerConfig `json:"volumeStateController"`
6262
}
6363

6464
func (c CloudProviderConfig) IsAnyControllerEnabled() bool {
65-
return c.VPCStateController.Enabled || c.VolumeStateController.Enabled
65+
return c.NetworkStateController.Enabled || c.VolumeStateController.Enabled
6666
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/castai/kvisor/cmd/controller/kube"
8+
"github.com/castai/kvisor/pkg/cloudprovider/serviceranges"
9+
cloudtypes "github.com/castai/kvisor/pkg/cloudprovider/types"
10+
"github.com/castai/logging"
11+
)
12+
13+
type CloudPublicCIDRControllerConfig struct {
14+
CloudProviderType cloudtypes.Type
15+
RefreshInterval time.Duration
16+
}
17+
18+
func NewCloudPublicCIDRController(
19+
log *logging.Logger,
20+
cfg CloudPublicCIDRControllerConfig,
21+
vpcIndex *kube.NetworkIndex,
22+
) *CloudPublicCIDRController {
23+
if cfg.RefreshInterval == 0 {
24+
cfg.RefreshInterval = 24 * time.Hour
25+
}
26+
return &CloudPublicCIDRController{
27+
log: log.WithField("component", "cloud_public_cidr_controller"),
28+
cfg: cfg,
29+
vpcIndex: vpcIndex,
30+
}
31+
}
32+
33+
type CloudPublicCIDRController struct {
34+
log *logging.Logger
35+
cfg CloudPublicCIDRControllerConfig
36+
vpcIndex *kube.NetworkIndex
37+
}
38+
39+
func (c *CloudPublicCIDRController) Run(ctx context.Context) error {
40+
c.log.Infof("running cloud public CIDR fetch for provider: %s", c.cfg.CloudProviderType)
41+
defer c.log.Info("stopping cloud public CIDR fetch")
42+
43+
c.fetchWithRetries(ctx)
44+
45+
ticker := time.NewTicker(c.cfg.RefreshInterval)
46+
defer ticker.Stop()
47+
48+
for {
49+
select {
50+
case <-ctx.Done():
51+
return ctx.Err()
52+
case <-ticker.C:
53+
c.fetchOnce(ctx)
54+
}
55+
}
56+
}
57+
58+
func (c *CloudPublicCIDRController) fetchWithRetries(ctx context.Context) {
59+
backoff := 2 * time.Second
60+
maxRetries := 5
61+
62+
for i := 0; i < maxRetries; i++ {
63+
if c.fetchOnce(ctx) {
64+
return
65+
}
66+
67+
if i < maxRetries-1 {
68+
c.log.Warnf("cloud public CIDR fetch attempt %d/%d failed, retrying in %v", i+1, maxRetries, backoff)
69+
select {
70+
case <-ctx.Done():
71+
return
72+
case <-time.After(backoff):
73+
backoff *= 2
74+
if backoff > 30*time.Second {
75+
backoff = 30 * time.Second
76+
}
77+
}
78+
}
79+
}
80+
81+
c.log.Errorf("failed to fetch cloud public CIDRs after %d attempts", maxRetries)
82+
}
83+
84+
func (c *CloudPublicCIDRController) fetchOnce(ctx context.Context) bool {
85+
ranges, domain, err := serviceranges.FetchServiceIPRanges(ctx, c.log, c.cfg.CloudProviderType)
86+
if err != nil {
87+
c.log.Errorf("fetching cloud public service IP ranges: %v", err)
88+
return false
89+
}
90+
91+
if err := c.vpcIndex.UpdateCloudPublicCIDRs(domain, ranges); err != nil {
92+
c.log.Errorf("updating cloud public CIDRs in VPC index: %v", err)
93+
return false
94+
}
95+
96+
c.log.Infof("cloud public CIDRs updated successfully (%d region groups)", len(ranges))
97+
return true
98+
}

cmd/controller/controllers/vpc_state_controller.go

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@ import (
1313
"github.com/castai/logging"
1414
)
1515

16-
type VPCStateControllerConfig struct {
17-
Enabled bool `json:"enabled"`
18-
UseZoneID bool `json:"useZoneID"`
19-
NetworkName string `json:"networkName"`
20-
RefreshInterval time.Duration `json:"refreshInterval"`
21-
CacheSize uint32 `json:"cacheSize"`
22-
StaticCIDRsFile string `json:"staticCIDRsFile"` // Path to YAML file
16+
type NetworkStateControllerConfig struct {
17+
Enabled bool `json:"enabled"`
18+
UseZoneID bool `json:"useZoneID"`
19+
NetworkName string `json:"networkName"`
20+
NetworkRefreshInterval time.Duration `json:"networkRefreshInterval"`
21+
PublicCIDRsRefreshInterval time.Duration `json:"publicCIDRsRefreshInterval"`
22+
CacheSize uint32 `json:"cacheSize"`
23+
StaticCIDRsFile string `json:"staticCIDRsFile"` // Path to YAML file
2324
}
2425

2526
// StaticCIDRMapping represents a manual CIDR to zone/region/service mapping.
@@ -38,9 +39,9 @@ type cloudProvider interface {
3839
RefreshNetworkState(ctx context.Context, network string) error
3940
}
4041

41-
func NewVPCStateController(log *logging.Logger, cfg VPCStateControllerConfig, cloudProvider cloudProvider, vpcIndex *kube.VPCIndex) *VPCStateController {
42-
if cfg.RefreshInterval == 0 {
43-
cfg.RefreshInterval = 1 * time.Hour
42+
func NewVPCStateController(log *logging.Logger, cfg NetworkStateControllerConfig, cloudProvider cloudProvider, vpcIndex *kube.NetworkIndex) *VPCStateController {
43+
if cfg.NetworkRefreshInterval == 0 {
44+
cfg.NetworkRefreshInterval = 1 * time.Hour
4445
}
4546
return &VPCStateController{
4647
log: log.WithField("component", "vpc_state_controller"),
@@ -52,9 +53,9 @@ func NewVPCStateController(log *logging.Logger, cfg VPCStateControllerConfig, cl
5253

5354
type VPCStateController struct {
5455
log *logging.Logger
55-
cfg VPCStateControllerConfig
56+
cfg NetworkStateControllerConfig
5657
cloudProvider cloudProvider
57-
vpcIndex *kube.VPCIndex
58+
vpcIndex *kube.NetworkIndex
5859
}
5960

6061
func (c *VPCStateController) Run(ctx context.Context) error {
@@ -63,34 +64,29 @@ func (c *VPCStateController) Run(ctx context.Context) error {
6364

6465
if err := c.fetchInitialNetworkState(ctx, c.vpcIndex); err != nil {
6566
c.log.Errorf("failed to fetch initial VPC state: %v", err)
66-
return nil
67+
return err
6768
}
6869

6970
return c.runRefreshLoop(ctx, c.vpcIndex)
7071
}
7172

72-
func (c *VPCStateController) fetchInitialNetworkState(ctx context.Context, vpcIndex *kube.VPCIndex) error {
73+
func (c *VPCStateController) fetchInitialNetworkState(ctx context.Context, vpcIndex *kube.NetworkIndex) error {
7374
backoff := 2 * time.Second
7475
maxRetries := 5
7576

7677
for i := 0; i < maxRetries; i++ {
77-
err := c.cloudProvider.RefreshNetworkState(ctx, c.cfg.NetworkName)
78-
if err != nil {
79-
c.log.Errorf("VPC state refresh failed: %v", err)
80-
continue
81-
}
82-
state, err := c.cloudProvider.GetNetworkState(ctx)
83-
if err == nil {
84-
if err := vpcIndex.Update(state); err != nil {
85-
c.log.Errorf("failed to update VPC index: %v", err)
86-
} else {
87-
c.log.Info("initial VPC state loaded successfully")
88-
return nil
89-
}
78+
if err := c.cloudProvider.RefreshNetworkState(ctx, c.cfg.NetworkName); err != nil {
79+
c.log.Warnf("VPC state refresh failed (attempt %d/%d): %v", i+1, maxRetries, err)
80+
} else if state, err := c.cloudProvider.GetNetworkState(ctx); err != nil {
81+
c.log.Warnf("VPC state fetch failed (attempt %d/%d): %v", i+1, maxRetries, err)
82+
} else if err := vpcIndex.Update(state); err != nil {
83+
c.log.Errorf("failed to update VPC index: %v", err)
84+
} else {
85+
c.log.Info("initial VPC state loaded successfully")
86+
return nil
9087
}
9188

9289
if i < maxRetries-1 {
93-
c.log.Warnf("VPC state fetch attempt %d/%d failed: %v, retrying in %v", i+1, maxRetries, err, backoff)
9490
select {
9591
case <-ctx.Done():
9692
return ctx.Err()
@@ -103,15 +99,14 @@ func (c *VPCStateController) fetchInitialNetworkState(ctx context.Context, vpcIn
10399
}
104100
}
105101

106-
c.log.Errorf("failed to fetch initial VPC state after %d attempts", maxRetries)
107-
return nil
102+
return fmt.Errorf("failed to fetch initial VPC state after %d attempts", maxRetries)
108103
}
109104

110-
func (c *VPCStateController) runRefreshLoop(ctx context.Context, vpcIndex *kube.VPCIndex) error {
111-
ticker := time.NewTicker(c.cfg.RefreshInterval)
105+
func (c *VPCStateController) runRefreshLoop(ctx context.Context, vpcIndex *kube.NetworkIndex) error {
106+
ticker := time.NewTicker(c.cfg.NetworkRefreshInterval)
112107
defer ticker.Stop()
113108

114-
c.log.Infof("starting VPC state refresh (interval: %v)", c.cfg.RefreshInterval)
109+
c.log.Infof("starting VPC state refresh (interval: %v)", c.cfg.NetworkRefreshInterval)
115110

116111
for {
117112
select {
@@ -135,13 +130,13 @@ func (c *VPCStateController) runRefreshLoop(ctx context.Context, vpcIndex *kube.
135130
continue
136131
}
137132

138-
c.log.Debug("VPC state refreshed successfully")
133+
c.log.Infof("VPC state refreshed successfully")
139134
}
140135
}
141136
}
142137

143138
// LoadStaticCIDRsFromFile loads static CIDRs from a YAML file.
144-
func LoadStaticCIDRsFromFile(log *logging.Logger, path string, vpcIndex *kube.VPCIndex) error {
139+
func LoadStaticCIDRsFromFile(log *logging.Logger, path string, vpcIndex *kube.NetworkIndex) error {
145140
if path == "" {
146141
return nil
147142
}
@@ -164,7 +159,7 @@ func LoadStaticCIDRsFromFile(log *logging.Logger, path string, vpcIndex *kube.VP
164159
return vpcIndex.AddStaticCIDRs(entries)
165160
}
166161

167-
// convertStaticMappingsToEntries converts config mappings to VPCIndex entries.
162+
// convertStaticMappingsToEntries converts config mappings to NetworkIndex entries.
168163
func convertStaticMappingsToEntries(mappings []StaticCIDRMapping) []kube.StaticCIDREntry {
169164
entries := make([]kube.StaticCIDREntry, len(mappings))
170165
for i, m := range mappings {

cmd/controller/controllers/vpc_state_controller_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ func TestVPCStateController(t *testing.T) {
2222

2323
provider := noop.NewProvider()
2424

25-
cfg := VPCStateControllerConfig{
26-
NetworkName: "test-network",
27-
RefreshInterval: 1 * time.Hour,
25+
cfg := NetworkStateControllerConfig{
26+
NetworkName: "test-network",
27+
NetworkRefreshInterval: 1 * time.Hour,
2828
}
2929

30-
vpcIndex := kube.NewVPCIndex(log, kube.VPCConfig{RefreshInterval: time.Hour, CacheSize: 1000})
30+
vpcIndex := kube.NewNetworkIndex(log, kube.NetworkConfig{NetworkRefreshInterval: time.Hour, CacheSize: 1000})
3131
ctrl := NewVPCStateController(log, cfg, provider, vpcIndex)
3232

3333
err := ctrl.Run(ctx)
@@ -42,14 +42,14 @@ func TestVPCStateController(t *testing.T) {
4242

4343
provider := noop.NewProvider()
4444

45-
cfg := VPCStateControllerConfig{
46-
NetworkName: "test-network",
47-
RefreshInterval: 0,
45+
cfg := NetworkStateControllerConfig{
46+
NetworkName: "test-network",
47+
NetworkRefreshInterval: 0,
4848
}
4949

50-
vpcIndex := kube.NewVPCIndex(log, kube.VPCConfig{RefreshInterval: time.Hour, CacheSize: 1000})
50+
vpcIndex := kube.NewNetworkIndex(log, kube.NetworkConfig{NetworkRefreshInterval: time.Hour, CacheSize: 1000})
5151
ctrl := NewVPCStateController(log, cfg, provider, vpcIndex)
52-
r.Equal(1*time.Hour, ctrl.cfg.RefreshInterval)
52+
r.Equal(1*time.Hour, ctrl.cfg.NetworkRefreshInterval)
5353
})
5454

5555
t.Run("uses configured refresh interval", func(t *testing.T) {
@@ -58,13 +58,13 @@ func TestVPCStateController(t *testing.T) {
5858
provider := noop.NewProvider()
5959

6060
customInterval := 30 * time.Minute
61-
cfg := VPCStateControllerConfig{
62-
NetworkName: "test-network",
63-
RefreshInterval: customInterval,
61+
cfg := NetworkStateControllerConfig{
62+
NetworkName: "test-network",
63+
NetworkRefreshInterval: customInterval,
6464
}
6565

66-
vpcIndex := kube.NewVPCIndex(log, kube.VPCConfig{RefreshInterval: time.Hour, CacheSize: 1000})
66+
vpcIndex := kube.NewNetworkIndex(log, kube.NetworkConfig{NetworkRefreshInterval: time.Hour, CacheSize: 1000})
6767
ctrl := NewVPCStateController(log, cfg, provider, vpcIndex)
68-
r.Equal(customInterval, ctrl.cfg.RefreshInterval)
68+
r.Equal(customInterval, ctrl.cfg.NetworkRefreshInterval)
6969
})
7070
}

cmd/controller/kube/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type Client struct {
6767
kvisorControllerPodSpec *corev1.PodSpec
6868

6969
index *Index
70-
vpcIndex *VPCIndex
70+
vpcIndex *NetworkIndex
7171
volumeIndex *VolumeIndex
7272

7373
clusterInfo *ClusterInfo
@@ -106,12 +106,12 @@ func NewClient(
106106
}
107107

108108
// SetVPCIndex sets the VPC index for enriching external IPs with VPC metadata.
109-
func (c *Client) SetVPCIndex(vpcIndex *VPCIndex) {
109+
func (c *Client) SetVPCIndex(vpcIndex *NetworkIndex) {
110110
c.vpcIndex = vpcIndex
111111
}
112112

113113
// GetVPCIndex returns the VPC index if available.
114-
func (c *Client) GetVPCIndex() *VPCIndex {
114+
func (c *Client) GetVPCIndex() *NetworkIndex {
115115
return c.vpcIndex
116116
}
117117

0 commit comments

Comments
 (0)