Skip to content

Commit a9b8f95

Browse files
authored
feat: implement repository locking mechanism in webhook and polling handlers (#506)
Add a per-repository lock mechanism to prevent concurrent jobs for the same repository. If a job is already running, new webhook requests receive an immediate HTTP 429 response and poll events get skipped, ensuring only one job per repository runs at a time.
1 parent 2228432 commit a9b8f95

File tree

2 files changed

+42
-10
lines changed

2 files changed

+42
-10
lines changed

cmd/doco-cd/http_handler.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ func (h *handlerData) WebhookHandler(w http.ResponseWriter, r *http.Request) {
398398

399399
jobLog.Debug("received webhook event")
400400

401+
repoName := "unknown"
402+
401403
// Limit the request body size
402404
r.Body = http.MaxBytesReader(w, r.Body, h.appConfig.MaxPayloadSize)
403405

@@ -426,11 +428,26 @@ func (h *handlerData) WebhookHandler(w http.ResponseWriter, r *http.Request) {
426428
statusCode = http.StatusInternalServerError
427429
}
428430

429-
onError(getRepoName(payload.CloneURL), w, jobLog.With(slog.String("ip", r.RemoteAddr), logger.ErrAttr(err)), errMsg, err.Error(), jobID, statusCode)
431+
if payload.CloneURL != "" {
432+
repoName = getRepoName(payload.CloneURL)
433+
}
434+
435+
onError(repoName, w, jobLog.With(slog.String("ip", r.RemoteAddr), logger.ErrAttr(err)), errMsg, err.Error(), jobID, statusCode)
436+
437+
return
438+
}
430439

440+
repoName = getRepoName(payload.CloneURL)
441+
lock := getRepoLock(repoName)
442+
locked := lock.TryLock()
443+
444+
if !locked {
445+
onError(repoName, w, jobLog, "Another job is still in progress for this repository", nil, jobID, http.StatusTooManyRequests)
431446
return
432447
}
433448

449+
defer lock.Unlock()
450+
434451
HandleEvent(ctx, jobLog, w, h.appConfig, h.dataMountPoint, payload, customTarget, jobID, h.dockerCli, h.dockerClient)
435452
}
436453

cmd/doco-cd/poll_handler.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,15 @@ import (
3030
var (
3131
ErrNotManagedByDocoCD = errors.New("stack is not managed by doco-cd")
3232
ErrDeploymentConflict = errors.New("another stack with the same name already exists and is not managed by this repository")
33+
repoLocks sync.Map // Map to hold locks for each repository
3334
)
3435

36+
// getRepoLock retrieves a mutex lock for the given repository name.
37+
func getRepoLock(repoName string) *sync.Mutex {
38+
lockIface, _ := repoLocks.LoadOrStore(repoName, &sync.Mutex{})
39+
return lockIface.(*sync.Mutex)
40+
}
41+
3542
// StartPoll initializes PollJob with the provided configuration and starts the PollHandler goroutine.
3643
func StartPoll(h *handlerData, pollConfig config.PollConfig, wg *sync.WaitGroup) error {
3744
if pollConfig.Interval == 0 {
@@ -61,18 +68,28 @@ func StartPoll(h *handlerData, pollConfig config.PollConfig, wg *sync.WaitGroup)
6168

6269
// PollHandler is a function that handles polling for changes in a repository.
6370
func (h *handlerData) PollHandler(pollJob *config.PollJob) {
64-
logger := h.log.With()
71+
repoName := getRepoName(string(pollJob.Config.CloneUrl))
6572

73+
logger := h.log.With(slog.String("repository", repoName))
6674
logger.Debug("Start poll handler")
6775

76+
lock := getRepoLock(repoName)
77+
6878
for {
6979
if pollJob.LastRun == 0 || time.Now().Unix() >= pollJob.NextRun {
70-
repoName := getRepoName(string(pollJob.Config.CloneUrl))
71-
logger.Debug("Running poll for repository", slog.String("repoName", repoName))
80+
locked := lock.TryLock()
7281

73-
err := RunPoll(context.Background(), pollJob.Config, h.appConfig, h.dataMountPoint, h.dockerCli, h.dockerClient, logger)
74-
if err != nil {
75-
prometheus.PollErrors.WithLabelValues(repoName).Inc()
82+
if !locked {
83+
h.log.Info("Another poll job is still in progress, skipping this run")
84+
} else {
85+
logger.Debug("Start poll job")
86+
87+
err := RunPoll(context.Background(), pollJob.Config, h.appConfig, h.dataMountPoint, h.dockerCli, h.dockerClient, logger)
88+
if err != nil {
89+
prometheus.PollErrors.WithLabelValues(repoName).Inc()
90+
}
91+
92+
lock.Unlock()
7693
}
7794

7895
pollJob.NextRun = time.Now().Unix() + int64(pollJob.Config.Interval)
@@ -91,9 +108,7 @@ func RunPoll(ctx context.Context, pollConfig config.PollConfig, appConfig *confi
91108
cloneUrl := string(pollConfig.CloneUrl)
92109
jobID := uuid.Must(uuid.NewRandom()).String()
93110
repoName := getRepoName(cloneUrl)
94-
jobLog := logger.With(
95-
slog.String("repository", repoName),
96-
slog.String("job_id", jobID))
111+
jobLog := logger.With(slog.String("job_id", jobID))
97112

98113
if strings.Contains(repoName, "..") {
99114
jobLog.Error("invalid repository name, contains '..'")

0 commit comments

Comments
 (0)