Skip to content

Commit f0801ee

Browse files
author
Samu
authored
Allow extending cloud provider with more controllers (#617)
1 parent 27a628b commit f0801ee

File tree

20 files changed

+384
-399
lines changed

20 files changed

+384
-399
lines changed

cmd/controller/app/app.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/castai/kvisor/cmd/controller/kube"
3434
"github.com/castai/kvisor/pkg/blobscache"
3535
"github.com/castai/kvisor/pkg/castai"
36+
"github.com/castai/kvisor/pkg/cloudprovider"
3637
"github.com/castai/kvisor/pkg/logging"
3738
)
3839

@@ -108,11 +109,19 @@ func (a *App) Run(ctx context.Context) error {
108109
})
109110

110111
// Initialize cloud provider if enabled
111-
if cfg.CloudProvider.Enabled {
112-
errg.Go(func() error {
113-
vpcMetadataCtrl := controllers.NewVPCMetadataController(log, cfg.CloudProvider, kubeClient)
114-
return vpcMetadataCtrl.Run(ctx)
115-
})
112+
if cfg.CloudProviderConfig.CloudProvider.Type != "" {
113+
provider, err := cloudprovider.NewProvider(ctx, cfg.CloudProviderConfig.CloudProvider)
114+
if err != nil {
115+
return fmt.Errorf("failed to initialize cloud provider: %w", err)
116+
}
117+
log.Infof("cloud provider %s initialized successfully", provider.Type())
118+
119+
if cfg.CloudProviderConfig.VPCStateController.Enabled {
120+
errg.Go(func() error {
121+
return controllers.NewVPCStateController(log,
122+
cfg.CloudProviderConfig.VPCStateController, provider, kubeClient).Run(ctx)
123+
})
124+
}
116125
}
117126

118127
// CAST AI specific logic.

cmd/controller/config/config.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/castai/kvisor/cmd/controller/controllers/kubebench"
99
"github.com/castai/kvisor/cmd/controller/controllers/kubelinter"
1010
"github.com/castai/kvisor/pkg/castai"
11+
cloudtypes "github.com/castai/kvisor/pkg/cloudprovider/types"
1112
)
1213

1314
type Config struct {
@@ -32,14 +33,19 @@ type Config struct {
3233
MetricsHTTPListenPort int `json:"metricsHTTPListenPort"`
3334
KubeServerListenPort int `validate:"required" json:"kubeServerListenPort"`
3435

35-
CastaiController controllers.CastaiConfig `json:"castaiController"`
36-
CastaiEnv castai.Config `json:"castaiEnv"`
37-
ImageScan imagescan.Config `json:"imageScan"`
38-
Linter kubelinter.Config `json:"linter"`
39-
KubeBench kubebench.Config `json:"kubeBench"`
40-
JobsCleanup controllers.JobsCleanupConfig `json:"jobsCleanup"`
41-
AgentConfig AgentConfig `json:"agentConfig"`
42-
CloudProvider controllers.VPCMetadataConfig `json:"cloudProvider"`
36+
CastaiController controllers.CastaiConfig `json:"castaiController"`
37+
CastaiEnv castai.Config `json:"castaiEnv"`
38+
ImageScan imagescan.Config `json:"imageScan"`
39+
Linter kubelinter.Config `json:"linter"`
40+
KubeBench kubebench.Config `json:"kubeBench"`
41+
JobsCleanup controllers.JobsCleanupConfig `json:"jobsCleanup"`
42+
AgentConfig AgentConfig `json:"agentConfig"`
43+
CloudProviderConfig CloudProviderConfig `json:"cloudProviderConfig"`
44+
}
45+
46+
type CloudProviderConfig struct {
47+
CloudProvider cloudtypes.ProviderConfig `json:"cloudProvider"`
48+
VPCStateController controllers.VPCStateControllerConfig `json:"vpcStateController"`
4349
}
4450

4551
type AgentConfig struct {

cmd/controller/controllers/vpc_metadata_controller.go

Lines changed: 0 additions & 144 deletions
This file was deleted.
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/castai/kvisor/cmd/controller/kube"
8+
cloudtypes "github.com/castai/kvisor/pkg/cloudprovider/types"
9+
"github.com/castai/kvisor/pkg/logging"
10+
)
11+
12+
type VPCStateControllerConfig struct {
13+
Enabled bool `json:"enabled"`
14+
NetworkName string `json:"networkName"`
15+
RefreshInterval time.Duration `json:"refreshInterval"`
16+
CacheSize uint32 `json:"CacheSize"`
17+
}
18+
19+
type cloudProvider interface {
20+
Type() cloudtypes.Type
21+
GetNetworkState(ctx context.Context) (*cloudtypes.NetworkState, error)
22+
RefreshNetworkState(ctx context.Context, network string) error
23+
}
24+
25+
func NewVPCStateController(log *logging.Logger, cfg VPCStateControllerConfig, cloudProvider cloudProvider, kubeClient *kube.Client) *VPCStateController {
26+
if cfg.RefreshInterval == 0 {
27+
cfg.RefreshInterval = 1 * time.Hour
28+
}
29+
return &VPCStateController{
30+
log: log.WithField("component", "vpc_state_controller"),
31+
cfg: cfg,
32+
cloudProvider: cloudProvider,
33+
kubeClient: kubeClient,
34+
}
35+
}
36+
37+
type VPCStateController struct {
38+
log *logging.Logger
39+
cfg VPCStateControllerConfig
40+
cloudProvider cloudProvider
41+
kubeClient *kube.Client
42+
}
43+
44+
func (c *VPCStateController) Run(ctx context.Context) error {
45+
c.log.Infof("running for cloud provider: %s", c.cloudProvider.Type())
46+
defer c.log.Infof("stopping")
47+
48+
vpcIndex := kube.NewVPCIndex(c.log, c.cfg.RefreshInterval, c.cfg.CacheSize)
49+
50+
if err := c.fetchInitialNetworkState(ctx, vpcIndex); err != nil {
51+
c.log.Errorf("failed to fetch initial VPC state: %v", err)
52+
return nil
53+
}
54+
55+
c.kubeClient.SetVPCIndex(vpcIndex)
56+
57+
return c.runRefreshLoop(ctx, vpcIndex)
58+
}
59+
60+
func (c *VPCStateController) fetchInitialNetworkState(ctx context.Context, vpcIndex *kube.VPCIndex) error {
61+
backoff := 2 * time.Second
62+
maxRetries := 5
63+
64+
for i := 0; i < maxRetries; i++ {
65+
err := c.cloudProvider.RefreshNetworkState(ctx, c.cfg.NetworkName)
66+
if err != nil {
67+
c.log.Errorf("VPC state refresh failed: %v", err)
68+
continue
69+
}
70+
state, err := c.cloudProvider.GetNetworkState(ctx)
71+
if err == nil {
72+
if err := vpcIndex.Update(state); err != nil {
73+
c.log.Errorf("failed to update VPC index: %v", err)
74+
} else {
75+
c.log.Info("initial VPC state loaded successfully")
76+
return nil
77+
}
78+
}
79+
80+
if i < maxRetries-1 {
81+
c.log.Warnf("VPC state fetch attempt %d/%d failed: %v, retrying in %v", i+1, maxRetries, err, backoff)
82+
select {
83+
case <-ctx.Done():
84+
return ctx.Err()
85+
case <-time.After(backoff):
86+
backoff *= 2
87+
if backoff > 30*time.Second {
88+
backoff = 30 * time.Second
89+
}
90+
}
91+
}
92+
}
93+
94+
c.log.Errorf("failed to fetch initial VPC state after %d attempts", maxRetries)
95+
return nil
96+
}
97+
98+
func (c *VPCStateController) runRefreshLoop(ctx context.Context, vpcIndex *kube.VPCIndex) error {
99+
ticker := time.NewTicker(c.cfg.RefreshInterval)
100+
defer ticker.Stop()
101+
102+
c.log.Infof("starting VPC state refresh (interval: %v)", c.cfg.RefreshInterval)
103+
104+
for {
105+
select {
106+
case <-ctx.Done():
107+
return ctx.Err()
108+
case <-ticker.C:
109+
err := c.cloudProvider.RefreshNetworkState(ctx, c.cfg.NetworkName)
110+
if err != nil {
111+
c.log.Errorf("VPC state refresh failed: %v", err)
112+
continue
113+
}
114+
115+
state, err := c.cloudProvider.GetNetworkState(ctx)
116+
if err != nil {
117+
c.log.Errorf("VPC state loading failed: %v", err)
118+
continue
119+
}
120+
121+
if err := vpcIndex.Update(state); err != nil {
122+
c.log.Errorf("failed to update VPC index: %v", err)
123+
continue
124+
}
125+
126+
c.log.Debug("VPC state refreshed successfully")
127+
}
128+
}
129+
}

0 commit comments

Comments
 (0)