Skip to content

Commit a783a72

Browse files
author
Earl Warren
committed
chore: refactor for Actions Done Notification (go-gitea#7510)
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/7510 Reviewed-by: Earl Warren <[email protected]>
2 parents 1c2a298 + cdb4682 commit a783a72

File tree

13 files changed

+248
-249
lines changed

13 files changed

+248
-249
lines changed

models/actions/run.go

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -185,75 +185,6 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
185185
return err
186186
}
187187

188-
// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
189-
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
190-
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
191-
// Find all runs in the specified repository, reference, and workflow with non-final status
192-
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
193-
RepoID: repoID,
194-
Ref: ref,
195-
WorkflowID: workflowID,
196-
TriggerEvent: event,
197-
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
198-
})
199-
if err != nil {
200-
return err
201-
}
202-
203-
// If there are no runs found, there's no need to proceed with cancellation, so return nil.
204-
if total == 0 {
205-
return nil
206-
}
207-
208-
// Iterate over each found run and cancel its associated jobs.
209-
for _, run := range runs {
210-
// Find all jobs associated with the current run.
211-
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
212-
RunID: run.ID,
213-
})
214-
if err != nil {
215-
return err
216-
}
217-
218-
// Iterate over each job and attempt to cancel it.
219-
for _, job := range jobs {
220-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
221-
status := job.Status
222-
if status.IsDone() {
223-
continue
224-
}
225-
226-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
227-
if job.TaskID == 0 {
228-
job.Status = StatusCancelled
229-
job.Stopped = timeutil.TimeStampNow()
230-
231-
// Update the job's status and stopped time in the database.
232-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
233-
if err != nil {
234-
return err
235-
}
236-
237-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
238-
if n == 0 {
239-
return fmt.Errorf("job has changed, try again")
240-
}
241-
242-
// Continue with the next job.
243-
continue
244-
}
245-
246-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
247-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
248-
return err
249-
}
250-
}
251-
}
252-
253-
// Return nil to indicate successful cancellation of all running and waiting jobs.
254-
return nil
255-
}
256-
257188
// InsertRun inserts a run
258189
// The title will be cut off at 255 characters if it's longer than 255 characters.
259190
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {

models/actions/schedule.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package actions
55

66
import (
77
"context"
8-
"fmt"
98
"time"
109

1110
"forgejo.org/models/db"
@@ -119,27 +118,6 @@ func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
119118
return committer.Commit()
120119
}
121120

122-
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
123-
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks
124-
// There is no other place we can do this because the app.ini will be changed manually
125-
if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
126-
return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
127-
}
128-
if cancelPreviousJobs {
129-
// cancel running cron jobs of this repository and delete old schedules
130-
if err := CancelPreviousJobs(
131-
ctx,
132-
repo.ID,
133-
repo.DefaultBranch,
134-
"",
135-
webhook_module.HookEventSchedule,
136-
); err != nil {
137-
return fmt.Errorf("CancelPreviousJobs: %v", err)
138-
}
139-
}
140-
return nil
141-
}
142-
143121
type FindScheduleOptions struct {
144122
db.ListOptions
145123
RepoID int64

models/actions/task.go

Lines changed: 0 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ import (
1717
"forgejo.org/modules/timeutil"
1818
"forgejo.org/modules/util"
1919

20-
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
2120
lru "github.com/hashicorp/golang-lru/v2"
2221
"github.com/nektos/act/pkg/jobparser"
23-
"google.golang.org/protobuf/types/known/timestamppb"
2422
"xorm.io/builder"
2523
)
2624

@@ -337,140 +335,6 @@ func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error {
337335
return err
338336
}
339337

340-
// UpdateTaskByState updates the task by the state.
341-
// It will always update the task if the state is not final, even there is no change.
342-
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
343-
func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*ActionTask, error) {
344-
stepStates := map[int64]*runnerv1.StepState{}
345-
for _, v := range state.Steps {
346-
stepStates[v.Id] = v
347-
}
348-
349-
ctx, commiter, err := db.TxContext(ctx)
350-
if err != nil {
351-
return nil, err
352-
}
353-
defer commiter.Close()
354-
355-
e := db.GetEngine(ctx)
356-
357-
task := &ActionTask{}
358-
if has, err := e.ID(state.Id).Get(task); err != nil {
359-
return nil, err
360-
} else if !has {
361-
return nil, util.ErrNotExist
362-
} else if runnerID != task.RunnerID {
363-
return nil, fmt.Errorf("invalid runner for task")
364-
}
365-
366-
if task.Status.IsDone() {
367-
// the state is final, do nothing
368-
return task, nil
369-
}
370-
371-
// state.Result is not unspecified means the task is finished
372-
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
373-
task.Status = Status(state.Result)
374-
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
375-
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
376-
return nil, err
377-
}
378-
if _, err := UpdateRunJob(ctx, &ActionRunJob{
379-
ID: task.JobID,
380-
Status: task.Status,
381-
Stopped: task.Stopped,
382-
}, nil); err != nil {
383-
return nil, err
384-
}
385-
} else {
386-
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
387-
task.Updated = timeutil.TimeStampNow()
388-
if err := UpdateTask(ctx, task, "updated"); err != nil {
389-
return nil, err
390-
}
391-
}
392-
393-
if err := task.LoadAttributes(ctx); err != nil {
394-
return nil, err
395-
}
396-
397-
for _, step := range task.Steps {
398-
var result runnerv1.Result
399-
if v, ok := stepStates[step.Index]; ok {
400-
result = v.Result
401-
step.LogIndex = v.LogIndex
402-
step.LogLength = v.LogLength
403-
step.Started = convertTimestamp(v.StartedAt)
404-
step.Stopped = convertTimestamp(v.StoppedAt)
405-
}
406-
if result != runnerv1.Result_RESULT_UNSPECIFIED {
407-
step.Status = Status(result)
408-
} else if step.Started != 0 {
409-
step.Status = StatusRunning
410-
}
411-
if _, err := e.ID(step.ID).Update(step); err != nil {
412-
return nil, err
413-
}
414-
}
415-
416-
if err := commiter.Commit(); err != nil {
417-
return nil, err
418-
}
419-
420-
return task, nil
421-
}
422-
423-
func StopTask(ctx context.Context, taskID int64, status Status) error {
424-
if !status.IsDone() {
425-
return fmt.Errorf("cannot stop task with status %v", status)
426-
}
427-
e := db.GetEngine(ctx)
428-
429-
task := &ActionTask{}
430-
if has, err := e.ID(taskID).Get(task); err != nil {
431-
return err
432-
} else if !has {
433-
return util.ErrNotExist
434-
}
435-
if task.Status.IsDone() {
436-
return nil
437-
}
438-
439-
now := timeutil.TimeStampNow()
440-
task.Status = status
441-
task.Stopped = now
442-
if _, err := UpdateRunJob(ctx, &ActionRunJob{
443-
ID: task.JobID,
444-
Status: task.Status,
445-
Stopped: task.Stopped,
446-
}, nil); err != nil {
447-
return err
448-
}
449-
450-
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
451-
return err
452-
}
453-
454-
if err := task.LoadAttributes(ctx); err != nil {
455-
return err
456-
}
457-
458-
for _, step := range task.Steps {
459-
if !step.Status.IsDone() {
460-
step.Status = status
461-
if step.Started == 0 {
462-
step.Started = now
463-
}
464-
step.Stopped = now
465-
}
466-
if _, err := e.ID(step.ID).Update(step); err != nil {
467-
return err
468-
}
469-
}
470-
471-
return nil
472-
}
473-
474338
func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) {
475339
e := db.GetEngine(ctx)
476340

@@ -481,13 +345,6 @@ func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, lim
481345
Find(&tasks)
482346
}
483347

484-
func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
485-
if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
486-
return timeutil.TimeStamp(0)
487-
}
488-
return timeutil.TimeStamp(timestamp.AsTime().Unix())
489-
}
490-
491348
func logFileName(repoFullName string, taskID int64) string {
492349
ret := fmt.Sprintf("%s/%02x/%d.log", repoFullName, taskID%256, taskID)
493350

routers/api/actions/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (s *Service) UpdateTask(
178178
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
179179
runner := GetRunner(ctx)
180180

181-
task, err := actions_model.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
181+
task, err := actions_service.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
182182
if err != nil {
183183
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update task: %w", err))
184184
}

routers/api/v1/repo/repo.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"strings"
1212
"time"
1313

14-
actions_model "forgejo.org/models/actions"
1514
activities_model "forgejo.org/models/activities"
1615
"forgejo.org/models/db"
1716
"forgejo.org/models/organization"
@@ -1065,7 +1064,7 @@ func updateRepoArchivedState(ctx *context.APIContext, opts api.EditRepoOption) e
10651064
ctx.Error(http.StatusInternalServerError, "ArchiveRepoState", err)
10661065
return err
10671066
}
1068-
if err := actions_model.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
1067+
if err := actions_service.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
10691068
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
10701069
}
10711070
log.Trace("Repository was archived: %s/%s", ctx.Repo.Owner.Name, repo.Name)

routers/web/repo/actions/view.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ func Cancel(ctx *context_module.Context) {
521521
}
522522
continue
523523
}
524-
if err := actions_model.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
524+
if err := actions_service.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
525525
return err
526526
}
527527
}

routers/web/repo/setting/setting.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"time"
1515

1616
"forgejo.org/models"
17-
actions_model "forgejo.org/models/actions"
1817
"forgejo.org/models/db"
1918
"forgejo.org/models/organization"
2019
quota_model "forgejo.org/models/quota"
@@ -1034,7 +1033,7 @@ func SettingsPost(ctx *context.Context) {
10341033
return
10351034
}
10361035

1037-
if err := actions_model.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
1036+
if err := actions_service.CleanRepoScheduleTasks(ctx, repo, true); err != nil {
10381037
log.Error("CleanRepoScheduleTasks for archived repo %s/%s: %v", ctx.Repo.Owner.Name, repo.Name, err)
10391038
}
10401039

services/actions/clear_tasks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
4141
jobs := make([]*actions_model.ActionRunJob, 0, len(tasks))
4242
for _, task := range tasks {
4343
if err := db.WithTx(ctx, func(ctx context.Context) error {
44-
if err := actions_model.StopTask(ctx, task.ID, actions_model.StatusFailure); err != nil {
44+
if err := StopTask(ctx, task.ID, actions_model.StatusFailure); err != nil {
4545
return err
4646
}
4747
if err := task.LoadJob(ctx); err != nil {

services/actions/notifier_helper.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func notify(ctx context.Context, input *notifyInput) error {
139139
return nil
140140
}
141141
if unit_model.TypeActions.UnitGlobalDisabled() {
142-
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo, true); err != nil {
142+
if err := CleanRepoScheduleTasks(ctx, input.Repo, true); err != nil {
143143
log.Error("CleanRepoScheduleTasks: %v", err)
144144
}
145145
return nil
@@ -373,7 +373,7 @@ func handleWorkflows(
373373
// cancel running jobs if the event is push or pull_request_sync
374374
if run.Event == webhook_module.HookEventPush ||
375375
run.Event == webhook_module.HookEventPullRequestSync {
376-
if err := actions_model.CancelPreviousJobs(
376+
if err := CancelPreviousJobs(
377377
ctx,
378378
run.RepoID,
379379
run.Ref,
@@ -504,7 +504,7 @@ func handleSchedules(
504504
log.Error("CountSchedules: %v", err)
505505
return err
506506
} else if count > 0 {
507-
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo, false); err != nil {
507+
if err := CleanRepoScheduleTasks(ctx, input.Repo, false); err != nil {
508508
log.Error("CleanRepoScheduleTasks: %v", err)
509509
}
510510
}

0 commit comments

Comments
 (0)