-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathcloud_public_cidr_controller.go
More file actions
98 lines (82 loc) · 2.38 KB
/
cloud_public_cidr_controller.go
File metadata and controls
98 lines (82 loc) · 2.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package controllers
import (
"context"
"time"
"github.com/castai/kvisor/cmd/controller/kube"
"github.com/castai/kvisor/pkg/cloudprovider/serviceranges"
cloudtypes "github.com/castai/kvisor/pkg/cloudprovider/types"
"github.com/castai/logging"
)
type CloudPublicCIDRControllerConfig struct {
CloudProviderType cloudtypes.Type
RefreshInterval time.Duration
}
func NewCloudPublicCIDRController(
log *logging.Logger,
cfg CloudPublicCIDRControllerConfig,
vpcIndex *kube.NetworkIndex,
) *CloudPublicCIDRController {
if cfg.RefreshInterval == 0 {
cfg.RefreshInterval = 24 * time.Hour
}
return &CloudPublicCIDRController{
log: log.WithField("component", "cloud_public_cidr_controller"),
cfg: cfg,
vpcIndex: vpcIndex,
}
}
type CloudPublicCIDRController struct {
log *logging.Logger
cfg CloudPublicCIDRControllerConfig
vpcIndex *kube.NetworkIndex
}
func (c *CloudPublicCIDRController) Run(ctx context.Context) error {
c.log.Infof("running cloud public CIDR fetch for provider: %s", c.cfg.CloudProviderType)
defer c.log.Info("stopping cloud public CIDR fetch")
c.fetchWithRetries(ctx)
ticker := time.NewTicker(c.cfg.RefreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
c.fetchOnce(ctx)
}
}
}
func (c *CloudPublicCIDRController) fetchWithRetries(ctx context.Context) {
backoff := 2 * time.Second
maxRetries := 5
for i := 0; i < maxRetries; i++ {
if c.fetchOnce(ctx) {
return
}
if i < maxRetries-1 {
c.log.Warnf("cloud public CIDR fetch attempt %d/%d failed, retrying in %v", i+1, maxRetries, backoff)
select {
case <-ctx.Done():
return
case <-time.After(backoff):
backoff *= 2
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
}
}
}
c.log.Errorf("failed to fetch cloud public CIDRs after %d attempts", maxRetries)
}
func (c *CloudPublicCIDRController) fetchOnce(ctx context.Context) bool {
ranges, domain, err := serviceranges.FetchServiceIPRanges(ctx, c.log, c.cfg.CloudProviderType)
if err != nil {
c.log.Errorf("fetching cloud public service IP ranges: %v", err)
return false
}
if err := c.vpcIndex.UpdateCloudPublicCIDRs(domain, ranges); err != nil {
c.log.Errorf("updating cloud public CIDRs in VPC index: %v", err)
return false
}
c.log.Infof("cloud public CIDRs updated successfully (%d region groups)", len(ranges))
return true
}