Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
if err != nil {
return 0, err
}
run.Status = aggregateJobStatus(jobs)
run.Status = AggregateJobStatus(jobs)
if run.Started.IsZero() && run.Status.IsRunning() {
run.Started = timeutil.TimeStampNow()
}
Expand All @@ -152,7 +152,7 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
return affected, nil
}

func aggregateJobStatus(jobs []*ActionRunJob) Status {
func AggregateJobStatus(jobs []*ActionRunJob) Status {
allDone := true
allWaiting := true
hasFailure := false
Expand Down
60 changes: 44 additions & 16 deletions routers/api/actions/runner/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,28 +162,56 @@ func findTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[str
return nil, fmt.Errorf("FindRunJobs: %w", err)
}

ret := make(map[string]*runnerv1.TaskNeed, len(needs))
jobIDJobs := make(map[string][]*actions_model.ActionRunJob)
for _, job := range jobs {
if !needs.Contains(job.JobID) {
continue
}
if job.TaskID == 0 || !job.Status.IsDone() {
// it shouldn't happen, or the job has been rerun
jobIDJobs[job.JobID] = append(jobIDJobs[job.JobID], job)
}

ret := make(map[string]*runnerv1.TaskNeed, len(needs))
for jobID, jobsWithSameID := range jobIDJobs {
if !needs.Contains(jobID) {
continue
}
outputs := make(map[string]string)
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
if err != nil {
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
var jobOutputs map[string]string
for _, job := range jobsWithSameID {
if job.TaskID == 0 || !job.Status.IsDone() {
// it shouldn't happen, or the job has been rerun
continue
}
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
if err != nil {
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
}
outputs := make(map[string]string, len(got))
for _, v := range got {
outputs[v.OutputKey] = v.OutputValue
}
if len(jobOutputs) == 0 {
jobOutputs = outputs
} else {
jobOutputs = mergeTwoOutputs(outputs, jobOutputs)
}
}
for _, v := range got {
outputs[v.OutputKey] = v.OutputValue
}
ret[job.JobID] = &runnerv1.TaskNeed{
Outputs: outputs,
Result: runnerv1.Result(job.Status),
ret[jobID] = &runnerv1.TaskNeed{
Outputs: jobOutputs,
Result: runnerv1.Result(actions_model.AggregateJobStatus(jobsWithSameID)),
}
}

return ret, nil
}

// mergeTwoOutputs merges two outputs from two different ActionRunJobs
// Values with the same output name may be overridden. The user should ensure the output names are unique.
// See https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for-github-actions#using-job-outputs-in-a-matrix-job
func mergeTwoOutputs(o1, o2 map[string]string) map[string]string {
ret := make(map[string]string, len(o1))
for k1, v1 := range o1 {
if len(v1) > 0 {
ret[k1] = v1
} else {
ret[k1] = o2[k1]
}
}
return ret
}
Loading