
Commit 85550e2

Author: Eidmantas Ivanauskas (committed)

feat(scheduler): add LRU/fail-first scheduling, cooldown, and per-repo cap; improve logs for queued git writes

1 parent 96cfeba · commit 85550e2

File tree

3 files changed: +125 −15 lines changed


cmd/main.go

Lines changed: 4 additions & 0 deletions
@@ -52,6 +52,10 @@ type ImageUpdaterConfig struct {
     DisableKubeEvents bool
     GitCreds          git.CredsStore
     EnableWebhook     bool
+    // Scheduler options
+    Schedule   string
+    Cooldown   time.Duration
+    PerRepoCap int
 }
 
 // newRootCommand implements the root command of argocd-image-updater
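
The new fields carry the scheduler configuration through ImageUpdaterConfig and are wired to CLI flags and environment variables in cmd/run.go below. For orientation, here is a minimal, self-contained sketch (not part of the commit; schedulerOptions is a hypothetical stand-in for the real struct) with example values and the semantics taken from the flag help texts:

// Illustration only — not part of this commit.
package main

import (
    "fmt"
    "time"
)

type schedulerOptions struct {
    Schedule   string        // "default", "lru", or "fail-first"
    Cooldown   time.Duration // deprioritize apps updated within this duration (0 = off)
    PerRepoCap int           // max updates per repo per cycle (0 = unlimited)
}

func main() {
    opts := schedulerOptions{
        Schedule:   "lru",
        Cooldown:   30 * time.Minute,
        PerRepoCap: 2,
    }
    fmt.Printf("%+v\n", opts)
}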

cmd/run.go

Lines changed: 63 additions & 7 deletions
@@ -32,6 +32,36 @@ import (
     v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// orderApplications reorders apps by schedule policy: lru (least-recent success first),
+// fail-first (recent failures first), with optional cooldown to deprioritize recently
+// successful apps.
+func orderApplications(names []string, appList map[string]argocd.ApplicationImages, state *argocd.SyncIterationState, cfg *ImageUpdaterConfig) []string {
+    stats := state.GetStats()
+    type item struct{ name string; score int64 }
+    items := make([]item, 0, len(names))
+    now := time.Now()
+    for _, n := range names {
+        s := stats[n]
+        score := int64(0)
+        switch cfg.Schedule {
+        case "lru":
+            // Older success => higher priority (lower score)
+            if !s.LastSuccess.IsZero() { score -= int64(now.Sub(s.LastSuccess).Milliseconds()) }
+        case "fail-first":
+            score += int64(s.FailCount) * 1_000_000 // dominate by failures
+            if !s.LastAttempt.IsZero() { score -= int64(now.Sub(s.LastAttempt).Milliseconds()) }
+        }
+        if cfg.Cooldown > 0 && !s.LastSuccess.IsZero() && now.Sub(s.LastSuccess) < cfg.Cooldown {
+            score -= 1 // slight deprioritization
+        }
+        items = append(items, item{name: n, score: score})
+    }
+    sort.Slice(items, func(i, j int) bool { return items[i].score > items[j].score })
+    out := make([]string, len(items))
+    for i := range items { out[i] = items[i].name }
+    return out
+}
+
 // newRunCommand implements "run" command
 func newRunCommand() *cobra.Command {
     var cfg *ImageUpdaterConfig = &ImageUpdaterConfig{}
@@ -60,7 +90,7 @@ func newRunCommand() *cobra.Command {
         return fmt.Errorf("--max-concurrency must be greater than 1")
     }
 
-    log.Infof("%s %s starting [loglevel:%s, interval:%s, healthport:%s]",
+    log.Infof("%s %s starting [loglevel:%s, interval:%s, healthport:%s]",
         version.BinaryName(),
         version.Version(),
         strings.ToUpper(cfg.LogLevel),
@@ -144,7 +174,7 @@ func newRunCommand() *cobra.Command {
     )
 
     // Initialize metrics before starting the metrics server or using any counters
-    metrics.InitMetrics()
+    metrics.InitMetrics()
 
     // Health server will start in a go routine and run asynchronously
     var hsErrCh chan error
@@ -255,7 +285,7 @@ func newRunCommand() *cobra.Command {
 
     // This is our main loop. We leave it only when our health probe server
     // returns an error.
-    for {
+    for {
         select {
         case err := <-hsErrCh:
             if err != nil {
@@ -288,7 +318,7 @@ func newRunCommand() *cobra.Command {
             return nil
         default:
             if lastRun.IsZero() || time.Since(lastRun) > cfg.CheckInterval {
-                result, err := runImageUpdater(cfg, false)
+                result, err := runImageUpdater(cfg, false)
                 if err != nil {
                     log.Errorf("Error: %v", err)
                 } else {
@@ -340,6 +370,9 @@ func newRunCommand() *cobra.Command {
     runCmd.Flags().StringVar(&cfg.AppLabel, "match-application-label", "", "label selector to match application labels against. DEPRECATED: this flag will be removed in a future version.")
 
     runCmd.Flags().BoolVar(&warmUpCache, "warmup-cache", true, "whether to perform a cache warm-up on startup")
+    runCmd.Flags().StringVar(&cfg.Schedule, "schedule", env.GetStringVal("IMAGE_UPDATER_SCHEDULE", "default"), "scheduling policy: default|lru|fail-first")
+    runCmd.Flags().DurationVar(&cfg.Cooldown, "cooldown", env.GetDurationVal("IMAGE_UPDATER_COOLDOWN", 0), "deprioritize apps updated within this duration")
+    runCmd.Flags().IntVar(&cfg.PerRepoCap, "per-repo-cap", env.ParseNumFromEnv("IMAGE_UPDATER_PER_REPO_CAP", 0, 0, 100000), "max updates per repo per cycle (0 = unlimited)")
     runCmd.Flags().StringVar(&cfg.GitCommitUser, "git-commit-user", env.GetStringVal("GIT_COMMIT_USER", "argocd-image-updater"), "Username to use for Git commits")
     runCmd.Flags().StringVar(&cfg.GitCommitMail, "git-commit-email", env.GetStringVal("GIT_COMMIT_EMAIL", "[email protected]"), "E-Mail address to use for Git commits")
     runCmd.Flags().StringVar(&cfg.GitCommitSigningKey, "git-commit-signing-key", env.GetStringVal("GIT_COMMIT_SIGNING_KEY", ""), "GnuPG key ID or path to Private SSH Key used to sign the commits")
@@ -403,7 +436,7 @@ func runImageUpdater(cfg *ImageUpdaterConfig, warmUp bool) (argocd.ImageUpdaterR
         log.Infof("Starting image update cycle, considering %d annotated application(s) for update", len(appList))
     }
 
-    syncState := argocd.NewSyncIterationState()
+    syncState := argocd.NewSyncIterationState()
 
     // Allow a maximum of MaxConcurrency number of goroutines to exist at the
     // same time. If in warm-up mode, set to 1 explicitly.
@@ -420,7 +453,25 @@ func runImageUpdater(cfg *ImageUpdaterConfig, warmUp bool) (argocd.ImageUpdaterR
     var wg sync.WaitGroup
     wg.Add(len(appList))
 
-    for app, curApplication := range appList {
+    // Optionally reorder apps by scheduling policy
+    ordered := make([]string, 0, len(appList))
+    for app := range appList { ordered = append(ordered, app) }
+    if cfg.Schedule != "default" || cfg.Cooldown > 0 || cfg.PerRepoCap > 0 {
+        ordered = orderApplications(ordered, appList, syncState, cfg)
+    }
+
+    perRepoCounter := map[string]int{}
+
+    for _, app := range ordered {
+        curApplication := appList[app]
+        // Per-repo cap if configured
+        if cfg.PerRepoCap > 0 {
+            repo := argocd.GetApplicationSource(&curApplication.Application).RepoURL
+            if perRepoCounter[repo] >= cfg.PerRepoCap {
+                continue
+            }
+        }
+        syncState.RecordAttempt(app)
         lockErr := sem.Acquire(context.Background(), 1)
         if lockErr != nil {
             log.Errorf("Could not acquire semaphore for application %s: %v", app, lockErr)
@@ -447,7 +498,12 @@ func runImageUpdater(cfg *ImageUpdaterConfig, warmUp bool) (argocd.ImageUpdaterR
             DisableKubeEvents: cfg.DisableKubeEvents,
             GitCreds:          cfg.GitCreds,
         }
-        res := argocd.UpdateApplication(upconf, syncState)
+        res := argocd.UpdateApplication(upconf, syncState)
+        syncState.RecordResult(app, res.NumErrors > 0)
+        if cfg.PerRepoCap > 0 {
+            repo := argocd.GetApplicationSource(&curApplication.Application).RepoURL
+            perRepoCounter[repo] = perRepoCounter[repo] + 1
+        }
         result.NumApplicationsProcessed += 1
         result.NumErrors += res.NumErrors
         result.NumImagesConsidered += res.NumImagesConsidered
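
The ordering logic is easiest to follow with concrete numbers. The following standalone sketch (not part of the commit; application names and stats are invented) replays the fail-first scoring from orderApplications: the failure count dominates the score, the recency of the last attempt breaks ties, and the same descending sort as in the patch decides the final order.

// Standalone illustration of the fail-first scoring — not part of this commit.
package main

import (
    "fmt"
    "sort"
    "time"
)

type appStats struct {
    lastAttempt time.Time
    failCount   int
}

func main() {
    now := time.Now()
    // Hypothetical per-application stats for illustration only.
    stats := map[string]appStats{
        "guestbook": {lastAttempt: now.Add(-2 * time.Minute), failCount: 0},
        "billing":   {lastAttempt: now.Add(-1 * time.Minute), failCount: 3},
        "frontend":  {lastAttempt: now.Add(-10 * time.Minute), failCount: 3},
    }

    type item struct {
        name  string
        score int64
    }
    items := make([]item, 0, len(stats))
    for name, s := range stats {
        // Failures dominate the score; recency of the last attempt is the tie-breaker.
        score := int64(s.failCount)*1_000_000 - now.Sub(s.lastAttempt).Milliseconds()
        items = append(items, item{name: name, score: score})
    }
    // Highest score first, mirroring the descending sort in the patch.
    sort.Slice(items, func(i, j int) bool { return items[i].score > items[j].score })

    for _, it := range items {
        fmt.Printf("%s (score %d)\n", it.name, it.score)
    }
    // Prints: billing, frontend (both 3 failures, billing attempted more recently),
    // then guestbook (no failures).
}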

pkg/argocd/update.go

Lines changed: 58 additions & 8 deletions
@@ -123,15 +123,65 @@ type ChangeEntry struct {
 type SyncIterationState struct {
     lock            sync.Mutex
     repositoryLocks map[string]*sync.Mutex
+    lastSuccess     map[string]time.Time
+    lastAttempt     map[string]time.Time
+    failCount       map[string]int
 }
 
 // NewSyncIterationState returns a new instance of SyncIterationState
 func NewSyncIterationState() *SyncIterationState {
     return &SyncIterationState{
-        repositoryLocks: make(map[string]*sync.Mutex),
+        repositoryLocks: make(map[string]*sync.Mutex),
+        lastSuccess:     make(map[string]time.Time),
+        lastAttempt:     make(map[string]time.Time),
+        failCount:       make(map[string]int),
     }
 }
 
+// AppStats is a snapshot of per-application scheduling stats
+type AppStats struct {
+    LastSuccess time.Time
+    LastAttempt time.Time
+    FailCount   int
+}
+
+// RecordAttempt notes that an attempt was made to process the application
+func (state *SyncIterationState) RecordAttempt(app string) {
+    state.lock.Lock()
+    state.lastAttempt[app] = time.Now()
+    state.lock.Unlock()
+}
+
+// RecordResult records the outcome for an application
+func (state *SyncIterationState) RecordResult(app string, hadErrors bool) {
+    state.lock.Lock()
+    defer state.lock.Unlock()
+    if hadErrors {
+        state.failCount[app] = state.failCount[app] + 1
+    } else {
+        state.failCount[app] = 0
+        state.lastSuccess[app] = time.Now()
+    }
+}
+
+// GetStats returns a copy of current stats for scheduling decisions
+func (state *SyncIterationState) GetStats() map[string]AppStats {
+    state.lock.Lock()
+    defer state.lock.Unlock()
+    out := make(map[string]AppStats, len(state.lastAttempt))
+    // Union of keys
+    for k := range state.lastAttempt { out[k] = AppStats{} }
+    for k := range state.lastSuccess { out[k] = AppStats{} }
+    for k := range state.failCount { out[k] = AppStats{} }
+    for k, v := range out {
+        v.LastAttempt = state.lastAttempt[k]
+        v.LastSuccess = state.lastSuccess[k]
+        v.FailCount = state.failCount[k]
+        out[k] = v
+    }
+    return out
+}
+
 // GetRepositoryLock returns the lock for a specified repository
 func (state *SyncIterationState) GetRepositoryLock(repository string) *sync.Mutex {
     state.lock.Lock()
@@ -169,7 +219,7 @@ func UpdateApplication(updateConf *UpdateConfiguration, state *SyncIterationStat
 
     result.NumApplicationsProcessed += 1
 
-    // Loop through all images of current application, and check whether one of
+    // Loop through all images of current application, and check whether one of
     // its images is eligible for updating.
     //
     // Whether an image qualifies for update is dependent on semantic version
@@ -250,7 +300,7 @@ func UpdateApplication(updateConf *UpdateConfiguration, state *SyncIterationStat
             imgCtx.Warnf("Could not fetch credentials: %v", err)
             result.NumErrors += 1
             continue
-        }
+        }
     }
 
     regClient, err := updateConf.NewRegFN(rep, creds.Username, creds.Password)
@@ -357,11 +407,11 @@ func UpdateApplication(updateConf *UpdateConfiguration, state *SyncIterationStat
         wbc.GitCommitSignOff = updateConf.GitCommitSignOff
     }
 
-    if needUpdate {
+    if needUpdate {
         logCtx := log.WithContext().AddField("application", app)
-        log.Debugf("Using commit message: %s", wbc.GitCommitMessage)
+        log.Debugf("Using commit message: %s", wbc.GitCommitMessage)
         if !updateConf.DryRun {
-            logCtx.Infof("Committing %d parameter update(s) for application %s", result.NumImagesUpdated, app)
+            logCtx.Infof("Queuing %d parameter update(s) for application %s (git write pending)", result.NumImagesUpdated, app)
             // Enqueue batched write intent unless tests explicitly disable batching
             var err error
             if env.GetBoolVal("GIT_BATCH_DISABLE", false) {
@@ -375,8 +425,8 @@ func UpdateApplication(updateConf *UpdateConfiguration, state *SyncIterationStat
                 logCtx.Errorf("Could not update application spec: %v", err)
                 result.NumErrors += 1
                 result.NumImagesUpdated = 0
-            } else {
-                logCtx.Infof("Successfully updated the live application spec")
+            } else {
+                logCtx.Infof("Application spec updated in-memory; write-back scheduled")
                 if !updateConf.DisableKubeEvents && updateConf.KubeClient != nil {
                     annotations := map[string]string{}
                     for i, c := range changeList {
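
Since RecordResult resets the failure counter on success and GetStats returns a merged snapshot of all three maps, a small unit test can pin that behavior down. A sketch of such a test (not included in this commit; it assumes placement in package argocd next to update.go):

// Hypothetical test sketch exercising the new bookkeeping — not part of this commit.
package argocd

import "testing"

func TestSyncIterationStateStats(t *testing.T) {
    state := NewSyncIterationState()

    // Two attempts: the first fails, the second succeeds.
    state.RecordAttempt("my-app")
    state.RecordResult("my-app", true) // hadErrors = true increments FailCount
    state.RecordAttempt("my-app")
    state.RecordResult("my-app", false) // success resets FailCount and stamps LastSuccess

    stats := state.GetStats()["my-app"]
    if stats.FailCount != 0 {
        t.Errorf("expected FailCount reset to 0 after success, got %d", stats.FailCount)
    }
    if stats.LastSuccess.IsZero() || stats.LastAttempt.IsZero() {
        t.Errorf("expected LastSuccess and LastAttempt to be set")
    }
}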
