Skip to content

Commit e45ca56

Browse files
committed
improve handling cancellation
1 parent dc003e4 commit e45ca56

File tree

5 files changed

+32
-35
lines changed

5 files changed

+32
-35
lines changed

models/actions/run.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,11 @@ func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, err
285285
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
286286
return cancelledJobs, err
287287
}
288-
cancelledJobs = append(cancelledJobs, job)
288+
updatedJob, err := GetRunJobByID(ctx, job.ID)
289+
if err != nil {
290+
return cancelledJobs, fmt.Errorf("get job: %w", err)
291+
}
292+
cancelledJobs = append(cancelledJobs, updatedJob)
289293
}
290294

291295
// Return nil to indicate successful cancellation of all running and waiting jobs.

routers/api/actions/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func (s *Service) UpdateTask(
227227
}
228228

229229
if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
230-
if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
230+
if err := actions_service.EmitJobsIfReadyByRun(task.Job.RunID); err != nil {
231231
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
232232
}
233233
}

routers/web/repo/actions/view.go

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"code.gitea.io/gitea/modules/log"
2828
"code.gitea.io/gitea/modules/storage"
2929
"code.gitea.io/gitea/modules/templates"
30-
"code.gitea.io/gitea/modules/timeutil"
3130
"code.gitea.io/gitea/modules/util"
3231
"code.gitea.io/gitea/modules/web"
3332
"code.gitea.io/gitea/routers/common"
@@ -572,45 +571,19 @@ func Cancel(ctx *context_module.Context) {
572571
var updatedjobs []*actions_model.ActionRunJob
573572

574573
if err := db.WithTx(ctx, func(ctx context.Context) error {
575-
for _, job := range jobs {
576-
status := job.Status
577-
if status.IsDone() {
578-
continue
579-
}
580-
if job.TaskID == 0 {
581-
job.Status = actions_model.StatusCancelled
582-
job.Stopped = timeutil.TimeStampNow()
583-
n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
584-
if err != nil {
585-
return err
586-
}
587-
if n == 0 {
588-
return errors.New("job has changed, try again")
589-
}
590-
if n > 0 {
591-
updatedjobs = append(updatedjobs, job)
592-
}
593-
continue
594-
}
595-
if err := actions_model.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
596-
return err
597-
}
574+
cancelledJobs, err := actions_model.CancelJobs(ctx, jobs)
575+
if err != nil {
576+
return fmt.Errorf("cancel jobs: %w", err)
598577
}
578+
updatedjobs = append(updatedjobs, cancelledJobs...)
599579
return nil
600580
}); err != nil {
601581
ctx.ServerError("StopTask", err)
602582
return
603583
}
604584

605585
actions_service.CreateCommitStatus(ctx, jobs...)
606-
607-
run, err := actions_model.GetRunByIndex(ctx, ctx.Repo.Repository.ID, runIndex)
608-
if err != nil {
609-
ctx.ServerError("GetRunByIndex", err)
610-
}
611-
if err := actions_service.EmitJobsIfReady(run.ID); err != nil {
612-
log.Error("Emit ready jobs of run %d: %v", run.ID, err)
613-
}
586+
actions_service.EmitJobsIfReadyByJobs(updatedjobs)
614587

615588
for _, job := range updatedjobs {
616589
_ = job.LoadAttributes(ctx)

services/actions/clear_tasks.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.Ac
5252
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
5353
jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event)
5454
notifyWorkflowJobStatusUpdate(ctx, jobs)
55+
EmitJobsIfReadyByJobs(jobs)
5556
return err
5657
}
5758

5859
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
5960
jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo)
6061
notifyWorkflowJobStatusUpdate(ctx, jobs)
62+
EmitJobsIfReadyByJobs(jobs)
6163
return err
6264
}
6365

@@ -109,6 +111,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
109111
}
110112

111113
notifyWorkflowJobStatusUpdate(ctx, jobs)
114+
EmitJobsIfReadyByJobs(jobs)
112115

113116
return nil
114117
}
@@ -125,6 +128,7 @@ func CancelAbandonedJobs(ctx context.Context) error {
125128
}
126129

127130
now := timeutil.TimeStampNow()
131+
var updatedJobs []*actions_model.ActionRunJob
128132
for _, job := range jobs {
129133
job.Status = actions_model.StatusCancelled
130134
job.Stopped = now
@@ -139,10 +143,13 @@ func CancelAbandonedJobs(ctx context.Context) error {
139143
}
140144
CreateCommitStatus(ctx, job)
141145
if updated {
146+
updatedJobs = append(updatedJobs, job)
142147
NotifyWorkflowRunStatusUpdateWithReload(ctx, job)
143148
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
144149
}
145150
}
146151

152+
EmitJobsIfReadyByJobs(updatedJobs)
153+
147154
return nil
148155
}

services/actions/job_emitter.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type jobUpdate struct {
2626
RunID int64
2727
}
2828

29-
func EmitJobsIfReady(runID int64) error {
29+
func EmitJobsIfReadyByRun(runID int64) error {
3030
err := jobEmitterQueue.Push(&jobUpdate{
3131
RunID: runID,
3232
})
@@ -36,6 +36,19 @@ func EmitJobsIfReady(runID int64) error {
3636
return err
3737
}
3838

39+
func EmitJobsIfReadyByJobs(jobs []*actions_model.ActionRunJob) {
40+
checkedRuns := make(container.Set[int64])
41+
for _, job := range jobs {
42+
if !job.Status.IsDone() || checkedRuns.Contains(job.RunID) {
43+
continue
44+
}
45+
if err := EmitJobsIfReadyByRun(job.RunID); err != nil {
46+
log.Error("Check jobs of run %d: %v", job.RunID, err)
47+
}
48+
checkedRuns.Add(job.RunID)
49+
}
50+
}
51+
3952
func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
4053
ctx := graceful.GetManager().ShutdownContext()
4154
var ret []*jobUpdate

0 commit comments

Comments
 (0)