Skip to content

Commit 341c5d8

Browse files
committed
fix maybe
1 parent bfc4f6e commit 341c5d8

File tree

2 files changed

+93
-67
lines changed

2 files changed

+93
-67
lines changed

services/actions/clear_tasks.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
9999
return nil
100100
}
101101

102-
// CancelAbandonedJobs cancels the jobs which have waiting status, but haven't been picked by a runner for a long time
102+
// CancelAbandonedJobs cancels jobs that have not been picked by any runner for a long time
103103
func CancelAbandonedJobs(ctx context.Context) error {
104104
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
105105
Statuses: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusBlocked},
@@ -121,31 +121,29 @@ func CancelAbandonedJobs(ctx context.Context) error {
121121
updated := false
122122
if err := db.WithTx(ctx, func(ctx context.Context) error {
123123
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "stopped")
124-
updated = err == nil && n > 0
125-
return err
124+
if err != nil {
125+
return err
126+
}
127+
if err := job.LoadAttributes(ctx); err != nil {
128+
return err
129+
}
130+
updated = n > 0
131+
if updated && job.Run.Status.IsDone() {
132+
updatedRuns[job.RunID] = job
133+
}
134+
return nil
126135
}); err != nil {
127136
log.Warn("cancel abandoned job %v: %v", job.ID, err)
128137
// go on
129138
}
130139
CreateCommitStatus(ctx, job)
131140
if updated {
132-
updatedRuns[job.RunID] = job
133141
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
134142
}
135143
}
136144

137145
for _, job := range updatedRuns {
138-
c, err := db.Count[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
139-
RunID: job.RunID,
140-
Statuses: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusBlocked, actions_model.StatusRunning},
141-
})
142-
if err != nil {
143-
log.Error("Count waiting jobs for run %d: %v", job.RunID, err)
144-
continue
145-
}
146-
if c == 0 {
147-
NotifyWorkflowRunStatusUpdateWithReload(ctx, job)
148-
}
146+
notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
149147
}
150148

151149
return nil

tests/integration/repo_webhook_test.go

Lines changed: 80 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,21 +1105,6 @@ type workflowRunWebhook struct {
11051105
}
11061106

11071107
func Test_WebhookWorkflowRun(t *testing.T) {
1108-
webhookData := &workflowRunWebhook{}
1109-
provider := newMockWebhookProvider(func(r *http.Request) {
1110-
assert.Contains(t, r.Header["X-Github-Event-Type"], "workflow_run", "X-GitHub-Event-Type should contain workflow_run")
1111-
assert.Contains(t, r.Header["X-Gitea-Event-Type"], "workflow_run", "X-Gitea-Event-Type should contain workflow_run")
1112-
assert.Contains(t, r.Header["X-Gogs-Event-Type"], "workflow_run", "X-Gogs-Event-Type should contain workflow_run")
1113-
content, _ := io.ReadAll(r.Body)
1114-
var payload api.WorkflowRunPayload
1115-
err := json.Unmarshal(content, &payload)
1116-
assert.NoError(t, err)
1117-
webhookData.payloads = append(webhookData.payloads, payload)
1118-
webhookData.triggeredEvent = "workflow_run"
1119-
}, http.StatusOK)
1120-
defer provider.Close()
1121-
webhookData.URL = provider.URL()
1122-
11231108
testCases := []struct {
11241109
name string
11251110
testFunc func(t *testing.T, webhookData *workflowRunWebhook)
@@ -1141,23 +1126,29 @@ func Test_WebhookWorkflowRun(t *testing.T) {
11411126
testFunc: testWorkflowRunEventsOnRerun,
11421127
},
11431128
{
1144-
name: "WorkflowRunEventsOnCancellingAllJobsAbandonedRun",
1145-
testFunc: func(t *testing.T, webhookData *workflowRunWebhook) {
1146-
testWorkflowRunEventsOnCancellingAbandonedRun(t, webhookData, true)
1147-
},
1148-
},
1149-
{
1150-
name: "WorkflowRunEventsOnCancellingPartiallyAbandonedRun",
1151-
testFunc: func(t *testing.T, webhookData *workflowRunWebhook) {
1152-
testWorkflowRunEventsOnCancellingAbandonedRun(t, webhookData, false)
1153-
},
1129+
name: "WorkflowRunEventsOnCancellingAbandonedRun",
1130+
testFunc: testWorkflowRunEventsOnCancellingAbandonedRun,
11541131
},
11551132
}
11561133
for _, obj := range testCases {
11571134
t.Run(obj.name, func(t *testing.T) {
1158-
webhookData.payloads = nil
1159-
webhookData.triggeredEvent = ""
11601135
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
1136+
webhookData := &workflowRunWebhook{}
1137+
provider := newMockWebhookProvider(func(r *http.Request) {
1138+
assert.Contains(t, r.Header["X-Github-Event-Type"], "workflow_run", "X-GitHub-Event-Type should contain workflow_run")
1139+
assert.Contains(t, r.Header["X-Gitea-Event-Type"], "workflow_run", "X-Gitea-Event-Type should contain workflow_run")
1140+
assert.Contains(t, r.Header["X-Gogs-Event-Type"], "workflow_run", "X-Gogs-Event-Type should contain workflow_run")
1141+
content, _ := io.ReadAll(r.Body)
1142+
var payload api.WorkflowRunPayload
1143+
err := json.Unmarshal(content, &payload)
1144+
assert.NoError(t, err)
1145+
webhookData.payloads = append(webhookData.payloads, payload)
1146+
webhookData.triggeredEvent = "workflow_run"
1147+
}, http.StatusOK)
1148+
defer provider.Close()
1149+
webhookData.URL = provider.URL()
1150+
webhookData.payloads = nil
1151+
webhookData.triggeredEvent = ""
11611152
obj.testFunc(t, webhookData)
11621153
})
11631154
})
@@ -1429,32 +1420,35 @@ jobs:
14291420
assert.Len(t, webhookData.payloads, 3)
14301421
}
14311422

1432-
func testWorkflowRunEventsOnCancellingAbandonedRun(t *testing.T, webhookData *workflowRunWebhook, allJobsAbandoned bool) {
1433-
defer test.MockVariableValue(&setting.Actions.AbandonedJobTimeout, (time.Duration)(0))()
1423+
func testWorkflowRunEventsOnCancellingAbandonedRun(t *testing.T, webhookData *workflowRunWebhook) {
1424+
defer test.MockVariableValue(&setting.Actions.AbandonedJobTimeout, 0*time.Nanosecond)()
14341425

14351426
// 1. create a new webhook with special webhook for repo1
14361427
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
14371428
session := loginUser(t, "user2")
14381429
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
14391430

1431+
repoName := "test-workflow-run-cancelling-abandoned-run"
1432+
testRepo := unittest.AssertExistsAndLoadBean(t, &repo.Repository{ID: createActionsTestRepo(t, token, repoName, false).ID})
1433+
14401434
runners := make([]*mockRunner, 2)
14411435
for i := range runners {
14421436
runners[i] = newMockRunner()
1443-
runners[i].registerAsRepoRunner(t, "user2", "repo1", fmt.Sprintf("mock-runner-%d", i), []string{"ubuntu-latest"}, false)
1437+
runners[i].registerAsRepoRunner(t, "user2", repoName, fmt.Sprintf("mock-runner-%d", i), []string{"ubuntu-latest"}, false)
14441438
}
14451439

1446-
testAPICreateWebhookForRepo(t, session, "user2", "repo1", webhookData.URL, "workflow_run")
1447-
1448-
repo1 := unittest.AssertExistsAndLoadBean(t, &repo.Repository{ID: 1})
1440+
testAPICreateWebhookForRepo(t, session, "user2", repoName, webhookData.URL, "workflow_run")
14491441

1450-
gitRepo1, err := gitrepo.OpenRepository(t.Context(), repo1)
1442+
ctx := t.Context()
1443+
gitRepo, err := gitrepo.OpenRepository(ctx, testRepo)
14511444
assert.NoError(t, err)
14521445

14531446
// 2.2 trigger the webhooks
14541447

14551448
// add workflow file to the repo
14561449
// init the workflow
1457-
wfTreePath := ".gitea/workflows/push.yml"
1450+
wfilename := "push.yml"
1451+
wfTreePath := ".gitea/workflows/" + wfilename
14581452
wfFileContent := `on:
14591453
push:
14601454
workflow_dispatch:
@@ -1521,39 +1515,73 @@ jobs:
15211515
runs-on: ubuntu-latest
15221516
steps:
15231517
- run: exit 0`
1524-
opts := getWorkflowCreateFileOptions(user2, repo1.DefaultBranch, "create "+wfTreePath, wfFileContent)
1525-
createWorkflowFile(t, token, "user2", "repo1", wfTreePath, opts)
15261518

1527-
commitID, err := gitRepo1.GetBranchCommitID(repo1.DefaultBranch)
1519+
opts := getWorkflowCreateFileOptions(user2, testRepo.DefaultBranch, "create "+wfTreePath, wfFileContent)
1520+
createWorkflowFile(t, token, "user2", repoName, wfTreePath, opts)
1521+
1522+
commitID, err := gitRepo.GetBranchCommitID(testRepo.DefaultBranch)
15281523
assert.NoError(t, err)
15291524

15301525
// 3. validate the webhook is triggered
15311526
assert.Equal(t, "workflow_run", webhookData.triggeredEvent)
15321527
assert.Len(t, webhookData.payloads, 1)
15331528
assert.Equal(t, "requested", webhookData.payloads[0].Action)
15341529
assert.Equal(t, "queued", webhookData.payloads[0].WorkflowRun.Status)
1535-
assert.Equal(t, repo1.DefaultBranch, webhookData.payloads[0].WorkflowRun.HeadBranch)
1530+
assert.Equal(t, testRepo.DefaultBranch, webhookData.payloads[0].WorkflowRun.HeadBranch)
15361531
assert.Equal(t, commitID, webhookData.payloads[0].WorkflowRun.HeadSha)
1537-
assert.Equal(t, "repo1", webhookData.payloads[0].Repo.Name)
1538-
assert.Equal(t, "user2/repo1", webhookData.payloads[0].Repo.FullName)
1532+
assert.Equal(t, repoName, webhookData.payloads[0].Repo.Name)
1533+
assert.Equal(t, "user2/"+repoName, webhookData.payloads[0].Repo.FullName)
15391534

15401535
for _, runner := range runners {
1541-
task := runner.fetchTask(t)
1542-
if !allJobsAbandoned {
1543-
runner.execTask(t, task, &mockTaskOutcome{
1544-
result: runnerv1.Result_RESULT_SUCCESS,
1545-
})
1546-
}
1536+
runner.fetchTask(t)
15471537
}
15481538

1549-
err = actions.CancelAbandonedJobs(t.Context())
1539+
err = actions.CancelAbandonedJobs(ctx)
15501540
assert.NoError(t, err)
1541+
assert.Len(t, webhookData.payloads, 2)
1542+
assert.Equal(t, "completed", webhookData.payloads[1].Action)
1543+
assert.Equal(t, "completed", webhookData.payloads[1].WorkflowRun.Status)
1544+
assert.Equal(t, testRepo.DefaultBranch, webhookData.payloads[1].WorkflowRun.HeadBranch)
1545+
assert.Equal(t, commitID, webhookData.payloads[1].WorkflowRun.HeadSha)
1546+
assert.Equal(t, repoName, webhookData.payloads[1].Repo.Name)
1547+
assert.Equal(t, "user2/"+repoName, webhookData.payloads[1].Repo.FullName)
15511548

1552-
if allJobsAbandoned {
1553-
assert.Len(t, webhookData.payloads, 1)
1554-
} else {
1555-
assert.Len(t, webhookData.payloads, 2)
1549+
apiReqValues := url.Values{}
1550+
apiReqValues.Set("ref", testRepo.DefaultBranch)
1551+
req := NewRequestWithURLValues(t, "POST",
1552+
fmt.Sprintf("/api/v1/repos/%s/actions/workflows/%s/dispatches", testRepo.FullName(), wfilename),
1553+
apiReqValues).AddTokenAuth(token)
1554+
MakeRequest(t, req, http.StatusNoContent)
1555+
1556+
for i := range runners {
1557+
runners[i] = newMockRunner()
1558+
runners[i].registerAsRepoRunner(t, "user2", repoName, fmt.Sprintf("mock-runner-2-%d", i), []string{"ubuntu-latest"}, false)
15561559
}
1560+
1561+
assert.Len(t, webhookData.payloads, 3)
1562+
assert.Equal(t, "requested", webhookData.payloads[2].Action)
1563+
assert.Equal(t, "queued", webhookData.payloads[2].WorkflowRun.Status)
1564+
assert.Equal(t, testRepo.DefaultBranch, webhookData.payloads[2].WorkflowRun.HeadBranch)
1565+
assert.Equal(t, commitID, webhookData.payloads[2].WorkflowRun.HeadSha)
1566+
assert.Equal(t, repoName, webhookData.payloads[2].Repo.Name)
1567+
assert.Equal(t, "user2/"+repoName, webhookData.payloads[2].Repo.FullName)
1568+
1569+
for _, runner := range runners {
1570+
task := runner.fetchTask(t)
1571+
runner.execTask(t, task, &mockTaskOutcome{
1572+
result: runnerv1.Result_RESULT_SUCCESS,
1573+
})
1574+
}
1575+
1576+
err = actions.CancelAbandonedJobs(ctx)
1577+
assert.NoError(t, err)
1578+
assert.Len(t, webhookData.payloads, 4)
1579+
assert.Equal(t, "completed", webhookData.payloads[3].Action)
1580+
assert.Equal(t, "completed", webhookData.payloads[3].WorkflowRun.Status)
1581+
assert.Equal(t, testRepo.DefaultBranch, webhookData.payloads[3].WorkflowRun.HeadBranch)
1582+
assert.Equal(t, commitID, webhookData.payloads[3].WorkflowRun.HeadSha)
1583+
assert.Equal(t, repoName, webhookData.payloads[3].Repo.Name)
1584+
assert.Equal(t, "user2/"+repoName, webhookData.payloads[3].Repo.FullName)
15571585
}
15581586

15591587
func testWebhookWorkflowRun(t *testing.T, webhookData *workflowRunWebhook) {

0 commit comments

Comments
 (0)