Skip to content

Commit cdb4682

Browse files
feat: move UpdateTaskByState to services
This function is also can't be in models in order to enable calling the action run state change notification channel.
1 parent 81b5c7c commit cdb4682

File tree

3 files changed

+92
-93
lines changed

3 files changed

+92
-93
lines changed

models/actions/task.go

Lines changed: 0 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ import (
1717
"forgejo.org/modules/timeutil"
1818
"forgejo.org/modules/util"
1919

20-
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
2120
lru "github.com/hashicorp/golang-lru/v2"
2221
"github.com/nektos/act/pkg/jobparser"
23-
"google.golang.org/protobuf/types/known/timestamppb"
2422
"xorm.io/builder"
2523
)
2624

@@ -337,89 +335,6 @@ func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error {
337335
return err
338336
}
339337

340-
// UpdateTaskByState updates the task by the state.
341-
// It will always update the task if the state is not final, even there is no change.
342-
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
343-
func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*ActionTask, error) {
344-
stepStates := map[int64]*runnerv1.StepState{}
345-
for _, v := range state.Steps {
346-
stepStates[v.Id] = v
347-
}
348-
349-
ctx, commiter, err := db.TxContext(ctx)
350-
if err != nil {
351-
return nil, err
352-
}
353-
defer commiter.Close()
354-
355-
e := db.GetEngine(ctx)
356-
357-
task := &ActionTask{}
358-
if has, err := e.ID(state.Id).Get(task); err != nil {
359-
return nil, err
360-
} else if !has {
361-
return nil, util.ErrNotExist
362-
} else if runnerID != task.RunnerID {
363-
return nil, fmt.Errorf("invalid runner for task")
364-
}
365-
366-
if task.Status.IsDone() {
367-
// the state is final, do nothing
368-
return task, nil
369-
}
370-
371-
// state.Result is not unspecified means the task is finished
372-
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
373-
task.Status = Status(state.Result)
374-
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
375-
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
376-
return nil, err
377-
}
378-
if _, err := UpdateRunJob(ctx, &ActionRunJob{
379-
ID: task.JobID,
380-
Status: task.Status,
381-
Stopped: task.Stopped,
382-
}, nil); err != nil {
383-
return nil, err
384-
}
385-
} else {
386-
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
387-
task.Updated = timeutil.TimeStampNow()
388-
if err := UpdateTask(ctx, task, "updated"); err != nil {
389-
return nil, err
390-
}
391-
}
392-
393-
if err := task.LoadAttributes(ctx); err != nil {
394-
return nil, err
395-
}
396-
397-
for _, step := range task.Steps {
398-
var result runnerv1.Result
399-
if v, ok := stepStates[step.Index]; ok {
400-
result = v.Result
401-
step.LogIndex = v.LogIndex
402-
step.LogLength = v.LogLength
403-
step.Started = convertTimestamp(v.StartedAt)
404-
step.Stopped = convertTimestamp(v.StoppedAt)
405-
}
406-
if result != runnerv1.Result_RESULT_UNSPECIFIED {
407-
step.Status = Status(result)
408-
} else if step.Started != 0 {
409-
step.Status = StatusRunning
410-
}
411-
if _, err := e.ID(step.ID).Update(step); err != nil {
412-
return nil, err
413-
}
414-
}
415-
416-
if err := commiter.Commit(); err != nil {
417-
return nil, err
418-
}
419-
420-
return task, nil
421-
}
422-
423338
func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) {
424339
e := db.GetEngine(ctx)
425340

@@ -430,13 +345,6 @@ func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, lim
430345
Find(&tasks)
431346
}
432347

433-
func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
434-
if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
435-
return timeutil.TimeStamp(0)
436-
}
437-
return timeutil.TimeStamp(timestamp.AsTime().Unix())
438-
}
439-
440348
func logFileName(repoFullName string, taskID int64) string {
441349
ret := fmt.Sprintf("%s/%02x/%d.log", repoFullName, taskID%256, taskID)
442350

routers/api/actions/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (s *Service) UpdateTask(
178178
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
179179
runner := GetRunner(ctx)
180180

181-
task, err := actions_model.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
181+
task, err := actions_service.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
182182
if err != nil {
183183
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update task: %w", err))
184184
}

services/actions/task.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
1717
"google.golang.org/protobuf/types/known/structpb"
18+
"google.golang.org/protobuf/types/known/timestamppb"
1819
)
1920

2021
func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv1.Task, bool, error) {
@@ -158,3 +159,93 @@ func StopTask(ctx context.Context, taskID int64, status actions_model.Status) er
158159

159160
return nil
160161
}
162+
163+
// UpdateTaskByState updates the task by the state.
164+
// It will always update the task if the state is not final, even there is no change.
165+
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
166+
func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*actions_model.ActionTask, error) {
167+
stepStates := map[int64]*runnerv1.StepState{}
168+
for _, v := range state.Steps {
169+
stepStates[v.Id] = v
170+
}
171+
172+
ctx, commiter, err := db.TxContext(ctx)
173+
if err != nil {
174+
return nil, err
175+
}
176+
defer commiter.Close()
177+
178+
e := db.GetEngine(ctx)
179+
180+
task := &actions_model.ActionTask{}
181+
if has, err := e.ID(state.Id).Get(task); err != nil {
182+
return nil, err
183+
} else if !has {
184+
return nil, util.ErrNotExist
185+
} else if runnerID != task.RunnerID {
186+
return nil, fmt.Errorf("invalid runner for task")
187+
}
188+
189+
if task.Status.IsDone() {
190+
// the state is final, do nothing
191+
return task, nil
192+
}
193+
194+
// state.Result is not unspecified means the task is finished
195+
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
196+
task.Status = actions_model.Status(state.Result)
197+
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
198+
if err := actions_model.UpdateTask(ctx, task, "status", "stopped"); err != nil {
199+
return nil, err
200+
}
201+
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
202+
ID: task.JobID,
203+
Status: task.Status,
204+
Stopped: task.Stopped,
205+
}, nil); err != nil {
206+
return nil, err
207+
}
208+
} else {
209+
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
210+
task.Updated = timeutil.TimeStampNow()
211+
if err := actions_model.UpdateTask(ctx, task, "updated"); err != nil {
212+
return nil, err
213+
}
214+
}
215+
216+
if err := task.LoadAttributes(ctx); err != nil {
217+
return nil, err
218+
}
219+
220+
for _, step := range task.Steps {
221+
var result runnerv1.Result
222+
if v, ok := stepStates[step.Index]; ok {
223+
result = v.Result
224+
step.LogIndex = v.LogIndex
225+
step.LogLength = v.LogLength
226+
step.Started = convertTimestamp(v.StartedAt)
227+
step.Stopped = convertTimestamp(v.StoppedAt)
228+
}
229+
if result != runnerv1.Result_RESULT_UNSPECIFIED {
230+
step.Status = actions_model.Status(result)
231+
} else if step.Started != 0 {
232+
step.Status = actions_model.StatusRunning
233+
}
234+
if _, err := e.ID(step.ID).Update(step); err != nil {
235+
return nil, err
236+
}
237+
}
238+
239+
if err := commiter.Commit(); err != nil {
240+
return nil, err
241+
}
242+
243+
return task, nil
244+
}
245+
246+
func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
247+
if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
248+
return timeutil.TimeStamp(0)
249+
}
250+
return timeutil.TimeStamp(timestamp.AsTime().Unix())
251+
}

0 commit comments

Comments
 (0)