Skip to content

Commit 2d3b69d

Browse files
authored
Merge pull request #18 from numberly/rate-limit-vault-api
feat: implement a new rate limit on the vault API to avoid 429 error
2 parents 5eb599f + a537831 commit 2d3b69d

File tree

4 files changed

+52
-23
lines changed

4 files changed

+52
-23
lines changed

pkg/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Config struct {
2626
SyncTTLSecond int `yaml:"syncTTLSecond" envconfig:"sync_ttl_second"`
2727
InjectorLabel string `yaml:"injectorLabel" envconfig:"injector_label"`
2828
DefaultEngine string `yaml:"defaultEngine" envconfig:"default_engine"`
29+
VaultRateLimit int `yaml:"vaultRateLimit" envconfig:"vault_rate_limit"`
2930
}
3031

3132
func NewConfig(configFile string) (*Config, error) {
@@ -45,6 +46,7 @@ func NewConfig(configFile string) (*Config, error) {
4546
SyncTTLSecond: 300,
4647
InjectorLabel: "vault-db-injector",
4748
DefaultEngine: "databases",
49+
VaultRateLimit: 50,
4850
}
4951
if configFile != "" {
5052
data, err := os.ReadFile(configFile)

pkg/k8smutator/k8smutator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func handlePodConfiguration(ctx context.Context, cfg *config.Config, dbConfs *[]
9191
logger.Errorf("Their is an issue with the db Configuration")
9292
return nil, "db-role not found", nil, err
9393
}
94-
vaultConn := vault.NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, vaultDbPath, dbConf.Role, tok)
94+
vaultConn := vault.NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, vaultDbPath, dbConf.Role, tok, cfg.VaultRateLimit)
9595
if err := vaultConn.Login(ctx); err != nil {
9696
return nil, dbConf.Role, nil, errors.Newf("cannot authenticate vault role: %s", err.Error())
9797
}

pkg/vault/handle_token.go

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/numberly/vault-db-injector/pkg/config"
1111
"github.com/numberly/vault-db-injector/pkg/k8s"
1212
promInjector "github.com/numberly/vault-db-injector/pkg/prometheus"
13+
"golang.org/x/time/rate"
1314
)
1415

1516
type KeyInformation struct {
@@ -119,24 +120,35 @@ func (c *Connector) ListKeyInformations(ctx context.Context, path, prefix string
119120
var wg sync.WaitGroup
120121
keyInformationsChan := make(chan *KeyInformation, len(keys))
121122

123+
// Create a rate limiter
124+
rateLimit := rate.Limit(c.VaultRateLimit) // requests per second
125+
limiter := rate.NewLimiter(rateLimit, 1)
126+
122127
for _, k := range keys {
123128
wg.Add(1)
124129
go func(k interface{}) {
125130
defer wg.Done()
131+
132+
// Wait for the rate limiter
133+
if err := limiter.Wait(ctx); err != nil {
134+
c.Log.Errorf("Rate limiter error: %v", err)
135+
return
136+
}
137+
126138
podName := strings.TrimSuffix(k.(string), "/")
127139

128140
// Utiliser le préfixe pour lire les données
129141
dataPath := fmt.Sprintf("%s/data/%s/%s", path, prefix, podName)
130142
podSecret, err := c.client.Logical().ReadWithContext(ctx, dataPath)
131143
if err != nil {
132-
c.Log.Errorf("Error while trying to recover data informations for : %s: %v", podName, err)
144+
c.Log.Errorf("Error while trying to recover data informations for: %s: %v", podName, err)
133145
return
134146
}
135147

136148
if podSecret == nil || podSecret.Data == nil || podSecret.Data["data"] == nil {
137149
status, err := c.DeleteData(ctx, podName, path, podName, "", prefix)
138150
if err != nil {
139-
c.Log.Errorf("Data for %s can't be deleted : %s with error : %s", podName, status, err.Error())
151+
c.Log.Errorf("Data for %s can't be deleted: %s with error: %s", podName, status, err.Error())
140152
}
141153
return
142154
}
@@ -189,61 +201,73 @@ func (c *Connector) HandleTokens(ctx context.Context, cfg *config.Config, keysIn
189201
return false
190202
}
191203

192-
// Créer une map pour une recherche rapide des podsInformations
204+
// Create a map for quick lookup of pod information
193205
podInfoMap := make(map[string]k8s.PodInformations)
194206
for _, pi := range podsInformations {
195207
for _, uuid := range pi.PodNameUUIDs {
196208
podInfoMap[uuid] = pi
197209
}
198-
199210
}
200211

201212
var KubePolicies []string
202213
KubePolicies = append(KubePolicies, c.authRole)
203214
_, err = c.CreateOrphanToken(ctx, "1h", KubePolicies)
204215
if err != nil {
205-
c.Log.Errorf("Can't create orphan ticket : %v", err)
216+
c.Log.Errorf("Can't create orphan ticket: %v", err)
206217
c.Log.Error("Token renew has been cancelled")
207218
return false
208219
}
220+
221+
// Create a rate limiter
222+
rateLimit := rate.Limit(cfg.VaultRateLimit) // requests per second
223+
limiter := rate.NewLimiter(rateLimit, 1)
224+
209225
var wg sync.WaitGroup
210226
var isOk bool = true
227+
211228
for _, ki := range keysInformations {
212229
wg.Add(1)
213230
go func(ki *KeyInformation) {
214231
defer wg.Done()
215232

233+
// Wait for the rate limiter
234+
if err := limiter.Wait(ctx); err != nil {
235+
c.Log.Errorf("Rate limiter error: %v", err)
236+
isOk = false
237+
return
238+
}
239+
216240
if _, found := podInfoMap[ki.PodNameUID]; found {
217241
err := c.RenewToken(ctx, ki.TokenId, ki.PodNameUID, ki.Namespace, SyncTTLSecond)
218242
if err != nil {
219-
c.Log.Errorf("Can't renew Token with pod UUID : %s", ki.PodNameUID)
243+
c.Log.Errorf("Can't renew Token with pod UUID: %s", ki.PodNameUID)
220244
isOk = false
221245
return
222246
}
223-
err = c.RenewLease(ctx, ki.LeaseId, 86400*5, ki.PodNameUID, ki.Namespace) // Renew for 1week
247+
err = c.RenewLease(ctx, ki.LeaseId, 86400*5, ki.PodNameUID, ki.Namespace) // Renew for 1 week
224248
if err != nil {
225-
c.Log.Errorf("Can't renew Lease with pod UUID : %s", ki.PodNameUID)
249+
c.Log.Errorf("Can't renew Lease with pod UUID: %s", ki.PodNameUID)
226250
isOk = false
227251
return
228252
}
229253
} else {
230254
leaseTooYoung, err := c.isLeaseTooYoung(ctx, ki.LeaseId)
231255
if err != nil {
232-
c.Log.Debug("error while trying to retrieve lease age, lease will be cleaned")
256+
c.Log.Debug("Error while trying to retrieve lease age, lease will be cleaned")
233257
}
234258
if leaseTooYoung {
235-
c.Log.Infof("This lease : %s is too young to be cleaned up.", ki.LeaseId)
259+
c.Log.Infof("This lease: %s is too young to be cleaned up.", ki.LeaseId)
236260
return
237261
}
238262
err = c.RevokeOrphanToken(ctx, ki.TokenId, ki.PodNameUID, ki.Namespace)
239263
if err != nil {
240-
c.Log.Errorf("Can't revok Token with UUID : %s", ki.PodNameUID)
264+
c.Log.Errorf("Can't revoke Token with UUID: %s", ki.PodNameUID)
241265
isOk = false
242266
return
243267
}
244268
status, err := c.DeleteData(ctx, ki.PodNameUID, secretName, ki.PodNameUID, ki.Namespace, prefix)
245269
if err != nil {
246-
c.Log.Errorf("Data for %s can't be deleted : %s with error : %s", ki.PodNameUID, status, err.Error())
270+
c.Log.Errorf("Data for %s can't be deleted: %s with error: %s", ki.PodNameUID, status, err.Error())
247271
isOk = false
248272
return
249273
}
@@ -252,10 +276,11 @@ func (c *Connector) HandleTokens(ctx context.Context, cfg *config.Config, keysIn
252276
promInjector.RenewLeaseCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace)
253277
promInjector.RenewTokenCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace)
254278
promInjector.DataDeletedCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace)
255-
c.Log.Infof("Token has been revoked and data deleted : %s", status)
279+
c.Log.Infof("Token has been revoked and data deleted: %s", status)
256280
}
257281
}(ki)
258282
}
283+
259284
wg.Wait()
260285
c.RevokeSelfToken(ctx, c.client.Token(), "", "")
261286
c.SetToken(c.K8sSaVaultToken)

pkg/vault/vault.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Connector struct {
2828
client *vault.Client
2929
RenewalInterval time.Duration
3030
Log logger.Logger
31+
VaultRateLimit int
3132
}
3233

3334
func (c *Connector) GetToken() string {
@@ -46,15 +47,16 @@ type DbCreds struct {
4647
DbTokenId string
4748
}
4849

49-
func NewConnector(address string, authPath string, authRole string, dbMountPath string, dbRole string, token string) *Connector {
50+
func NewConnector(address string, authPath string, authRole string, dbMountPath string, dbRole string, token string, VaultRateLimit int) *Connector {
5051
return &Connector{
51-
address: address,
52-
authPath: authPath,
53-
dbRole: dbRole,
54-
dbMountPath: dbMountPath,
55-
k8sSaToken: token,
56-
authRole: authRole,
57-
Log: logger.GetLogger(),
52+
address: address,
53+
authPath: authPath,
54+
dbRole: dbRole,
55+
dbMountPath: dbMountPath,
56+
k8sSaToken: token,
57+
authRole: authRole,
58+
Log: logger.GetLogger(),
59+
VaultRateLimit: VaultRateLimit,
5860
}
5961
}
6062

@@ -67,7 +69,7 @@ func ConnectToVault(ctx context.Context, cfg *config.Config) (*Connector, error)
6769
return nil, errors.Newf("cannot get ServiceAccount token: %s", err.Error())
6870
}
6971
// Configure vault connection using serviceAccount token
70-
vaultConn := NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, "random", "random", tok)
72+
vaultConn := NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, "random", "random", tok, cfg.VaultRateLimit)
7173
if err := vaultConn.Login(ctx); // Assuming Login is modified to accept a context
7274
err != nil {
7375
promInjector.ConnectVaultError.WithLabelValues().Inc()

0 commit comments

Comments
 (0)