Skip to content

Commit a383b13

Browse files
committed
fixes
1 parent 865d0c3 commit a383b13

File tree

8 files changed

+63
-77
lines changed

8 files changed

+63
-77
lines changed

models/actions/run.go

Lines changed: 14 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -226,48 +226,19 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
226226
return cancelledJobs, err
227227
}
228228

229-
// Iterate over each job and attempt to cancel it.
230-
for _, job := range jobs {
231-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
232-
status := job.Status
233-
if status.IsDone() {
234-
continue
235-
}
236-
237-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
238-
if job.TaskID == 0 {
239-
job.Status = StatusCancelled
240-
job.Stopped = timeutil.TimeStampNow()
241-
242-
// Update the job's status and stopped time in the database.
243-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
244-
if err != nil {
245-
return cancelledJobs, err
246-
}
247-
248-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
249-
if n == 0 {
250-
return cancelledJobs, errors.New("job has changed, try again")
251-
}
252-
253-
cancelledJobs = append(cancelledJobs, job)
254-
// Continue with the next job.
255-
continue
256-
}
257-
258-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
259-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
260-
return cancelledJobs, err
261-
}
262-
cancelledJobs = append(cancelledJobs, job)
229+
if cjs, err := CancelJobs(ctx, jobs); err != nil {
230+
return cancelledJobs, err
231+
} else {
232+
cancelledJobs = append(cancelledJobs, cjs...)
263233
}
264234
}
265235

266236
// Return nil to indicate successful cancellation of all running and waiting jobs.
267237
return cancelledJobs, nil
268238
}
269239

270-
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
240+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) {
241+
cancelledJobs := make([]*ActionRunJob, 0, len(jobs))
271242
// Iterate over each job and attempt to cancel it.
272243
for _, job := range jobs {
273244
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
@@ -284,24 +255,28 @@ func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
284255
// Update the job's status and stopped time in the database.
285256
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
286257
if err != nil {
287-
return err
258+
return cancelledJobs, err
288259
}
289260

290261
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
291262
if n == 0 {
292-
return fmt.Errorf("job has changed, try again")
263+
return cancelledJobs, errors.New("job has changed, try again")
293264
}
294265

266+
cancelledJobs = append(cancelledJobs, job)
295267
// Continue with the next job.
296268
continue
297269
}
298270

299271
// If the job has an associated task, try to stop the task, effectively cancelling the job.
300272
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
301-
return err
273+
return cancelledJobs, err
302274
}
275+
cancelledJobs = append(cancelledJobs, job)
303276
}
304-
return nil
277+
278+
// Return nil to indicate successful cancellation of all running and waiting jobs.
279+
return cancelledJobs, nil
305280
}
306281

307282
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {

models/actions/run_job.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,12 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
238238
return ShouldBlockRunByConcurrency(ctx, job.Run)
239239
}
240240

241-
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) error {
241+
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
242+
var cancelledJobs []*ActionRunJob
243+
242244
if job.RawConcurrencyGroup != "" {
243245
if !job.IsConcurrencyEvaluated {
244-
return ErrUnevaluatedConcurrency{
246+
return cancelledJobs, ErrUnevaluatedConcurrency{
245247
Group: job.RawConcurrencyGroup,
246248
CancelInProgress: job.RawConcurrencyCancel,
247249
}
@@ -254,17 +256,19 @@ func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) err
254256
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
255257
})
256258
if err != nil {
257-
return fmt.Errorf("find previous jobs: %w", err)
259+
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
258260
}
259261
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
260-
if err := CancelJobs(ctx, previousJobs); err != nil {
261-
return fmt.Errorf("cancel previous jobs: %w", err)
262+
if cjs, err := CancelJobs(ctx, previousJobs); err != nil {
263+
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
264+
} else {
265+
cancelledJobs = append(cancelledJobs, cjs...)
262266
}
263267
}
264268
}
265269

266270
if err := job.LoadRun(ctx); err != nil {
267-
return fmt.Errorf("load run: %w", err)
271+
return cancelledJobs, fmt.Errorf("load run: %w", err)
268272
}
269273
if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel {
270274
// cancel previous runs in the same concurrency group
@@ -274,7 +278,7 @@ func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) err
274278
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
275279
})
276280
if err != nil {
277-
return fmt.Errorf("find runs: %w", err)
281+
return cancelledJobs, fmt.Errorf("find runs: %w", err)
278282
}
279283
for _, run := range runs {
280284
if run.ID == job.Run.ID {
@@ -284,15 +288,17 @@ func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) err
284288
RunID: run.ID,
285289
})
286290
if err != nil {
287-
return fmt.Errorf("find run %d jobs: %w", run.ID, err)
291+
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
288292
}
289-
if err := CancelJobs(ctx, jobs); err != nil {
290-
return fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
293+
if cjs, err := CancelJobs(ctx, jobs); err != nil {
294+
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
295+
} else {
296+
cancelledJobs = append(cancelledJobs, cjs...)
291297
}
292298
}
293299
}
294300

295-
return nil
301+
return cancelledJobs, nil
296302
}
297303

298304
type ErrUnevaluatedConcurrency struct {

models/actions/task.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
321321
return nil, false, nil
322322
}
323323

324-
if err := CancelPreviousJobsByConcurrency(ctx, job); err != nil {
325-
return nil, false, err
326-
}
327-
328324
task.Job = job
329325

330326
if err := committer.Commit(); err != nil {

services/actions/clear_tasks.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) er
5757
return err
5858
}
5959

60+
func CancelConcurrentJobs(ctx context.Context, job *actions_model.ActionRunJob) error {
61+
jobs, err := actions_model.CancelPreviousJobsByConcurrency(ctx, job)
62+
notifyWorkflowJobStatusUpdate(ctx, jobs)
63+
return err
64+
}
65+
6066
func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
6167
tasks, err := db.Find[actions_model.ActionTask](ctx, opts)
6268
if err != nil {

services/actions/concurrency.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package actions
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910

1011
actions_model "code.gitea.io/gitea/models/actions"
@@ -61,7 +62,7 @@ func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, a
6162
if err != nil {
6263
return "", false, fmt.Errorf("parse single workflow: %w", err)
6364
} else if len(singleWorkflows) != 1 {
64-
return "", false, fmt.Errorf("not single workflow")
65+
return "", false, errors.New("not single workflow")
6566
}
6667
_, singleWorkflowJob := singleWorkflows[0].Job()
6768

services/actions/job_emitter_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package actions
55

66
import (
7-
"context"
87
"testing"
98

109
actions_model "code.gitea.io/gitea/models/actions"
@@ -131,7 +130,7 @@ jobs:
131130
for _, tt := range tests {
132131
t.Run(tt.name, func(t *testing.T) {
133132
r := newJobStatusResolver(tt.jobs, nil)
134-
assert.Equal(t, tt.want, r.Resolve(context.Background()))
133+
assert.Equal(t, tt.want, r.Resolve(t.Context()))
135134
})
136135
}
137136
}

services/actions/task.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
5353
return nil
5454
}
5555

56+
if err := CancelConcurrentJobs(ctx, t.Job); err != nil {
57+
return fmt.Errorf("CancelConcurrentJobs: %w", err)
58+
}
59+
5660
if err := t.LoadAttributes(ctx); err != nil {
5761
return fmt.Errorf("task LoadAttributes: %w", err)
5862
}

tests/integration/actions_concurrency_test.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package integration
55

66
import (
7-
"context"
87
"encoding/base64"
98
"fmt"
109
"net/http"
@@ -92,11 +91,11 @@ jobs:
9291
steps:
9392
- run: echo 'job from workflow3'
9493
`
95-
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
94+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent)
9695
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
97-
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent)
96+
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent)
9897
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
99-
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent)
98+
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent)
10099
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
101100

102101
// fetch and exec workflow1, workflow2 and workflow3 are blocked
@@ -164,7 +163,7 @@ jobs:
164163
steps:
165164
- run: echo 'test the pull'
166165
`
167-
opts1 := getWorkflowCreateFileOptions(user2, baseRepo.DefaultBranch, fmt.Sprintf("create %s", wfTreePath), wfFileContent)
166+
opts1 := getWorkflowCreateFileOptions(user2, baseRepo.DefaultBranch, "create %s"+wfTreePath, wfFileContent)
168167
createWorkflowFile(t, user2Token, baseRepo.OwnerName, baseRepo.Name, wfTreePath, opts1)
169168
// user2 creates a pull request
170169
doAPICreateFile(user2APICtx, "user2-fix.txt", &api.CreateFileOptions{
@@ -225,7 +224,7 @@ jobs:
225224
},
226225
ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix")),
227226
})(t)
228-
doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, fmt.Sprintf("%s:bugfix/bbb", user4.Name))(t)
227+
doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, user4.Name+"%s:bugfix/bbb")(t)
229228
// cannot fetch the task because an approval is required
230229
runner.fetchNoTask(t)
231230
// user2 approves the run
@@ -265,7 +264,7 @@ jobs:
265264
},
266265
ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix2")),
267266
})(t)
268-
doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, fmt.Sprintf("%s:do-not-cancel/ccc", user4.Name))(t)
267+
doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, user4.Name+"%s:do-not-cancel/ccc")(t)
269268
// cannot fetch the task because cancel-in-progress is false
270269
runner.fetchNoTask(t)
271270
runner.execTask(t, pr2Task1, &mockTaskOutcome{
@@ -358,9 +357,9 @@ jobs:
358357
- run: echo 'wf3-job1'
359358
`
360359

361-
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
360+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
362361
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
363-
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent)
362+
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent)
364363
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
365364

366365
// fetch wf1-job1
@@ -388,7 +387,7 @@ jobs:
388387
assert.Equal(t, "job-main-v1.23.0", wf2Job2ActionJob.ConcurrencyGroup)
389388
assert.Equal(t, actions_model.StatusRunning, wf2Job2ActionJob.Status)
390389
// push workflow3 to trigger wf3-job1
391-
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent)
390+
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent)
392391
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
393392
// fetch wf3-job1
394393
wf3Job1Task := runner1.fetchTask(t)
@@ -446,7 +445,7 @@ jobs:
446445
- run: echo 'job2'
447446
`
448447

449-
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
448+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
450449
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
451450

452451
job1WinTask := windowsRunner.fetchTask(t)
@@ -524,7 +523,7 @@ jobs:
524523
- run: echo 'workflow dispatch job'
525524
`
526525

527-
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
526+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
528527
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
529528

530529
// run the workflow with appVersion=v1.21 and cancel=false
@@ -605,7 +604,7 @@ jobs:
605604
- run: echo 'schedule workflow'
606605
`
607606

608-
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
607+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
609608
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
610609

611610
// fetch the task triggered by push
@@ -619,8 +618,8 @@ jobs:
619618
// trigger the task by schedule
620619
spec := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionScheduleSpec{RepoID: repo.ID})
621620
spec.Next = timeutil.TimeStampNow() // manually update "Next"
622-
assert.NoError(t, actions_model.UpdateScheduleSpec(context.Background(), spec, "next"))
623-
assert.NoError(t, actions_service.StartScheduleTasks(context.Background()))
621+
assert.NoError(t, actions_model.UpdateScheduleSpec(t.Context(), spec, "next"))
622+
assert.NoError(t, actions_service.StartScheduleTasks(t.Context()))
624623
runner.fetchNoTask(t) // cannot fetch because task1 is not completed
625624
runner.execTask(t, task1, &mockTaskOutcome{
626625
result: runnerv1.Result_RESULT_SUCCESS,
@@ -637,8 +636,8 @@ jobs:
637636
// trigger the task by schedule again
638637
spec = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionScheduleSpec{RepoID: repo.ID})
639638
spec.Next = timeutil.TimeStampNow() // manually update "Next"
640-
assert.NoError(t, actions_model.UpdateScheduleSpec(context.Background(), spec, "next"))
641-
assert.NoError(t, actions_service.StartScheduleTasks(context.Background()))
639+
assert.NoError(t, actions_model.UpdateScheduleSpec(t.Context(), spec, "next"))
640+
assert.NoError(t, actions_service.StartScheduleTasks(t.Context()))
642641
runner.fetchNoTask(t) // cannot fetch because task2 is not completed
643642
run3 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, Status: actions_model.StatusBlocked})
644643
assert.Equal(t, "schedule-concurrency", run3.ConcurrencyGroup)
@@ -773,11 +772,11 @@ jobs:
773772
`
774773

775774
// push workflow 1, 2 and 3
776-
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
775+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
777776
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
778-
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent)
777+
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent)
779778
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
780-
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent)
779+
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent)
781780
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
782781
// fetch wf1-job1 and wf1-job2
783782
w1j1Task := runner1.fetchTask(t)

0 commit comments

Comments
 (0)