Skip to content

Commit 6f14889

Browse files
committed
feat: Handle Secret Lookup Rate Limit errors
1 parent 804c52f commit 6f14889

File tree

11 files changed

+134
-29
lines changed

11 files changed

+134
-29
lines changed

api/context/context.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ import (
55
"fmt"
66
"strings"
77

8-
"github.com/flanksource/canary-checker/api/external"
9-
v1 "github.com/flanksource/canary-checker/api/v1"
10-
"github.com/flanksource/canary-checker/pkg"
118
"github.com/flanksource/duty/connection"
129
dutyCtx "github.com/flanksource/duty/context"
1310
"github.com/flanksource/duty/models"
1411
"github.com/flanksource/duty/types"
1512
"github.com/flanksource/gomplate/v3"
13+
14+
"github.com/flanksource/canary-checker/api/external"
15+
v1 "github.com/flanksource/canary-checker/api/v1"
16+
"github.com/flanksource/canary-checker/pkg"
1617
)
1718

1819
var DefaultContext dutyCtx.Context

checks/kubernetes_resource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func (c *KubernetesResourceChecker) Check(ctx context.Context, check v1.Kubernet
200200

201201
ctx = _ctx.(context.Context)
202202
checkCtx := context.New(ctx.Context, virtualCanary)
203-
res, err := Exec(checkCtx)
203+
res, err := RunChecksNoPersistence(checkCtx)
204204
if err != nil {
205205
return fmt.Errorf("error executing check: %w", err)
206206
} else {

checks/runchecks.go

Lines changed: 90 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,37 @@ import (
99
"time"
1010

1111
"github.com/flanksource/artifacts"
12+
dutyCtx "github.com/flanksource/duty/context"
13+
"github.com/flanksource/duty/models"
14+
"github.com/google/uuid"
15+
gocache "github.com/patrickmn/go-cache"
16+
"golang.org/x/time/rate"
17+
1218
"github.com/flanksource/canary-checker/api/context"
1319
"github.com/flanksource/canary-checker/api/external"
1420
v1 "github.com/flanksource/canary-checker/api/v1"
1521
"github.com/flanksource/canary-checker/pkg"
1622
"github.com/flanksource/canary-checker/pkg/utils"
17-
"github.com/flanksource/duty/models"
18-
"github.com/google/uuid"
19-
gocache "github.com/patrickmn/go-cache"
23+
)
24+
25+
const (
26+
defaultSecretLookupFailureThreshold = 50
27+
defaultSecretLookupFailureWindow = 30 * time.Minute
2028
)
2129

2230
var checksCache = gocache.New(5*time.Minute, 5*time.Minute)
2331

32+
var secretLookupRateLimiter = rate.NewLimiter(
33+
rate.Limit(float64(defaultSecretLookupFailureThreshold)/defaultSecretLookupFailureWindow.Seconds()),
34+
defaultSecretLookupFailureThreshold,
35+
)
36+
2437
var DisabledChecks []string
2538

39+
type RunChecksMeta struct {
40+
SecretLookupRateLimitSkipped int
41+
}
42+
2643
func getDisabledChecks(ctx *context.Context) (map[string]struct{}, error) {
2744
if val, ok := checksCache.Get("disabledChecks"); ok {
2845
return val.(map[string]struct{}), nil
@@ -150,8 +167,13 @@ func sortChecksByDependency(checks []external.Check) ([]external.Check, error) {
150167
return append(unnamedChecks, sorted...), nil
151168
}
152169

153-
// Exec runs the actions specified and returns the results, without saving artifacts
154-
func Exec(ctx *context.Context) ([]*pkg.CheckResult, error) {
170+
// RunChecksNoPersistence executes checks and returns transformed results without persistence side effects.
171+
//
172+
// Unlike RunChecks, it does not perform canary deletion checks, dependency ordering,
173+
// artifact saving, or final result post-processing (e.g. resultMode handling).
174+
// It exports only check spec metrics (not per-result emitted metrics) and is intended
175+
// for embedded execution paths (e.g. topology lookups and kubernetesResource sub-checks).
176+
func RunChecksNoPersistence(ctx *context.Context) ([]*pkg.CheckResult, error) {
155177
var results []*pkg.CheckResult
156178
disabledChecks, err := getDisabledChecks(ctx)
157179
if err != nil {
@@ -174,9 +196,11 @@ func Exec(ctx *context.Context) ([]*pkg.CheckResult, error) {
174196

175197
result := c.Run(ctx)
176198
transformedResults := TransformResults(ctx, result)
177-
results = append(results, transformedResults...)
178-
ExportCheckMetrics(ctx, transformedResults, false)
199+
_, filteredResults := filterSecretLookupRateLimitedResults(ctx, transformedResults)
200+
results = append(results, filteredResults...)
201+
ExportCheckMetrics(ctx, filteredResults, false)
179202
}
203+
180204
return results, nil
181205
}
182206

@@ -189,23 +213,24 @@ func hasDependencies(checks []external.Check) bool {
189213
return false
190214
}
191215

192-
func RunChecks(ctx *context.Context) ([]*pkg.CheckResult, error) {
216+
func RunChecks(ctx *context.Context) ([]*pkg.CheckResult, RunChecksMeta, error) {
193217
var results []*pkg.CheckResult
218+
meta := RunChecksMeta{}
194219
disabledChecks, err := getDisabledChecks(ctx)
195220
if err != nil {
196-
return nil, fmt.Errorf("error getting disabled checks: %v", err)
221+
return nil, meta, fmt.Errorf("error getting disabled checks: %v", err)
197222
}
198223

199224
// Check if canary is not marked deleted in DB
200225
if ctx.DB() != nil && ctx.Canary.GetPersistedID() != "" {
201226
var deletedAt sql.NullTime
202227
err := ctx.DB().Table("canaries").Select("deleted_at").Where("id = ? and deleted_at < now()", ctx.Canary.GetPersistedID()).Scan(&deletedAt).Error
203228
if err != nil {
204-
return nil, fmt.Errorf("error getting canary: %v", err)
229+
return nil, meta, fmt.Errorf("error getting canary: %v", err)
205230
}
206231

207232
if deletedAt.Valid {
208-
return nil, nil
233+
return nil, meta, nil
209234
}
210235
}
211236

@@ -215,7 +240,7 @@ func RunChecks(ctx *context.Context) ([]*pkg.CheckResult, error) {
215240
if hasDependencies(checks) {
216241
sortedChecks, err := sortChecksByDependency(checks)
217242
if err != nil {
218-
return nil, fmt.Errorf("failed to sort checks: %v", err)
243+
return nil, meta, fmt.Errorf("failed to sort checks: %v", err)
219244
}
220245

221246
for _, check := range sortedChecks {
@@ -241,11 +266,13 @@ func RunChecks(ctx *context.Context) ([]*pkg.CheckResult, error) {
241266

242267
result := singleRunner.Check(ctx, check)
243268
transformedResults := TransformResults(ctx, result)
244-
results = append(results, transformedResults...)
245-
ExportCheckMetrics(ctx, transformedResults, true)
269+
skippedCount, filteredResults := filterSecretLookupRateLimitedResults(ctx, transformedResults)
270+
meta.SecretLookupRateLimitSkipped += skippedCount
271+
results = append(results, filteredResults...)
272+
ExportCheckMetrics(ctx, filteredResults, true)
246273

247-
if check.GetName() != "" && len(transformedResults) > 0 && transformedResults[0].Pass {
248-
ctx.SetOutput(check.GetName(), transformedResults[0])
274+
if check.GetName() != "" && len(filteredResults) > 0 && filteredResults[0].Pass {
275+
ctx.SetOutput(check.GetName(), filteredResults[0])
249276
}
250277
}
251278
} else {
@@ -259,16 +286,18 @@ func RunChecks(ctx *context.Context) ([]*pkg.CheckResult, error) {
259286

260287
result := c.Run(ctx)
261288
transformedResults := TransformResults(ctx, result)
262-
results = append(results, transformedResults...)
263-
ExportCheckMetrics(ctx, transformedResults, true)
289+
skippedCount, filteredResults := filterSecretLookupRateLimitedResults(ctx, transformedResults)
290+
meta.SecretLookupRateLimitSkipped += skippedCount
291+
results = append(results, filteredResults...)
292+
ExportCheckMetrics(ctx, filteredResults, true)
264293
}
265294
}
266295

267296
if err := saveArtifacts(ctx, results); err != nil {
268297
ctx.Errorf("error saving artifacts: %v", err)
269298
}
270299

271-
return ProcessResults(ctx, results), nil
300+
return ProcessResults(ctx, results), meta, nil
272301
}
273302

274303
func saveArtifacts(ctx *context.Context, results pkg.Results) error {
@@ -356,6 +385,48 @@ func TransformResults(ctx *context.Context, in []*pkg.CheckResult) (out []*pkg.C
356385
return out
357386
}
358387

388+
func isSecretLookupRateLimitResult(result *pkg.CheckResult) bool {
389+
if result == nil {
390+
return false
391+
}
392+
if dutyCtx.IsSecretLookupRateLimited(result.ErrorObject) {
393+
return true
394+
}
395+
return strings.Contains(strings.ToLower(result.Error), strings.ToLower(dutyCtx.ErrSecretLookupRateLimited.Error()))
396+
}
397+
398+
func filterSecretLookupRateLimitedResults(ctx *context.Context, in []*pkg.CheckResult) (int, []*pkg.CheckResult) {
399+
if len(in) == 0 {
400+
return 0, in
401+
}
402+
403+
skipped := 0
404+
out := make([]*pkg.CheckResult, 0, len(in))
405+
406+
for _, result := range in {
407+
if !isSecretLookupRateLimitResult(result) {
408+
out = append(out, result)
409+
continue
410+
}
411+
412+
if secretLookupRateLimiter.Allow() {
413+
skipped++
414+
ctx.Warnf("skipping check result due to secret lookup rate limiting for check=%s (threshold=%d window=%s)", result.GetName(), defaultSecretLookupFailureThreshold, defaultSecretLookupFailureWindow.Round(time.Second))
415+
continue
416+
}
417+
418+
ctx.Warnf("secret lookup rate limit threshold exceeded for check=%s (threshold=%d window=%s), recording as failure", result.GetName(), defaultSecretLookupFailureThreshold, defaultSecretLookupFailureWindow.Round(time.Second))
419+
result.Invalid = false
420+
result.Pass = false
421+
if result.Error == "" {
422+
result.Error = "secret lookup repeatedly rate limited"
423+
}
424+
out = append(out, result)
425+
}
426+
427+
return skipped, out
428+
}
429+
359430
func ProcessResults(ctx *context.Context, results []*pkg.CheckResult) []*pkg.CheckResult {
360431
if ctx.Canary.Spec.ResultMode == "" {
361432
return results

cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ var Run = &cobra.Command{
6767
go func() {
6868
defer wg.Done()
6969

70-
res, err := checks.RunChecks(apicontext.New(apicontext.DefaultContext.WithName(_config.Name), _config))
70+
res, _, err := checks.RunChecks(apicontext.New(apicontext.DefaultContext.WithName(_config.Name), _config))
7171
if err != nil {
7272
log.Errorf("error running checks: %v", err)
7373
return

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/elastic/go-elasticsearch/v8 v8.19.0
2020
github.com/flanksource/artifacts v1.0.18
2121
github.com/flanksource/commons v1.44.1
22-
github.com/flanksource/duty v1.0.1175
22+
github.com/flanksource/duty v1.0.1176
2323
github.com/flanksource/gomplate/v3 v3.24.66
2424
github.com/flanksource/is-healthy v1.0.82
2525
github.com/flanksource/kommons v0.31.7

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,8 @@ github.com/flanksource/deps v1.0.23 h1:yamxNjZe/gbxxS60QWqhq+0X1CzJjDjxAm9WZjQ3p
10301030
github.com/flanksource/deps v1.0.23/go.mod h1:s0HIPy85aWYkLxAsTywMenz6N29VdEKrytzE/bP8o48=
10311031
github.com/flanksource/duty v1.0.1175 h1:FrQxOcEn8LlWc1dJNkk14/MbpaZlT+i3VVxXxFs6mZg=
10321032
github.com/flanksource/duty v1.0.1175/go.mod h1:rirTJLPHps66SNz9erbriF3aHiSMb05x72361t7Gfsk=
1033+
github.com/flanksource/duty v1.0.1176 h1:pGgaUX8d1U6oe3upgND6LggqvULJul+a2jMuboD4G54=
1034+
github.com/flanksource/duty v1.0.1176/go.mod h1:rirTJLPHps66SNz9erbriF3aHiSMb05x72361t7Gfsk=
10331035
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
10341036
github.com/flanksource/gomplate/v3 v3.24.66 h1:fTaN0s9t+YZCau+KlgcLn9pMcLTsSiMjBnZUbhGY/oY=
10351037
github.com/flanksource/gomplate/v3 v3.24.66/go.mod h1:PiYJOAk971BpG/suhFP9YAZSjfz4KiRaqwYlQZZJp0Q=

pkg/api/run_now.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func RunCanaryHandler(c echo.Context) error {
4949
}
5050

5151
ctx := context.New(duty, *canary)
52-
results, err := checks.RunChecks(ctx)
52+
results, _, err := checks.RunChecks(ctx)
5353
if err != nil {
5454
return errorResponse(c, err, http.StatusInternalServerError)
5555
}

pkg/jobs/canary/canary_jobs.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package canary
33
import (
44
"errors"
55
"fmt"
6+
"math/rand"
67
"sync"
78
"time"
89

@@ -101,11 +102,15 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error {
101102
attribute.String("canary.namespace", j.Canary.Namespace),
102103
)
103104

104-
results, err := checks.RunChecks(canaryCtx)
105+
results, runMeta, err := checks.RunChecks(canaryCtx)
105106
if err != nil {
106107
return err
107108
}
108109

110+
if runMeta.SecretLookupRateLimitSkipped > 0 {
111+
maybeRescheduleAfterSecretLookupRateLimit(ctx.Context, j.DBCanary, j.Canary, runMeta.SecretLookupRateLimitSkipped)
112+
}
113+
109114
// Get transformed checks before and after, and then delete the olds ones that are not in new set.
110115
// NOTE: Webhook checks, although they are transformed, are handled entirely by the webhook caller
111116
// and should not be deleted manually in here.
@@ -164,6 +169,28 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error {
164169
return nil
165170
}
166171

172+
func maybeRescheduleAfterSecretLookupRateLimit(ctx context.Context, dbCanary pkg.Canary, canary v1.Canary, skipped int) {
173+
nextRuntime, err := canary.NextRuntime(time.Now())
174+
if err != nil || nextRuntime == nil {
175+
ctx.Warnf("failed to compute next runtime for canary[%s/%s]: %v", canary.Namespace, canary.Name, err)
176+
return
177+
}
178+
179+
if time.Until(*nextRuntime) <= 5*time.Minute {
180+
return
181+
}
182+
183+
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
184+
delay := time.Minute + time.Duration(rng.Intn(60))*time.Second
185+
runAt := time.Now().Add(delay)
186+
if err := TriggerAt(ctx, dbCanary, runAt); err != nil {
187+
ctx.Warnf("failed to schedule earlier retry for canary[%s/%s] after %d skipped results: %v", canary.Namespace, canary.Name, skipped, err)
188+
return
189+
}
190+
191+
ctx.Infof("scheduled earlier retry for canary[%s/%s] in %s after %d secret lookup rate-limited skips", canary.Namespace, canary.Name, delay.Round(time.Second), skipped)
192+
}
193+
167194
func SaveResults(ctx context.Context, results []*pkg.CheckResult) ([]string, map[string]string, error) {
168195
var transformedChecksCreated []string
169196
// Transformed checks have a delete strategy

pkg/jobs/canary/status.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ func UpdateCanaryStatusAndEvent(ctx context.Context, canary v1.Canary, results [
2222
return
2323
}
2424

25+
if len(results) == 0 {
26+
return
27+
}
28+
2529
// Skip function if canary is not sourced from Kubernetes CRD
2630
if !strings.HasPrefix(canary.Annotations["source"], "kubernetes") {
2731
return

pkg/topology/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func lookup(ctx *ComponentContext, name string, spec v1.CanarySpec) ([]interface
220220
// canaryCtx.Environment = ctx.
221221
// canaryCtx.Logger = ctx.Logger
222222

223-
checkResults, err := checks.Exec(canaryCtx)
223+
checkResults, err := checks.RunChecksNoPersistence(canaryCtx)
224224
if err != nil {
225225
return nil, err
226226
}

0 commit comments

Comments
 (0)