Skip to content

Commit 81b5c7c

Browse files
feat: move StopTask, CancelPreviousJobs and CleanRepoScheduleTasks to services/actions
This enables all action run state changes (from a not done to a done state) to also send a notification. Moved these: - models/actions/task.go|423 col 6| func StopTask(ctx context.Context, taskID int64, status Status) error { - models/actions/run.go|190 col 6| func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { - models/actions/schedule.go|122 col 6| func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
1 parent c977585 commit 81b5c7c

File tree

12 files changed

+156
-156
lines changed

12 files changed

+156
-156
lines changed

models/actions/run.go

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -185,75 +185,6 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
185185
return err
186186
}
187187

188-
// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
189-
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
190-
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
191-
// Find all runs in the specified repository, reference, and workflow with non-final status
192-
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
193-
RepoID: repoID,
194-
Ref: ref,
195-
WorkflowID: workflowID,
196-
TriggerEvent: event,
197-
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
198-
})
199-
if err != nil {
200-
return err
201-
}
202-
203-
// If there are no runs found, there's no need to proceed with cancellation, so return nil.
204-
if total == 0 {
205-
return nil
206-
}
207-
208-
// Iterate over each found run and cancel its associated jobs.
209-
for _, run := range runs {
210-
// Find all jobs associated with the current run.
211-
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
212-
RunID: run.ID,
213-
})
214-
if err != nil {
215-
return err
216-
}
217-
218-
// Iterate over each job and attempt to cancel it.
219-
for _, job := range jobs {
220-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
221-
status := job.Status
222-
if status.IsDone() {
223-
continue
224-
}
225-
226-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
227-
if job.TaskID == 0 {
228-
job.Status = StatusCancelled
229-
job.Stopped = timeutil.TimeStampNow()
230-
231-
// Update the job's status and stopped time in the database.
232-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
233-
if err != nil {
234-
return err
235-
}
236-
237-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
238-
if n == 0 {
239-
return fmt.Errorf("job has changed, try again")
240-
}
241-
242-
// Continue with the next job.
243-
continue
244-
}
245-
246-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
247-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
248-
return err
249-
}
250-
}
251-
}
252-
253-
// Return nil to indicate successful cancellation of all running and waiting jobs.
254-
return nil
255-
}
256-
257188
// InsertRun inserts a run
258189
// The title will be cut off at 255 characters if it's longer than 255 characters.
259190
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {

models/actions/schedule.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package actions
55

66
import (
77
"context"
8-
"fmt"
98
"time"
109

1110
"forgejo.org/models/db"
@@ -119,27 +118,6 @@ func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
119118
return committer.Commit()
120119
}
121120

122-
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
123-
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks
124-
// There is no other place we can do this because the app.ini will be changed manually
125-
if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
126-
return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
127-
}
128-
if cancelPreviousJobs {
129-
// cancel running cron jobs of this repository and delete old schedules
130-
if err := CancelPreviousJobs(
131-
ctx,
132-
repo.ID,
133-
repo.DefaultBranch,
134-
"",
135-
webhook_module.HookEventSchedule,
136-
); err != nil {
137-
return fmt.Errorf("CancelPreviousJobs: %v", err)
138-
}
139-
}
140-
return nil
141-
}
142-
143121
type FindScheduleOptions struct {
144122
db.ListOptions
145123
RepoID int64

models/actions/task.go

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -420,57 +420,6 @@ func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.Task
420420
return task, nil
421421
}
422422

423-
func StopTask(ctx context.Context, taskID int64, status Status) error {
424-
if !status.IsDone() {
425-
return fmt.Errorf("cannot stop task with status %v", status)
426-
}
427-
e := db.GetEngine(ctx)
428-
429-
task := &ActionTask{}
430-
if has, err := e.ID(taskID).Get(task); err != nil {
431-
return err
432-
} else if !has {
433-
return util.ErrNotExist
434-
}
435-
if task.Status.IsDone() {
436-
return nil
437-
}
438-
439-
now := timeutil.TimeStampNow()
440-
task.Status = status
441-
task.Stopped = now
442-
if _, err := UpdateRunJob(ctx, &ActionRunJob{
443-
ID: task.JobID,
444-
Status: task.Status,
445-
Stopped: task.Stopped,
446-
}, nil); err != nil {
447-
return err
448-
}
449-
450-
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
451-
return err
452-
}
453-
454-
if err := task.LoadAttributes(ctx); err != nil {
455-
return err
456-
}
457-
458-
for _, step := range task.Steps {
459-
if !step.Status.IsDone() {
460-
step.Status = status
461-
if step.Started == 0 {
462-
step.Started = now
463-
}
464-
step.Stopped = now
465-
}
466-
if _, err := e.ID(step.ID).Update(step); err != nil {
467-
return err
468-
}
469-
}
470-
471-
return nil
472-
}
473-
474423
func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) {
475424
e := db.GetEngine(ctx)
476425

routers/api/v1/repo/repo.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"strings"
1212
"time"
1313

14-
actions_model "forgejo.org/models/actions"
1514
activities_model "forgejo.org/models/activities"
1615
"forgejo.org/models/db"
1716
"forgejo.org/models/organization"
@@ -1065,7 +1064,7 @@ func updateRepoArchivedState(ctx *context.APIContext, opts api.EditRepoOption) e
10651064
ctx.Error(http.StatusInternalServerError, "ArchiveRepoState", err)
10661065
return err
10671066
}
1068-
if err := actions_model.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
1067+
if err := actions_service.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
10691068
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
10701069
}
10711070
log.Trace("Repository was archived: %s/%s", ctx.Repo.Owner.Name, repo.Name)

routers/web/repo/actions/view.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ func Cancel(ctx *context_module.Context) {
521521
}
522522
continue
523523
}
524-
if err := actions_model.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
524+
if err := actions_service.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
525525
return err
526526
}
527527
}

routers/web/repo/setting/setting.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"time"
1515

1616
"forgejo.org/models"
17-
actions_model "forgejo.org/models/actions"
1817
"forgejo.org/models/db"
1918
"forgejo.org/models/organization"
2019
quota_model "forgejo.org/models/quota"
@@ -1034,7 +1033,7 @@ func SettingsPost(ctx *context.Context) {
10341033
return
10351034
}
10361035

1037-
if err := actions_model.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
1036+
if err := actions_service.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
10381037
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
10391038
}
10401039

services/actions/clear_tasks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
4141
jobs := make([]*actions_model.ActionRunJob, 0, len(tasks))
4242
for _, task := range tasks {
4343
if err := db.WithTx(ctx, func(ctx context.Context) error {
44-
if err := actions_model.StopTask(ctx, task.ID, actions_model.StatusFailure); err != nil {
44+
if err := StopTask(ctx, task.ID, actions_model.StatusFailure); err != nil {
4545
return err
4646
}
4747
if err := task.LoadJob(ctx); err != nil {

services/actions/notifier_helper.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func notify(ctx context.Context, input *notifyInput) error {
139139
return nil
140140
}
141141
if unit_model.TypeActions.UnitGlobalDisabled() {
142-
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo, true); err != nil {
142+
if err := CleanRepoScheduleTasks(ctx, input.Repo, true); err != nil {
143143
log.Error("CleanRepoScheduleTasks: %v", err)
144144
}
145145
return nil
@@ -373,7 +373,7 @@ func handleWorkflows(
373373
// cancel running jobs if the event is push or pull_request_sync
374374
if run.Event == webhook_module.HookEventPush ||
375375
run.Event == webhook_module.HookEventPullRequestSync {
376-
if err := actions_model.CancelPreviousJobs(
376+
if err := CancelPreviousJobs(
377377
ctx,
378378
run.RepoID,
379379
run.Ref,
@@ -504,7 +504,7 @@ func handleSchedules(
504504
log.Error("CountSchedules: %v", err)
505505
return err
506506
} else if count > 0 {
507-
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo, false); err != nil {
507+
if err := CleanRepoScheduleTasks(ctx, input.Repo, false); err != nil {
508508
log.Error("CleanRepoScheduleTasks: %v", err)
509509
}
510510
}

services/actions/schedule_tasks.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
webhook_module "forgejo.org/modules/webhook"
1818

1919
"github.com/nektos/act/pkg/jobparser"
20+
"xorm.io/builder"
2021
)
2122

2223
// StartScheduleTasks start the task
@@ -55,7 +56,7 @@ func startTasks(ctx context.Context) error {
5556
// cancel running jobs if the event is push
5657
if row.Schedule.Event == webhook_module.HookEventPush {
5758
// cancel running jobs of the same workflow
58-
if err := actions_model.CancelPreviousJobs(
59+
if err := CancelPreviousJobs(
5960
ctx,
6061
row.RepoID,
6162
row.Schedule.Ref,
@@ -152,3 +153,93 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
152153
// Return nil if no errors occurred
153154
return nil
154155
}
156+
157+
// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
158+
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
159+
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
160+
// Find all runs in the specified repository, reference, and workflow with non-final status
161+
runs, total, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
162+
RepoID: repoID,
163+
Ref: ref,
164+
WorkflowID: workflowID,
165+
TriggerEvent: event,
166+
Status: []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked},
167+
})
168+
if err != nil {
169+
return err
170+
}
171+
172+
// If there are no runs found, there's no need to proceed with cancellation, so return nil.
173+
if total == 0 {
174+
return nil
175+
}
176+
177+
// Iterate over each found run and cancel its associated jobs.
178+
for _, run := range runs {
179+
// Find all jobs associated with the current run.
180+
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
181+
RunID: run.ID,
182+
})
183+
if err != nil {
184+
return err
185+
}
186+
187+
// Iterate over each job and attempt to cancel it.
188+
for _, job := range jobs {
189+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
190+
status := job.Status
191+
if status.IsDone() {
192+
continue
193+
}
194+
195+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
196+
if job.TaskID == 0 {
197+
job.Status = actions_model.StatusCancelled
198+
job.Stopped = timeutil.TimeStampNow()
199+
200+
// Update the job's status and stopped time in the database.
201+
n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
202+
if err != nil {
203+
return err
204+
}
205+
206+
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
207+
if n == 0 {
208+
return fmt.Errorf("job has changed, try again")
209+
}
210+
211+
// Continue with the next job.
212+
continue
213+
}
214+
215+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
216+
if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
217+
return err
218+
}
219+
}
220+
}
221+
222+
// Return nil to indicate successful cancellation of all running and waiting jobs.
223+
return nil
224+
}
225+
226+
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
227+
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks
228+
// There is no other place we can do this because the app.ini will be changed manually
229+
if err := actions_model.DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
230+
return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
231+
}
232+
if cancelPreviousJobs {
233+
// cancel running cron jobs of this repository and delete old schedules
234+
if err := CancelPreviousJobs(
235+
ctx,
236+
repo.ID,
237+
repo.DefaultBranch,
238+
"",
239+
webhook_module.HookEventSchedule,
240+
); err != nil {
241+
return fmt.Errorf("CancelPreviousJobs: %v", err)
242+
}
243+
}
244+
return nil
245+
}

0 commit comments

Comments
 (0)