Skip to content

Commit 967831b

Browse files
committed
fix
1 parent 6679f53 commit 967831b

File tree

5 files changed

+75
-55
lines changed

5 files changed

+75
-55
lines changed

models/actions/run.go

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -197,20 +197,13 @@ func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
197197
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
198198
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
199199
// Find all runs in the specified repository, reference, and workflow with non-final status
200-
opts := &FindRunOptions{
200+
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
201201
RepoID: repoID,
202202
Ref: ref,
203203
WorkflowID: workflowID,
204204
TriggerEvent: event,
205205
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
206-
}
207-
return CancelPreviousJobsWithOpts(ctx, opts)
208-
}
209-
210-
// CancelPreviousJobs cancels all previous jobs with opts
211-
func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error {
212-
// Find all runs by opts
213-
runs, total, err := db.FindAndCount[ActionRun](ctx, opts)
206+
})
214207
if err != nil {
215208
return err
216209
}
@@ -376,12 +369,9 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
376369
type ActionRunIndex db.ResourceIndex
377370

378371
func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
379-
if len(actionRun.ConcurrencyGroup) == 0 {
372+
if len(actionRun.ConcurrencyGroup) == 0 || actionRun.ConcurrencyCancel {
380373
return false, nil
381374
}
382-
if actionRun.ConcurrencyCancel {
383-
return false, CancelConcurrentRuns(ctx, actionRun)
384-
}
385375

386376
concurrentRunsNum, err := db.Count[ActionRun](ctx, &FindRunOptions{
387377
RepoID: actionRun.RepoID,
@@ -394,15 +384,3 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo
394384

395385
return concurrentRunsNum > 0, nil
396386
}
397-
398-
func CancelConcurrentRuns(ctx context.Context, actionRun *ActionRun) error {
399-
return CancelPreviousJobsWithOpts(ctx, &FindRunOptions{
400-
RepoID: actionRun.RepoID,
401-
ConcurrencyGroup: actionRun.ConcurrencyGroup,
402-
Status: []Status{
403-
StatusRunning,
404-
StatusWaiting,
405-
StatusBlocked,
406-
},
407-
})
408-
}

models/actions/run_job.go

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -192,46 +192,85 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
192192
}
193193
}
194194

195-
func ShouldBlockJobByConcurrency(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
196-
if len(actionRunJob.RawConcurrencyGroup) == 0 {
195+
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
196+
if len(job.RawConcurrencyGroup) == 0 {
197197
return false, nil
198198
}
199-
if !actionRunJob.IsConcurrencyEvaluated {
199+
if !job.IsConcurrencyEvaluated {
200200
return false, fmt.Errorf("the raw concurrency group has not been evaluated")
201201
}
202-
if len(actionRunJob.ConcurrencyGroup) == 0 {
202+
if len(job.ConcurrencyGroup) == 0 || job.ConcurrencyCancel {
203203
return false, nil
204204
}
205-
if actionRunJob.ConcurrencyCancel {
206-
return false, CancelConcurrentJobs(ctx, actionRunJob)
207-
}
208205

209206
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
210-
RepoID: actionRunJob.RepoID,
211-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
207+
RepoID: job.RepoID,
208+
ConcurrencyGroup: job.ConcurrencyGroup,
212209
Statuses: []Status{StatusRunning, StatusWaiting},
213210
})
214211
if err != nil {
215212
return false, fmt.Errorf("count running and waiting jobs: %w", err)
216213
}
214+
if concurrentJobsNum > 0 {
215+
return true, nil
216+
}
217+
218+
if err := job.LoadRun(ctx); err != nil {
219+
return false, fmt.Errorf("load run: %w", err)
220+
}
217221

218-
return concurrentJobsNum > 0, nil
222+
return ShouldBlockRunByConcurrency(ctx, job.Run)
219223
}
220224

221-
func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error {
222-
// cancel previous jobs in the same concurrency group
223-
previousJobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
224-
RepoID: actionRunJob.RepoID,
225-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
226-
Statuses: []Status{
227-
StatusRunning,
228-
StatusWaiting,
229-
StatusBlocked,
230-
},
231-
})
232-
if err != nil {
233-
return fmt.Errorf("find previous jobs: %w", err)
225+
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) error {
226+
if len(job.RawConcurrencyGroup) > 0 {
227+
if !job.IsConcurrencyEvaluated {
228+
return fmt.Errorf("the raw concurrency group has not been evaluated")
229+
}
230+
if len(job.ConcurrencyGroup) > 0 && job.ConcurrencyCancel {
231+
// cancel previous jobs in the same concurrency group
232+
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
233+
RepoID: job.RepoID,
234+
ConcurrencyGroup: job.ConcurrencyGroup,
235+
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
236+
})
237+
if err != nil {
238+
return fmt.Errorf("find previous jobs: %w", err)
239+
}
240+
if err := CancelJobs(ctx, previousJobs); err != nil {
241+
return fmt.Errorf("cancel previous jobs: %w", err)
242+
}
243+
}
234244
}
235245

236-
return CancelJobs(ctx, previousJobs)
246+
if err := job.LoadRun(ctx); err != nil {
247+
return fmt.Errorf("load run: %w", err)
248+
}
249+
if len(job.Run.ConcurrencyGroup) > 0 && job.Run.ConcurrencyCancel {
250+
// cancel previous runs in the same concurrency group
251+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
252+
RepoID: job.RepoID,
253+
ConcurrencyGroup: job.Run.ConcurrencyGroup,
254+
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
255+
})
256+
if err != nil {
257+
return fmt.Errorf("find runs: %w", err)
258+
}
259+
for _, run := range runs {
260+
if run.ID == job.Run.ID {
261+
continue
262+
}
263+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
264+
RunID: run.ID,
265+
})
266+
if err != nil {
267+
return fmt.Errorf("find run %d jobs: %w", run.ID, err)
268+
}
269+
if err := CancelJobs(ctx, jobs); err != nil {
270+
return fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
271+
}
272+
}
273+
}
274+
275+
return nil
237276
}

models/actions/task.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,10 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
320320
return nil, false, nil
321321
}
322322

323+
if err := CancelPreviousJobsByConcurrency(ctx, job); err != nil {
324+
return nil, false, err
325+
}
326+
323327
task.Job = job
324328

325329
if err := committer.Commit(); err != nil {

routers/web/repo/actions/view.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -560,11 +560,11 @@ func Approve(ctx *context_module.Context) {
560560
return err
561561
}
562562
for _, job := range jobs {
563-
blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, job)
563+
blockJobByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, job)
564564
if err != nil {
565565
return err
566566
}
567-
if len(job.Needs) == 0 && job.Status.IsBlocked() && !blockByConcurrency {
567+
if len(job.Needs) == 0 && job.Status.IsBlocked() && !blockJobByConcurrency {
568568
job.Status = actions_model.StatusWaiting
569569
_, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
570570
if err != nil {
@@ -830,10 +830,9 @@ func Run(ctx *context_module.Context) {
830830

831831
// find workflow from commit
832832
var workflows []*jobparser.SingleWorkflow
833-
var content []byte
834833
for _, entry := range entries {
835834
if entry.Name() == workflowID {
836-
content, err = actions.GetContentFromEntry(entry)
835+
content, err := actions.GetContentFromEntry(entry)
837836
if err != nil {
838837
ctx.Error(http.StatusInternalServerError, err.Error())
839838
return

services/actions/job_emitter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
6060

6161
// check run (workflow-level) concurrency
6262
concurrentRunIDs := make(container.Set[int64])
63-
if len(run.ConcurrencyGroup) > 0 && !run.ConcurrencyCancel {
63+
if len(run.ConcurrencyGroup) > 0 {
6464
concurrentRuns, err := db.Find[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
6565
RepoID: run.RepoID,
6666
ConcurrencyGroup: run.ConcurrencyGroup,
@@ -94,7 +94,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
9494
return err
9595
}
9696
for _, job := range concurrentJobs {
97-
if job.Status.IsDone() && len(job.ConcurrencyGroup) > 0 && !job.ConcurrencyCancel {
97+
if job.Status.IsDone() && len(job.ConcurrencyGroup) > 0 {
9898
concurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
9999
RepoID: job.RepoID,
100100
ConcurrencyGroup: job.ConcurrencyGroup,

0 commit comments

Comments
 (0)