Skip to content

Commit 16a965e

Browse files
author
Eidmantas Ivanauskas
committed
feat(registry): singleflight + retries for manifests; add test-mode bypass for git batching
1 parent 29b832c commit 16a965e

File tree

2 files changed

+65
-29
lines changed

2 files changed

+65
-29
lines changed

pkg/argocd/update.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/log"
2424
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/registry"
2525
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/tag"
26+
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/env"
2627

2728
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
2829
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
@@ -361,11 +362,15 @@ func UpdateApplication(updateConf *UpdateConfiguration, state *SyncIterationStat
361362
log.Debugf("Using commit message: %s", wbc.GitCommitMessage)
362363
if !updateConf.DryRun {
363364
logCtx.Infof("Committing %d parameter update(s) for application %s", result.NumImagesUpdated, app)
364-
// Enqueue batched write intent; writer will coalesce per repo and branch
365-
wi := writeIntent{app: &updateConf.UpdateApp.Application, wbc: wbc, changeList: changeList, writeFn: writeOverrides}
366-
if wbc.KustomizeBase != "" { wi.writeFn = writeKustomization }
367-
enqueueWriteIntent(wi)
368-
err := error(nil)
365+
// Enqueue batched write intent unless tests explicitly disable batching
366+
var err error
367+
if env.GetBoolVal("GIT_BATCH_DISABLE", false) {
368+
err = commitChangesLocked(&updateConf.UpdateApp.Application, wbc, state, changeList)
369+
} else {
370+
wi := writeIntent{app: &updateConf.UpdateApp.Application, wbc: wbc, changeList: changeList, writeFn: writeOverrides}
371+
if wbc.KustomizeBase != "" { wi.writeFn = writeKustomization }
372+
enqueueWriteIntent(wi)
373+
}
369374
if err != nil {
370375
logCtx.Errorf("Could not update application spec: %v", err)
371376
result.NumErrors += 1

registry-scanner/pkg/registry/client.go

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ var repoAuthTransportCacheLock sync.RWMutex
177177

178178
// singleflight-style maps for deduping concurrent identical calls
179179
var tagsInFlight sync.Map // key string -> chan result
180+
var manifestInFlight sync.Map // key string -> chan result
180181

181182
type tagsResult struct {
182183
tags []string
@@ -229,34 +230,64 @@ func (clt *registryClient) Tags() ([]string, error) {
229230

230231
// Manifest returns a Manifest for a given tag in repository
231232
func (clt *registryClient) ManifestForTag(tagStr string) (distribution.Manifest, error) {
232-
manService, err := clt.regClient.Manifests(context.Background())
233-
if err != nil {
234-
return nil, err
235-
}
236-
manifest, err := manService.Get(
237-
context.Background(),
238-
digest.FromString(tagStr),
239-
distribution.WithTag(tagStr), distribution.WithManifestMediaTypes(knownMediaTypes))
240-
if err != nil {
241-
return nil, err
242-
}
243-
return manifest, nil
233+
key := clt.endpoint.RegistryAPI + "|manifest|" + clt.repoName + "|tag=" + tagStr
234+
if ch, loaded := manifestInFlight.Load(key); loaded {
235+
res := (<-ch.(chan struct{m distribution.Manifest; e error}))
236+
return res.m, res.e
237+
}
238+
ch := make(chan struct{m distribution.Manifest; e error}, 1)
239+
actual, loaded := manifestInFlight.LoadOrStore(key, ch)
240+
if loaded { res := (<-actual.(chan struct{m distribution.Manifest; e error})); return res.m, res.e }
241+
defer func(){ manifestInFlight.Delete(key); close(ch) }()
242+
243+
manService, err := clt.regClient.Manifests(context.Background())
244+
if err != nil { ch <- struct{m distribution.Manifest; e error}{nil, err}; return nil, err }
245+
var manifest distribution.Manifest
246+
base := 200 * time.Millisecond
247+
maxDelay := 3 * time.Second
248+
for attempt := 0; attempt < 3; attempt++ {
249+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
250+
manifest, err = manService.Get(ctx, digest.FromString(tagStr), distribution.WithTag(tagStr), distribution.WithManifestMediaTypes(knownMediaTypes))
251+
cancel()
252+
if err == nil { break }
253+
d := time.Duration(float64(base) * (1 << attempt) * (0.7 + 0.6*rand.Float64()))
254+
if d > maxDelay { d = maxDelay }
255+
time.Sleep(d)
256+
}
257+
ch <- struct{m distribution.Manifest; e error}{manifest, err}
258+
if err != nil { return nil, err }
259+
return manifest, nil
244260
}
245261

246262
// ManifestForDigest returns a Manifest for a given digest in repository
247263
func (clt *registryClient) ManifestForDigest(dgst digest.Digest) (distribution.Manifest, error) {
248-
manService, err := clt.regClient.Manifests(context.Background())
249-
if err != nil {
250-
return nil, err
251-
}
252-
manifest, err := manService.Get(
253-
context.Background(),
254-
dgst,
255-
distribution.WithManifestMediaTypes(knownMediaTypes))
256-
if err != nil {
257-
return nil, err
258-
}
259-
return manifest, nil
264+
key := clt.endpoint.RegistryAPI + "|manifest|" + clt.repoName + "|dgst=" + dgst.String()
265+
if ch, loaded := manifestInFlight.Load(key); loaded {
266+
res := (<-ch.(chan struct{m distribution.Manifest; e error}))
267+
return res.m, res.e
268+
}
269+
ch := make(chan struct{m distribution.Manifest; e error}, 1)
270+
actual, loaded := manifestInFlight.LoadOrStore(key, ch)
271+
if loaded { res := (<-actual.(chan struct{m distribution.Manifest; e error})); return res.m, res.e }
272+
defer func(){ manifestInFlight.Delete(key); close(ch) }()
273+
274+
manService, err := clt.regClient.Manifests(context.Background())
275+
if err != nil { ch <- struct{m distribution.Manifest; e error}{nil, err}; return nil, err }
276+
var manifest distribution.Manifest
277+
base := 200 * time.Millisecond
278+
maxDelay := 3 * time.Second
279+
for attempt := 0; attempt < 3; attempt++ {
280+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
281+
manifest, err = manService.Get(ctx, dgst, distribution.WithManifestMediaTypes(knownMediaTypes))
282+
cancel()
283+
if err == nil { break }
284+
d := time.Duration(float64(base) * (1 << attempt) * (0.7 + 0.6*rand.Float64()))
285+
if d > maxDelay { d = maxDelay }
286+
time.Sleep(d)
287+
}
288+
ch <- struct{m distribution.Manifest; e error}{manifest, err}
289+
if err != nil { return nil, err }
290+
return manifest, nil
260291
}
261292

262293
// TagMetadata retrieves metadata for a given manifest of given repository

0 commit comments

Comments
 (0)