Skip to content

Commit 29b832c

Browse files
author
Eidmantas Ivanauskas
committed
feat(git): add per-repo batched writer with multi-branch grouping; enqueue write intents; keep compute parallel
1 parent 9920705 commit 29b832c

File tree

2 files changed

+148
-1
lines changed

2 files changed

+148
-1
lines changed

pkg/argocd/git.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"path/filepath"
1212
"text/template"
1313
"sync"
14+
"time"
1415

1516
"sigs.k8s.io/kustomize/api/konfig"
1617
"sigs.k8s.io/kustomize/api/types"
@@ -20,6 +21,7 @@ import (
2021

2122
"github.com/argoproj-labs/argocd-image-updater/pkg/common"
2223
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/image"
24+
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/env"
2325

2426
"github.com/argoproj-labs/argocd-image-updater/ext/git"
2527
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/log"
@@ -142,6 +144,147 @@ func getRepoMutex(repo string) *sync.Mutex {
142144
return actual.(*sync.Mutex)
143145
}
144146

147+
// -----------------------
148+
// Batched repo writer
149+
// -----------------------
150+
151+
type writeIntent struct {
152+
app *v1alpha1.Application
153+
wbc *WriteBackConfig
154+
changeList []ChangeEntry
155+
writeFn changeWriter
156+
}
157+
158+
type repoWriter struct {
159+
repoURL string
160+
intentsCh chan writeIntent
161+
flushEvery time.Duration
162+
maxBatch int
163+
stopCh chan struct{}
164+
}
165+
166+
var writers sync.Map // map[string]*repoWriter
167+
168+
func getOrCreateWriter(repo string) *repoWriter {
169+
if v, ok := writers.Load(repo); ok {
170+
return v.(*repoWriter)
171+
}
172+
rw := &repoWriter{
173+
repoURL: repo,
174+
intentsCh: make(chan writeIntent, 1024),
175+
flushEvery: env.GetDurationVal("GIT_BATCH_FLUSH_INTERVAL", 2*time.Second),
176+
maxBatch: env.ParseNumFromEnv("GIT_BATCH_MAX", 10, 1, 1000),
177+
stopCh: make(chan struct{}),
178+
}
179+
go rw.loop()
180+
actual, _ := writers.LoadOrStore(repo, rw)
181+
return actual.(*repoWriter)
182+
}
183+
184+
func (rw *repoWriter) loop() {
185+
ticker := time.NewTicker(rw.flushEvery)
186+
defer ticker.Stop()
187+
batch := make([]writeIntent, 0, rw.maxBatch)
188+
flush := func() { if len(batch) > 0 { rw.flushBatch(batch); batch = batch[:0] } }
189+
for {
190+
select {
191+
case wi := <-rw.intentsCh:
192+
batch = append(batch, wi)
193+
if len(batch) >= rw.maxBatch { flush() }
194+
case <-ticker.C:
195+
flush()
196+
case <-rw.stopCh:
197+
flush(); return
198+
}
199+
}
200+
}
201+
202+
func (rw *repoWriter) flushBatch(batch []writeIntent) {
203+
// Group intents by resolved push branch to avoid mixing branches
204+
byBranch := map[string][]writeIntent{}
205+
for _, wi := range batch {
206+
branch := getWriteBackBranch(wi.app)
207+
if wi.wbc.GitWriteBranch != "" {
208+
// honor template-derived write branch if set already on wbc
209+
branch = wi.wbc.GitWriteBranch
210+
}
211+
byBranch[branch] = append(byBranch[branch], wi)
212+
}
213+
for branch, intents := range byBranch {
214+
rw.commitBatch(branch, intents)
215+
}
216+
}
217+
218+
func (rw *repoWriter) commitBatch(branch string, intents []writeIntent) {
219+
if len(intents) == 0 { return }
220+
// Use creds and identity from first intent
221+
first := intents[0]
222+
logCtx := log.WithContext().AddField("repository", rw.repoURL)
223+
224+
creds, err := first.wbc.GetCreds(first.app)
225+
if err != nil { logCtx.Errorf("could not get creds: %v", err); return }
226+
227+
tempRoot, err := os.MkdirTemp(os.TempDir(), "git-batch-")
228+
if err != nil { logCtx.Errorf("temp dir: %v", err); return }
229+
defer func(){ _ = os.RemoveAll(tempRoot) }()
230+
231+
gitC, err := git.NewClientExt(rw.repoURL, tempRoot, creds, false, false, "")
232+
if err != nil { logCtx.Errorf("git client: %v", err); return }
233+
if err = gitC.Init(); err != nil { logCtx.Errorf("git init: %v", err); return }
234+
235+
// Resolve checkout and push branch similarly to commitChangesGit
236+
checkOutBranch := getWriteBackBranch(first.app)
237+
if first.wbc.GitBranch != "" { checkOutBranch = first.wbc.GitBranch }
238+
if checkOutBranch == "" || checkOutBranch == "HEAD" {
239+
b, err := gitC.SymRefToBranch(checkOutBranch)
240+
if err != nil { logCtx.Errorf("resolve branch: %v", err); return }
241+
checkOutBranch = b
242+
}
243+
pushBranch := branch
244+
245+
// Ensure the branch exists locally
246+
if pushBranch != checkOutBranch {
247+
if err := gitC.ShallowFetch(pushBranch, 1); err != nil {
248+
if err2 := gitC.ShallowFetch(checkOutBranch, 1); err2 != nil { logCtx.Errorf("fetch: %v", err2); return }
249+
if err := gitC.Branch(checkOutBranch, pushBranch); err != nil { logCtx.Errorf("branch: %v", err); return }
250+
}
251+
} else {
252+
if err := gitC.ShallowFetch(checkOutBranch, 1); err != nil { logCtx.Errorf("fetch: %v", err); return }
253+
}
254+
if err := gitC.Checkout(pushBranch, false); err != nil { logCtx.Errorf("checkout: %v", err); return }
255+
256+
// Apply writes for each intent using shared repo
257+
combinedChanges := 0
258+
for _, wi := range intents {
259+
if wi.wbc.GitCommitUser != "" && wi.wbc.GitCommitEmail != "" {
260+
_ = gitC.Config(wi.wbc.GitCommitUser, wi.wbc.GitCommitEmail)
261+
}
262+
if err, skip := wi.writeFn(wi.app, wi.wbc, gitC); err != nil {
263+
logCtx.Errorf("write failed for app %s: %v", wi.app.GetName(), err)
264+
continue
265+
} else if skip {
266+
continue
267+
}
268+
combinedChanges += len(wi.changeList)
269+
}
270+
if combinedChanges == 0 { return }
271+
272+
// Compose a commit message summarizing apps
273+
msg := "Update parameters for "
274+
for i, wi := range intents {
275+
if i > 0 { msg += ", " }
276+
msg += wi.app.GetName()
277+
}
278+
279+
commitOpts := &git.CommitOptions{ CommitMessageText: msg, SigningKey: first.wbc.GitCommitSigningKey, SigningMethod: first.wbc.GitCommitSigningMethod, SignOff: first.wbc.GitCommitSignOff }
280+
if err := gitC.Commit("", commitOpts); err != nil { logCtx.Errorf("commit: %v", err); return }
281+
if err := gitC.Push("origin", pushBranch, pushBranch != checkOutBranch); err != nil { logCtx.Errorf("push: %v", err); return }
282+
}
283+
284+
func enqueueWriteIntent(wi writeIntent) {
285+
getOrCreateWriter(wi.wbc.GitRepo).intentsCh <- wi
286+
}
287+
145288
// getWriteBackBranch returns the branch to use for write-back operations.
146289
// It first checks for a branch specified in annotations, then uses the
147290
// targetRevision from the matching git source, falling back to getApplicationSource.

pkg/argocd/update.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,11 @@ func UpdateApplication(updateConf *UpdateConfiguration, state *SyncIterationStat
361361
log.Debugf("Using commit message: %s", wbc.GitCommitMessage)
362362
if !updateConf.DryRun {
363363
logCtx.Infof("Committing %d parameter update(s) for application %s", result.NumImagesUpdated, app)
364-
err := commitChangesLocked(&updateConf.UpdateApp.Application, wbc, state, changeList)
364+
// Enqueue batched write intent; writer will coalesce per repo and branch
365+
wi := writeIntent{app: &updateConf.UpdateApp.Application, wbc: wbc, changeList: changeList, writeFn: writeOverrides}
366+
if wbc.KustomizeBase != "" { wi.writeFn = writeKustomization }
367+
enqueueWriteIntent(wi)
368+
err := error(nil)
365369
if err != nil {
366370
logCtx.Errorf("Could not update application spec: %v", err)
367371
result.NumErrors += 1

0 commit comments

Comments
 (0)